# app/processing.py
"""
Integration with the existing OCR, QC and DZI scripts.

Runs the pipeline in the background and stores the results in the database.
"""
import os
import sys
import json
import re
import subprocess
import shutil
from pathlib import Path
from typing import Optional, Union
from sqlalchemy.orm import Session

# Project root (backend/app -> backend -> opencode); the pipeline scripts
# are resolved relative to this folder and are executed with it as cwd.
_PROJECT_ROOT = Path(__file__).parent.parent.parent

# Add the project root to the path so the scripts can be imported
sys.path.insert(0, str(_PROJECT_ROOT))

import app.crud as crud
import app.models as models

# Markers of derived/visualisation PNGs that must not be treated as pages.
_DERIVED_PNG_MARKERS = ("_dims", "_layout", "_detected", "_preproc", "_ocr_compare")

# QC item keys that map onto dedicated issue columns (or are dropped);
# every other key of a QC item is stored in the issue's extra_data.
_ISSUE_CORE_KEYS = frozenset([
    "type", "severity", "message", "page", "text", "confidence",
    "bbox", "bbox1", "bbox2", "bbox_dim", "source",
])


def run_pipeline(project_id: int, pdf_path: Path, output_base: Path, db: Session, use_tiling: bool = False):
    """Run the full pipeline: PDF -> OCR -> Layout -> Elements -> QC -> DZI -> DB.

    The project status is set to "processing" at the start, to "completed"
    (together with the output folder) on success and to "error" (together
    with the error message) on failure.

    Args:
        project_id: Id of the project to process; nothing happens if
            ``crud.get_project`` does not find it.
        pdf_path: Path of the PDF passed to ``process_any_pdf.py``.
        output_base: Folder under which ``project_<id>`` is created.
        db: Database session used for status updates and result import.
        use_tiling: When true, ``--use-tiling`` is passed to the OCR script.

    Raises:
        Exception: Any failure of a mandatory step is re-raised after the
            project has been marked as "error". Layout detection and element
            extraction are best-effort: their failures are only logged.
    """
    project = crud.get_project(db, project_id)
    if not project:
        return

    crud.update_project_status(db, project_id, "processing")

    try:
        # 1. Create the output folder
        output_folder = output_base / f"project_{project_id}"
        output_folder.mkdir(parents=True, exist_ok=True)

        script_dir = _PROJECT_ROOT
        cwd = str(script_dir)

        # 2. OCR + PNG (RapidOCR or Tiling OCR)
        cmd = [sys.executable, str(script_dir / "process_any_pdf.py"), str(pdf_path), str(output_folder)]
        if use_tiling:
            cmd.append("--use-tiling")
            print("[INFO] Tiling OCR enabled")
        _run_command(cmd, cwd=cwd)

        # Only the original page_NNN.png files, excluding visualisations
        # (_dims, _layout, etc.)
        page_pngs = sorted(
            p for p in output_folder.glob("page_*.png")
            if not any(marker in p.stem for marker in _DERIVED_PNG_MARKERS)
        )

        ocr_json = output_folder / "full_ocr_results.json"
        layout_json = output_folder / "layout.json"

        # 3. Layout detection for every original page (best-effort)
        if ocr_json.exists():
            for png in page_pngs:
                try:
                    _run_command([
                        sys.executable,
                        str(script_dir / "layout_detector.py"),
                        str(png),
                        str(ocr_json)
                    ], cwd=cwd)
                    print(f"[INFO] Layout detection done for {png.name}")
                except Exception as e:
                    print(f"[WARN] Layout detection failed for {png.name}: {e}")

        # 4. Multi-element extraction (dimensions, positions, GOSTs, etc.),
        #    also best-effort; needs both the OCR and the layout results.
        if ocr_json.exists() and layout_json.exists():
            for png in page_pngs:
                try:
                    _run_command([
                        sys.executable,
                        str(script_dir / "multi_element_extractor.py"),
                        str(png),
                        str(ocr_json),
                        str(layout_json)
                    ], cwd=cwd)
                    print(f"[INFO] Element extraction done for {png.name}")
                except Exception as e:
                    print(f"[WARN] Element extraction failed for {png.name}: {e}")

        # 5. QC (dimension_qc_checker.py) - rule-based checks
        _run_command([
            sys.executable,
            str(script_dir / "dimension_qc_checker.py"),
            str(output_folder)
        ], cwd=cwd)

        # 6. DZI for every page
        for png in page_pngs:
            _run_command([
                sys.executable,
                str(script_dir / "generate_dzi.py"),
                str(png)
            ], cwd=cwd)

        # 7. Verify the results
        if not ocr_json.exists():
            raise RuntimeError(f"OCR results not generated: {ocr_json}")

        # 8. Load into the DB
        _import_results(db, project_id, output_folder)

        crud.update_project_status(db, project_id, "completed", output_folder=str(output_folder))

    except Exception as e:
        # If the failure came from the database, the session must be rolled
        # back before it can be used again; otherwise the status update below
        # would fail and the project would stay in "processing".
        db.rollback()
        crud.update_project_status(db, project_id, "error", error_message=str(e))
        raise


def _run_command(cmd: list, cwd: Optional[Union[str, Path]] = None):
    """Run a command and raise if it exits with a non-zero code.

    Args:
        cmd: Command and arguments as a list of strings.
        cwd: Working directory for the child process, or None to inherit.

    Returns:
        The ``subprocess.CompletedProcess`` with captured text stdout/stderr.

    Raises:
        RuntimeError: If the exit code is non-zero. The message contains the
            command line, the first 1000 characters of stderr and the first
            500 characters of stdout.
    """
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        cwd=cwd
    )
    if result.returncode != 0:
        stderr = result.stderr[:1000]
        stdout = result.stdout[:500]
        raise RuntimeError(f"Command failed ({result.returncode}): {' '.join(cmd)}\nSTDERR: {stderr}\nSTDOUT: {stdout}")
    return result


def _bbox_to_rect(bbox):
    """Convert a QC bbox into an ``(x1, y1, x2, y2)`` tuple.

    Accepts either a polygon (sequence of ``[x, y]`` points), which is
    reduced to its bounding rectangle, or a flat ``[x1, y1, x2, y2]``
    sequence. Returns four ``None`` values when the bbox is missing or is a
    flat sequence with fewer than four values.
    """
    if not bbox:
        return None, None, None, None
    if isinstance(bbox[0], (list, tuple)):
        xs = [p[0] for p in bbox]
        ys = [p[1] for p in bbox]
        return min(xs), min(ys), max(xs), max(ys)
    if len(bbox) < 4:
        return None, None, None, None
    return bbox[0], bbox[1], bbox[2], bbox[3]


def _import_results(db: Session, project_id: int, output_folder: Path):
    """Import the OCR and QC results of a project into the DB.

    Removes the project's existing pages and issues first, then loads pages
    from ``full_ocr_results.json``, page descriptions from
    ``vlm_extraction.json`` and issues from ``dimension_qc_report.json``.
    Each of the three files is optional and skipped when absent.

    Args:
        db: Database session.
        project_id: Id of the project the results belong to.
        output_folder: Folder containing the pipeline output files.
    """
    # Clear the old data. Issues are removed before pages: a bulk delete does
    # not run ORM-level cascades, so issues that reference a page must be gone
    # before the page row is deleted.
    # NOTE(review): feedback rows are presumably removed by a DB-level
    # cascade - confirm against the models.
    db.query(models.Issue).filter(models.Issue.project_id == project_id).delete(synchronize_session=False)
    db.query(models.Page).filter(models.Page.project_id == project_id).delete(synchronize_session=False)
    db.commit()

    # Load OCR pages
    ocr_path = output_folder / "full_ocr_results.json"
    if ocr_path.exists():
        ocr = json.loads(ocr_path.read_text(encoding="utf-8"))
        for page_data in ocr.get("pages", []):
            page_num = page_data["page_number"]
            png_path = output_folder / f"page_{page_num:03d}.png"
            crud.create_page(
                db,
                project_id=project_id,
                page_number=page_num,
                png_path=str(png_path) if png_path.exists() else None,
                ocr_data=page_data
            )

    # Load the VLM extraction descriptions into the pages
    vlm_path = output_folder / "vlm_extraction.json"
    vlm_data = {}
    if vlm_path.exists():
        vlm_data = json.loads(vlm_path.read_text(encoding="utf-8"))

    # Update vlm_description for every page; the page number is taken from
    # the "page_<digits>" part of the image name.
    for img_name, extraction in vlm_data.items():
        page_num_match = re.search(r"page_(\d+)", img_name)
        if page_num_match:
            page_num = int(page_num_match.group(1))
            page = crud.get_page_by_number(db, project_id, page_num)
            if page:
                desc = extraction.get("description", "")
                page.vlm_description = desc[:2000] if desc else None  # limit the length
    db.commit()

    # Load QC issues (rules only - VLM issues were removed as unreliable)
    qc_path = output_folder / "dimension_qc_report.json"
    if qc_path.exists():
        qc = json.loads(qc_path.read_text(encoding="utf-8"))
        for severity in ["errors", "warnings", "infos"]:
            for item in qc.get(severity, []):
                page_num = item.get("page")
                if not page_num:
                    continue
                page = crud.get_page_by_number(db, project_id, page_num)

                # Extract the bbox from whichever key the rule provided
                bbox = item.get("bbox") or item.get("bbox1") or item.get("bbox_dim")
                x1, y1, x2, y2 = _bbox_to_rect(bbox)

                crud.create_issue(
                    db,
                    project_id=project_id,
                    page_id=page.id if page else None,
                    issue_type=item.get("type", "UNKNOWN"),
                    severity=item.get("severity", "warning"),
                    message=item.get("message", ""),
                    bbox_x1=x1, bbox_y1=y1, bbox_x2=x2, bbox_y2=y2,
                    dimension_text=item.get("text"),
                    confidence=item.get("confidence"),
                    source="rules",
                    extra_data={k: v for k, v in item.items() if k not in _ISSUE_CORE_KEYS}
                )


def generate_viewer_html(db: Session, project_id: int, page_number: int) -> Optional[str]:
    """Generate the HTML viewer for a specific page.

    Args:
        db: Database session.
        project_id: Id of the project whose output folder is used.
        page_number: Page number passed to ``generate_web_viewer.py``.

    Returns:
        Path of ``web_viewer/index.html`` inside the project's output folder,
        or None if the project is missing, has no output folder, the
        generator script fails, or the viewer file was not produced.
    """
    project = crud.get_project(db, project_id)
    if not project or not project.output_folder:
        return None

    output_folder = Path(project.output_folder)

    # Regenerate the viewer for the requested page, running the script from
    # the folder that contains generate_web_viewer.py.
    script_dir = _PROJECT_ROOT
    try:
        _run_command([
            sys.executable,
            str(script_dir / "generate_web_viewer.py"),
            str(output_folder),
            str(page_number)
        ], cwd=str(script_dir))
    except RuntimeError as exc:
        # _run_command raises on a non-zero exit code, so failure has to be
        # handled here rather than by inspecting the return code.
        print(f"[ERROR] generate_web_viewer.py failed: {exc}")
        return None

    viewer_path = output_folder / "web_viewer" / "index.html"
    if viewer_path.exists():
        return str(viewer_path)
    return None