"""Document ingestion worker pipeline."""

import asyncio
import json
import shutil
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple

from backend.paths import org_documents_dir, org_qmd_root, write_folder_project_meta
from src.config import load_config, resolve_opencode_credentials
from src.ingest.classify import classify_document
from src.ingest.formatter import format_global_index_document, format_index_document
from src.ingest.router import extract_document, is_extractable
from src.ingest.stub_writer import write_stub
from src.rag.qmd.indexer import qmd_index_document


def _make_stub_doc(
    document_id: str, filename: str, doc_type: str, project_slug: str
) -> SimpleNamespace:
    """Build a minimal stand-in document for files that cannot be text-extracted.

    The object exposes the attributes the ingest pipeline reads from an
    extracted document (``full_text``, ``document_id``, ``filename``,
    ``doc_type``, ``metadata``) plus ``to_metadata_dict()``. ``full_text`` is
    empty, so classification and ``index.txt`` generation are skipped for it.
    """
    stub_metadata = {
        "document_id": document_id,
        "filename": filename,
        "doc_type": doc_type,
        "project": project_slug,
        "stub": True,
    }
    return SimpleNamespace(
        full_text="",
        document_id=document_id,
        filename=filename,
        doc_type=doc_type,
        metadata={},
        # Return a fresh copy each call: the caller mutates the returned dict.
        to_metadata_dict=lambda: dict(stub_metadata),
    )


async def _extract_or_stub(
    task_id: str,
    file_path: Path,
    output_dir: Path,
    project_slug: str,
    doc_type: str,
    pdf_ocr: bool,
) -> Tuple[Any, Optional[Path]]:
    """Extract text from ``file_path`` into ``output_dir/extracted.md``, or fall back to a stub.

    Returns:
        ``(doc, extracted_md)``. ``extracted_md`` is the path of the written
        ``extracted.md`` file, or ``None`` when the file is not extractable
        and a stub document was produced instead.

    Raises:
        ValueError: the file is extractable but yielded only whitespace.
    """
    if is_extractable(file_path.name):
        doc = await asyncio.to_thread(
            extract_document, file_path, project_slug, doc_type, None, pdf_ocr,
        )
        if not doc.full_text.strip():
            raise ValueError("Не удалось извлечь текст из документа")
        extracted_md = output_dir / "extracted.md"
        await asyncio.to_thread(extracted_md.write_text, doc.full_text, encoding="utf-8")
        return doc, extracted_md

    # Non-extractable format: writing the stub is best-effort. A filesystem
    # failure is logged and ingestion continues with the in-memory stub.
    try:
        stub = await asyncio.to_thread(write_stub, file_path, project_slug)
        print(f"[Ingest] {task_id}: создан stub {stub.name}")
    except OSError as exc:  # FileNotFoundError is a subclass of OSError
        print(f"[Ingest] {task_id}: stub_writer failed: {exc}")
    return _make_stub_doc(output_dir.name, file_path.name, doc_type, project_slug), None


async def process_document_ingest(
    job: Dict[str, Any],
    tasks: Dict[str, Dict[str, Any]],
    send_progress: Callable[..., Awaitable[Any]],
) -> None:
    """Run one document-ingest job end to end, reporting progress along the way.

    Steps:
    1. Copy the uploaded file into a new per-document directory under
       ``org_documents_dir(org_slug)``.
    2. Extract the text, or write a stub for non-extractable formats.
    3. Optionally classify the document via ``classify_document`` (when
       credentials exist and ``ingest.auto_classify`` is enabled).
    4. Write ``metadata.json`` and, when text is available, ``index.txt``.
    5. Optionally index the document into qmd (``rag.enabled`` and
       ``rag.auto_index``).
    6. Call ``backend.queue._cleanup_upload`` on the uploaded file.

    Args:
        job: Job description. Reads ``task_id``, ``file_path``, ``org_slug`` and
            ``project_slug``, plus the optional ``doc_type`` (default ``"other"``).
            Any ``display_name`` key is currently ignored.
        tasks: Task registry keyed by task id. ``tasks[task_id]`` must already
            exist; it is updated in place with status, progress and result.
        send_progress: Async callback invoked as
            ``(task_id, progress, message, status, **kwargs)``. It receives
            ``result=`` on success and ``error=`` on failure.

    Any ``Exception`` raised after the initial progress update is recorded on
    ``tasks[task_id]`` with status ``"error"`` and reported through
    ``send_progress``; it is not re-raised.
    """
    task_id = job["task_id"]
    file_path = Path(job["file_path"])
    org_slug = job["org_slug"]
    project_slug = job["project_slug"]
    doc_type = job.get("doc_type", "other")

    tasks[task_id].update({"status": "processing", "message": "Извлечение текста...", "progress": 10})
    await send_progress(task_id, 10, "Извлечение текста...", "processing")

    try:
        config = load_config()
        ingest_cfg = config.get("ingest", {})
        pdf_ocr = ingest_cfg.get("pdf_ocr", True)

        documents_dir = org_documents_dir(org_slug)
        # A second-resolution timestamp plus a task-id suffix makes name
        # collisions between concurrent jobs unlikely.
        output_dir = documents_dir / f"doc_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{task_id[-8:]}"
        await asyncio.to_thread(output_dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(shutil.copy2, file_path, output_dir / file_path.name)
        await asyncio.to_thread(write_folder_project_meta, output_dir, project_slug)

        doc, extracted_md = await _extract_or_stub(
            task_id, file_path, output_dir, project_slug, doc_type, pdf_ocr,
        )

        tasks[task_id].update({"status": "postprocessing", "message": "Анализ документа...", "progress": 40})
        await send_progress(task_id, 40, "Анализ документа...", "postprocessing")

        metadata = doc.to_metadata_dict() if callable(getattr(doc, "to_metadata_dict", None)) else {}
        rag_cfg = config.get("rag", {})
        api_key, base_url = resolve_opencode_credentials(config)
        # Classification is skipped for stubs (empty full_text) or without an API key.
        if api_key and ingest_cfg.get("auto_classify", True) and doc.full_text:
            metadata = await classify_document(
                text=doc.full_text,
                project=project_slug,
                doc_type_hint=doc_type,
                api_key=api_key,
                base_url=base_url,
                model=rag_cfg.get("index_model", "mimo-v2.5-free"),
                chunk_size=int(rag_cfg.get("classify_chunk_size", 7000)),
            )
        # Always pin identity fields, regardless of what classification returned.
        metadata["filename"] = doc.filename
        metadata["document_id"] = doc.document_id
        await asyncio.to_thread(
            (output_dir / "metadata.json").write_text,
            json.dumps(metadata, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        index_path: Optional[Path] = None
        if doc.full_text:
            index_path = output_dir / "index.txt"
            await asyncio.to_thread(
                index_path.write_text, format_index_document(doc, metadata), encoding="utf-8",
            )

        result_data = {
            "document_id": getattr(doc, "document_id", output_dir.name),
            "dir": str(output_dir),
            "rel_dir": str(output_dir.relative_to(documents_dir)),
            # Known from the extraction step; avoids a blocking exists() on the event loop.
            "extracted": str(extracted_md) if extracted_md is not None else None,
            "index": str(index_path) if index_path else None,
            "project": project_slug,
            "doc_type": metadata.get("doc_type", doc_type),
            "kind": "document",
        }

        if rag_cfg.get("enabled", False) and rag_cfg.get("auto_index", True):
            tasks[task_id].update({"message": "Индексация в qmd...", "progress": 75})
            await send_progress(task_id, 75, "Индексация в qmd...", "postprocessing")
            try:
                await qmd_index_document(
                    org_slug=org_slug,
                    project_slug=project_slug,
                    document_dir=output_dir,
                    # NOTE(review): for stub documents this file was never written;
                    # presumably qmd_index_document tolerates that — confirm.
                    extracted_md=output_dir / "extracted.md",
                )
            except Exception as idx_exc:
                # Indexing is best-effort: the document itself is already stored.
                print(f"[Ingest] {task_id}: qmd index failed: {idx_exc}")

        # Local import, presumably to avoid a circular import with backend.queue — confirm.
        from backend.queue import _cleanup_upload
        await asyncio.to_thread(_cleanup_upload, file_path)

        tasks[task_id].update({
            "status": "completed",
            "progress": 100,
            "message": "Документ проиндексирован",
            "result": result_data,
            "finished": datetime.now().isoformat(),
        })
        await send_progress(task_id, 100, "Документ проиндексирован", "completed", result=result_data)
    except Exception as e:
        # Exceptions raised without a message would otherwise produce an empty
        # error text; fall back to the exception class name.
        # asyncio.CancelledError derives from BaseException and is not caught here.
        error_msg = str(e) or type(e).__name__
        tasks[task_id].update({
            "status": "error",
            "message": f"Ошибка: {error_msg}",
            "error": error_msg,
        })
        await send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg)