transcription/backend/ingest_worker.py
keboss-m 36c9be48be Add document ingestion pipeline, chat analytics modes, and auth fixes
Ingest MD/PDF/DOCX/XLSX into org-scoped documents with classify and RAG indexing. Add compare/timeline chat modes and UI upload. Filter WebSocket progress by user ACL and normalize admin project slugs consistently.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 19:16:23 +03:00

134 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Document ingestion worker pipeline."""
import asyncio
import json
import logging
import shutil
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

from backend.paths import org_documents_dir, org_rag_index_dir, write_folder_project_meta
from src.config import load_config, resolve_opencode_credentials
from src.ingest.classify import classify_document
from src.ingest.formatter import format_global_index_document, format_index_document
from src.ingest.router import extract_document
from src.rag.indexer import index_meeting
async def process_document_ingest(job: Dict[str, Any], tasks: dict, send_progress):
    """Run the ingestion pipeline for one uploaded document.

    Steps:
      1. Extract text.
      2. Store the original file and ``extracted.md`` under the org's
         documents directory.
      3. Optionally classify the document with an LLM.
      4. Write ``metadata.json`` and ``index.txt``.
      5. Optionally index the document into RAG.
      6. Remove the uploaded source file.

    Args:
        job: Job description. Required keys: ``task_id``, ``file_path``,
            ``org_slug``, ``project_slug``. Optional key: ``doc_type``
            (defaults to ``"other"``).
        tasks: Task registry keyed by task id. ``tasks[task_id]`` is updated
            in place with ``status``/``message``/``progress``, plus either
            ``result`` and ``finished`` on success or ``error`` on failure.
        send_progress: Async callable, awaited as
            ``send_progress(task_id, progress, message, status, **extra)``,
            used to push progress updates.

    Any ``Exception`` raised by the pipeline is caught, logged with its
    traceback, and reported through ``tasks`` and ``send_progress`` with
    status ``"error"``. It is not re-raised.
    """
    task_id = job["task_id"]
    file_path = Path(job["file_path"])
    org_slug = job["org_slug"]
    project_slug = job["project_slug"]
    doc_type = job.get("doc_type", "other")

    tasks[task_id].update(
        {"status": "processing", "message": "Извлечение текста...", "progress": 10}
    )
    await send_progress(task_id, 10, "Извлечение текста...", "processing")

    try:
        config = load_config()
        ingest_cfg = config.get("ingest", {})
        pdf_ocr = ingest_cfg.get("pdf_ocr", True)

        # Run extraction in a worker thread so the event loop is not stalled.
        doc = await asyncio.to_thread(
            extract_document,
            file_path,
            project_slug,
            doc_type,
            None,
            pdf_ocr,
        )
        if not doc.full_text.strip():
            raise ValueError("Не удалось извлечь текст из документа")

        # Persist the original file and the extracted text under <documents>/<document_id>/.
        documents_dir = org_documents_dir(org_slug)
        output_dir = documents_dir / doc.document_id
        await asyncio.to_thread(output_dir.mkdir, parents=True, exist_ok=True)
        original_dest = output_dir / file_path.name
        await asyncio.to_thread(shutil.copy2, file_path, original_dest)
        extracted_path = output_dir / "extracted.md"
        await asyncio.to_thread(extracted_path.write_text, doc.full_text, encoding="utf-8")
        await asyncio.to_thread(write_folder_project_meta, output_dir, project_slug)

        tasks[task_id].update(
            {"status": "postprocessing", "message": "Анализ документа...", "progress": 40}
        )
        await send_progress(task_id, 40, "Анализ документа...", "postprocessing")

        metadata = doc.to_metadata_dict()
        rag_cfg = config.get("rag", {})
        # The same model is used for classification and for RAG indexing.
        index_model = rag_cfg.get("index_model", "mimo-v2.5-free")
        api_key, base_url = resolve_opencode_credentials(config)
        # Classification runs only when credentials are available. Otherwise
        # the metadata from doc.to_metadata_dict() is kept.
        if api_key and ingest_cfg.get("auto_classify", True):
            metadata = await classify_document(
                text=doc.full_text,
                project=project_slug,
                doc_type_hint=doc_type,
                api_key=api_key,
                base_url=base_url,
                model=index_model,
                chunk_size=int(rag_cfg.get("classify_chunk_size", 7000)),
            )
        # Pin identity fields regardless of which source produced the metadata.
        metadata["filename"] = doc.filename
        metadata["document_id"] = doc.document_id
        await asyncio.to_thread(
            (output_dir / "metadata.json").write_text,
            json.dumps(metadata, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        doc_text = format_index_document(doc, metadata)
        index_path = output_dir / "index.txt"
        await asyncio.to_thread(index_path.write_text, doc_text, encoding="utf-8")

        result_data = {
            "document_id": doc.document_id,
            "dir": str(output_dir),
            "rel_dir": str(output_dir.relative_to(documents_dir)),
            "extracted": str(extracted_path),
            "index": str(index_path),
            "project": project_slug,
            "doc_type": metadata.get("doc_type", doc_type),
            "kind": "document",
        }

        if rag_cfg.get("enabled", False) and rag_cfg.get("auto_index", True):
            # Status is already "postprocessing"; only message/progress advance.
            tasks[task_id].update({"message": "Индексация в RAG...", "progress": 75})
            await send_progress(task_id, 75, "Индексация в RAG...", "postprocessing")
            global_doc_text = format_global_index_document(doc_text, metadata)
            await index_meeting(
                doc_text=doc_text,
                global_doc_text=global_doc_text,
                project_name=project_slug,
                working_dir_base=org_rag_index_dir(org_slug),
                model=index_model,
                api_key=api_key,
                base_url=base_url,
            )

        # Deferred import; presumably avoids a circular import with backend.queue (TODO confirm).
        from backend.queue import _cleanup_upload

        # The upload is removed only on success; on failure it is left in place.
        await asyncio.to_thread(_cleanup_upload, file_path)

        tasks[task_id].update({
            "status": "completed",
            "progress": 100,
            "message": "Документ проиндексирован",
            "result": result_data,
            "finished": datetime.now().isoformat(),
        })
        await send_progress(task_id, 100, "Документ проиндексирован", "completed", result=result_data)
    except Exception as e:
        # Keep the full traceback in the server log; the client only gets the message.
        logging.getLogger(__name__).exception("Document ingest failed (task %s)", task_id)
        # Some exceptions stringify to "" (e.g. a bare TimeoutError()); fall back to the type name.
        error_msg = str(e) or type(e).__name__
        tasks[task_id].update({
            "status": "error",
            "message": f"Ошибка: {error_msg}",
            "error": error_msg,
        })
        await send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg)