opencode/backend/app/processing.py

218 lines
9.4 KiB
Python
Raw Normal View History

# app/processing.py
"""
Интеграция с существующими скриптами OCR, QC, DZI.
Запускает pipeline в фоне и сохраняет результаты в БД.
"""
import os
import sys
import json
import re
import subprocess
import shutil
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
# Добавить корень проекта в path для импорта скриптов
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import app.crud as crud
import app.models as models
def run_pipeline(project_id: int, pdf_path: Path, output_base: Path, db: Session, use_tiling: bool = False):
    """Run the full pipeline: PDF → OCR → Layout → Elements → QC → DZI → DB.

    Every stage is an external script located three directory levels above
    this file and is executed as a subprocess through ``_run_command``.
    Layout detection and element extraction are best-effort per page (a
    failure is printed and the pipeline continues); OCR, QC and DZI failures
    abort the run.

    Args:
        project_id: ID of the project to process. If ``crud.get_project``
            returns nothing, the function returns without doing any work.
        pdf_path: Source PDF handed to ``process_any_pdf.py``.
        output_base: Directory under which ``project_<id>`` is created.
        db: SQLAlchemy session used for status updates and the result import.
        use_tiling: When true, ``--use-tiling`` is appended to the OCR command.

    Raises:
        RuntimeError: If a mandatory stage exits with a non-zero code or the
            OCR results file was not produced.
        Exception: Any other failure is re-raised after the project status has
            been set to ``"error"``.
    """
    project = crud.get_project(db, project_id)
    if not project:
        return

    crud.update_project_status(db, project_id, "processing")
    try:
        # 1. Create the output folder
        output_folder = output_base / f"project_{project_id}"
        output_folder.mkdir(parents=True, exist_ok=True)
        script_dir = Path(__file__).parent.parent.parent  # backend/app -> backend -> opencode
        ocr_json = output_folder / "full_ocr_results.json"
        layout_json = output_folder / "layout.json"

        # 2. OCR + PNG (RapidOCR or Tiling OCR)
        cmd = [sys.executable, str(script_dir / "process_any_pdf.py"), str(pdf_path), str(output_folder)]
        if use_tiling:
            cmd.append("--use-tiling")
            print("[INFO] Tiling OCR enabled")
        _run_command(cmd, cwd=str(script_dir))

        # Fail fast: every later stage consumes the OCR results, so there is
        # no point running QC/DZI when the OCR step produced nothing.
        if not ocr_json.exists():
            raise RuntimeError(f"OCR results not generated: {ocr_json}")

        # 3. Layout detection for every original page.
        # Only page_NNN.png files; visualisations (_dims, _layout, ...) are excluded.
        derived_markers = ("_dims", "_layout", "_detected", "_preproc", "_ocr_compare")
        page_pngs = sorted(
            p for p in output_folder.glob("page_*.png")
            if not any(marker in p.stem for marker in derived_markers)
        )
        for png in page_pngs:
            try:
                _run_command([
                    sys.executable, str(script_dir / "layout_detector.py"),
                    str(png), str(ocr_json)
                ], cwd=str(script_dir))
                print(f"[INFO] Layout detection done for {png.name}")
            except Exception as e:
                # Best-effort stage: report and carry on with the next page.
                print(f"[WARN] Layout detection failed for {png.name}: {e}")

        # 4. Multi-element extraction (dimensions, positions, GOSTs, etc.)
        for png in page_pngs:
            if not layout_json.exists():
                continue
            try:
                _run_command([
                    sys.executable, str(script_dir / "multi_element_extractor.py"),
                    str(png), str(ocr_json), str(layout_json)
                ], cwd=str(script_dir))
                print(f"[INFO] Element extraction done for {png.name}")
            except Exception as e:
                # Best-effort stage: report and carry on with the next page.
                print(f"[WARN] Element extraction failed for {png.name}: {e}")

        # 5. QC (dimension_qc_checker.py) — rule-based checks
        _run_command([
            sys.executable, str(script_dir / "dimension_qc_checker.py"),
            str(output_folder)
        ], cwd=str(script_dir))

        # 6. DZI for every page
        for png in page_pngs:
            _run_command([
                sys.executable, str(script_dir / "generate_dzi.py"),
                str(png)
            ], cwd=str(script_dir))

        # 7. Load the results into the DB
        _import_results(db, project_id, output_folder)
        crud.update_project_status(db, project_id, "completed", output_folder=str(output_folder))
    except Exception as e:
        # A failed flush/commit leaves the session unusable until it is rolled
        # back; without this the status update below could itself raise and
        # the project would stay in "processing" forever.
        db.rollback()
        crud.update_project_status(db, project_id, "error", error_message=str(e))
        raise
def _run_command(cmd: list, cwd: Optional[Path] = None):
    """Run a command, check its exit code and raise if it failed.

    Args:
        cmd: Program and arguments as a list (no shell is involved). Elements
            may be strings or path-like objects.
        cwd: Working directory for the child process; ``None`` keeps the
            current one. A plain string path is accepted as well.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout``/``stderr`` captured
        as text.

    Raises:
        RuntimeError: If the process exits with a non-zero code. The message
            contains the exit code, the command line, the first 1000
            characters of stderr and the first 500 characters of stdout.
    """
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        # Child scripts may emit bytes that are invalid in the locale
        # encoding; replace them instead of failing while decoding output.
        errors="replace",
        cwd=cwd,
    )
    if result.returncode != 0:
        stderr = result.stderr[:1000]
        stdout = result.stdout[:500]
        # str() each part so Path arguments cannot turn the intended
        # RuntimeError into a TypeError raised by str.join.
        printable_cmd = " ".join(str(part) for part in cmd)
        raise RuntimeError(f"Command failed ({result.returncode}): {printable_cmd}\nSTDERR: {stderr}\nSTDOUT: {stdout}")
    return result
def _bbox_to_rect(bbox):
    """Normalise a QC bbox to ``(x1, y1, x2, y2)``; four ``None`` values if unusable."""
    empty = (None, None, None, None)
    if not isinstance(bbox, (list, tuple)) or not bbox:
        return empty
    if isinstance(bbox[0], (list, tuple)):
        # Polygon given as a sequence of [x, y] points: take its bounding rectangle.
        try:
            xs = [point[0] for point in bbox]
            ys = [point[1] for point in bbox]
        except (IndexError, TypeError):
            return empty
        return min(xs), min(ys), max(xs), max(ys)
    if len(bbox) < 4:
        return empty
    # Flat form: the first four values are taken as x1, y1, x2, y2.
    return bbox[0], bbox[1], bbox[2], bbox[3]


def _import_results(db: Session, project_id: int, output_folder: Path):
    """Import OCR and QC results into the DB, replacing the project's old data.

    Reads three optional files from ``output_folder``:

    * ``full_ocr_results.json`` — one ``Page`` row is created per entry of its
      ``"pages"`` list.
    * ``vlm_extraction.json`` — a mapping whose keys contain ``page_<number>``;
      each entry's ``"description"`` (capped at 2000 characters) is stored on
      the matching page.
    * ``dimension_qc_report.json`` — items under ``"errors"``, ``"warnings"``
      and ``"infos"`` become ``Issue`` rows with ``source="rules"``. Items
      without a truthy ``"page"`` value are skipped.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        project_id: Project whose pages and issues are replaced.
        output_folder: Folder holding the pipeline output files.
    """
    # Remove the previous import. Bulk deletes do not run ORM-level cascades,
    # so rows are removed explicitly, children (issues) before the pages they
    # reference, to stay valid under foreign-key constraints.
    db.query(models.Issue).filter(models.Issue.project_id == project_id).delete(synchronize_session=False)
    db.query(models.Page).filter(models.Page.project_id == project_id).delete(synchronize_session=False)
    db.commit()

    # Load OCR pages
    ocr_path = output_folder / "full_ocr_results.json"
    if ocr_path.exists():
        ocr = json.loads(ocr_path.read_text(encoding="utf-8"))
        for page_data in ocr.get("pages", []):
            page_num = page_data["page_number"]
            png_path = output_folder / f"page_{page_num:03d}.png"
            crud.create_page(
                db, project_id=project_id, page_number=page_num,
                png_path=str(png_path) if png_path.exists() else None,
                ocr_data=page_data
            )

    # Load VLM extraction descriptions into the pages
    vlm_path = output_folder / "vlm_extraction.json"
    if vlm_path.exists():
        vlm_data = json.loads(vlm_path.read_text(encoding="utf-8"))
        for img_name, extraction in vlm_data.items():
            page_num_match = re.search(r"page_(\d+)", img_name)
            if not page_num_match:
                continue
            page = crud.get_page_by_number(db, project_id, int(page_num_match.group(1)))
            if page:
                desc = extraction.get("description", "")
                page.vlm_description = desc[:2000] if desc else None  # cap the stored length
        db.commit()

    # Load QC issues (rules only — VLM issues were dropped as unreliable)
    qc_path = output_folder / "dimension_qc_report.json"
    if not qc_path.exists():
        return
    qc = json.loads(qc_path.read_text(encoding="utf-8"))
    # Keys that map onto dedicated Issue columns (or are deliberately dropped);
    # everything else is preserved in extra_data.
    consumed_keys = {
        "type", "severity", "message", "page", "text", "confidence",
        "bbox", "bbox1", "bbox2", "bbox_dim", "source",
    }
    for severity in ["errors", "warnings", "infos"]:
        for item in qc.get(severity, []):
            page_num = item.get("page")
            if not page_num:
                continue
            page = crud.get_page_by_number(db, project_id, page_num)
            # The first bbox-like key that is present and non-empty wins.
            bbox = item.get("bbox") or item.get("bbox1") or item.get("bbox_dim")
            x1, y1, x2, y2 = _bbox_to_rect(bbox)
            crud.create_issue(
                db, project_id=project_id, page_id=page.id if page else None,
                issue_type=item.get("type", "UNKNOWN"),
                severity=item.get("severity", "warning"),
                message=item.get("message", ""),
                bbox_x1=x1, bbox_y1=y1, bbox_x2=x2, bbox_y2=y2,
                dimension_text=item.get("text"),
                confidence=item.get("confidence"),
                source="rules",
                extra_data={k: v for k, v in item.items() if k not in consumed_keys}
            )
def generate_viewer_html(db: Session, project_id: int, page_number: int) -> Optional[str]:
    """Generate the HTML viewer for one page of a project.

    Runs ``generate_web_viewer.py`` (located three directory levels above this
    file) with the project's output folder and the page number.

    Args:
        db: SQLAlchemy session used to look the project up.
        project_id: ID of the project whose viewer is regenerated.
        page_number: Page number passed to the viewer script.

    Returns:
        Path of ``<output_folder>/web_viewer/index.html`` as a string, or
        ``None`` when the project is missing, has no output folder, the viewer
        script fails, or the HTML file was not produced.
    """
    project = crud.get_project(db, project_id)
    if not project or not project.output_folder:
        return None
    output_folder = Path(project.output_folder)

    # Regenerate the viewer for the requested page; the script is run from the
    # directory that contains generate_web_viewer.py.
    script_dir = Path(__file__).parent.parent.parent  # backend/app -> backend -> opencode
    try:
        _run_command([
            sys.executable, str(script_dir / "generate_web_viewer.py"), str(output_folder), str(page_number)
        ], cwd=str(script_dir))
    except RuntimeError as e:
        # _run_command raises on a non-zero exit code, so a return-code check
        # on its result can never fire; the failure has to be handled here.
        print(f"[ERROR] generate_web_viewer.py failed: {e}")
        return None

    viewer_path = output_folder / "web_viewer" / "index.html"
    return str(viewer_path) if viewer_path.exists() else None