transcription/backend/ingest_worker.py
keboss-m eee8f4c8a4 Replace LightRAG with native Python RAG engine + add deploy tooling
- New: src/rag/engine/ — in-process hybrid search (FTS5 BM25 + sqlite-vec + LLM rerank)
- New: src/rag/qmd/ — compatibility layer (qmd_query, qmd_chat, qmd_chat_stream, qmd_index_*)
- New: src/ingest/stub_writer.py — .md stubs for binary files (videos, archives)
- New: scripts/deploy.sh + scripts/pull_models.sh + Makefile + .env.example
- Removed: LightRAG, sentence-transformers embedding via separate package, rag_standalone/
- Removed: @nousresearch/qmd npm dep (package not published); Node.js from Dockerfile
- Updated: tests/ (46 passed), docker-compose, .dockerignore, config.yaml, README

Engine: in-process Python (no daemon, no npm), sentence-transformers 384-dim,
RRF fusion (k=60), BM25 + vector with numpy fallback. WebSocket API unchanged.

Deploy: 'git clone' + 'make init' + 'make pull-models MODELS_SOURCE=...' + 'make up'.
Models (5.83 GB) live outside git; pulled via rsync from dev host.
2026-06-10 14:24:01 +03:00

159 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Document ingestion worker pipeline."""
import asyncio
import json
import shutil
import traceback
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Dict

from backend.paths import org_documents_dir, org_qmd_root, write_folder_project_meta
from src.config import load_config, resolve_opencode_credentials
from src.ingest.classify import classify_document
from src.ingest.formatter import format_global_index_document, format_index_document
from src.ingest.router import extract_document, is_extractable
from src.ingest.stub_writer import write_stub
from src.rag.qmd.indexer import qmd_index_document
def _make_stub_doc(document_id: str, filename: str, doc_type: str, project_slug: str) -> SimpleNamespace:
    """Build a placeholder document for files that are not text-extractable.

    The object exposes the attributes the ingest pipeline reads from an
    extracted document: ``full_text``, ``document_id``, ``filename``,
    ``doc_type``, ``metadata`` and ``to_metadata_dict()``. ``full_text`` is
    empty, so classification and ``index.txt`` generation are skipped.
    """
    stub_doc = SimpleNamespace(
        full_text="",
        document_id=document_id,
        filename=filename,
        doc_type=doc_type,
        metadata={},
    )
    # Reads the attributes at call time, mirroring an extracted document's method.
    stub_doc.to_metadata_dict = lambda: {
        "document_id": stub_doc.document_id,
        "filename": stub_doc.filename,
        "doc_type": stub_doc.doc_type,
        "project": project_slug,
        "stub": True,
    }
    return stub_doc


async def process_document_ingest(job: Dict[str, Any], tasks: dict, send_progress):
    """Run one document-ingest job end to end, reporting progress as it goes.

    Pipeline:
    1. Copy the upload into a fresh per-document directory under the
       organisation's documents dir.
    2. Extract text, or write a ``.md`` stub for non-extractable files.
    3. Optionally classify the text with the LLM.
    4. Write ``extracted.md``, ``metadata.json`` and ``index.txt``.
    5. Optionally index the directory via qmd.
    6. Remove the original upload.

    Args:
        job: Job description. Required keys are ``task_id``, ``file_path``,
            ``org_slug`` and ``project_slug``. ``doc_type`` is optional and
            defaults to ``"other"``. Any ``display_name`` key is ignored here.
        tasks: Shared task-state dict keyed by task id. ``tasks[task_id]``
            must already exist and is updated in place.
        send_progress: Async callback invoked as
            ``send_progress(task_id, progress, message, status, **extra)``.

    Failures inside the pipeline are not re-raised. The task is marked
    ``"error"`` (with a ``"finished"`` timestamp) and an error progress event
    is sent instead.
    """
    task_id = job["task_id"]
    file_path = Path(job["file_path"])
    org_slug = job["org_slug"]
    project_slug = job["project_slug"]
    doc_type = job.get("doc_type", "other")

    tasks[task_id].update({"status": "processing", "message": "Извлечение текста...", "progress": 10})
    await send_progress(task_id, 10, "Извлечение текста...", "processing")

    try:
        config = load_config()
        ingest_cfg = config.get("ingest", {})
        pdf_ocr = ingest_cfg.get("pdf_ocr", True)

        documents_dir = org_documents_dir(org_slug)
        # Timestamp plus the task-id tail gives a sortable, per-job directory name.
        output_dir = documents_dir / f"doc_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{task_id[-8:]}"
        extracted_md = output_dir / "extracted.md"

        # Blocking filesystem work runs in worker threads to keep the event loop free.
        await asyncio.to_thread(output_dir.mkdir, parents=True, exist_ok=True)
        await asyncio.to_thread(shutil.copy2, file_path, output_dir / file_path.name)
        await asyncio.to_thread(write_folder_project_meta, output_dir, project_slug)

        if is_extractable(file_path.name):
            doc = await asyncio.to_thread(
                extract_document,
                file_path,
                project_slug,
                doc_type,
                None,
                pdf_ocr,
            )
            if not doc.full_text.strip():
                raise ValueError("Не удалось извлечь текст из документа")
            await asyncio.to_thread(extracted_md.write_text, doc.full_text, encoding="utf-8")
        else:
            # Non-extractable file: write a best-effort stub; a failure is logged, not fatal.
            try:
                stub = await asyncio.to_thread(write_stub, file_path, project_slug)
                print(f"[Ingest] {task_id}: создан stub {stub.name}")
            except OSError as stub_exc:  # FileNotFoundError is a subclass of OSError
                print(f"[Ingest] {task_id}: stub_writer failed: {stub_exc}")
            doc = _make_stub_doc(output_dir.name, file_path.name, doc_type, project_slug)

        tasks[task_id].update({"status": "postprocessing", "message": "Анализ документа...", "progress": 40})
        await send_progress(task_id, 40, "Анализ документа...", "postprocessing")

        metadata = doc.to_metadata_dict() if callable(getattr(doc, "to_metadata_dict", None)) else {}
        rag_cfg = config.get("rag", {})
        api_key, base_url = resolve_opencode_credentials(config)
        # LLM classification runs only with credentials, when enabled, and when there is text.
        if api_key and ingest_cfg.get("auto_classify", True) and doc.full_text:
            metadata = await classify_document(
                text=doc.full_text,
                project=project_slug,
                doc_type_hint=doc_type,
                api_key=api_key,
                base_url=base_url,
                model=rag_cfg.get("index_model", "mimo-v2.5-free"),
                chunk_size=int(rag_cfg.get("classify_chunk_size", 7000)),
            )
        metadata["filename"] = doc.filename
        metadata["document_id"] = doc.document_id
        await asyncio.to_thread(
            (output_dir / "metadata.json").write_text,
            json.dumps(metadata, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        # index.txt is only produced when there is text (never for stubs).
        index_path = None
        if doc.full_text:
            index_text = format_index_document(doc, metadata)
            index_path = output_dir / "index.txt"
            await asyncio.to_thread(index_path.write_text, index_text, encoding="utf-8")

        result_data = {
            "document_id": getattr(doc, "document_id", output_dir.name),
            "dir": str(output_dir),
            "rel_dir": str(output_dir.relative_to(documents_dir)),
            "extracted": str(extracted_md) if extracted_md.exists() else None,
            "index": str(index_path) if index_path else None,
            "project": project_slug,
            "doc_type": metadata.get("doc_type", doc_type),
            "kind": "document",
        }

        if rag_cfg.get("enabled", False) and rag_cfg.get("auto_index", True):
            tasks[task_id].update({"message": "Индексация в qmd...", "progress": 75})
            await send_progress(task_id, 75, "Индексация в qmd...", "postprocessing")
            try:
                await qmd_index_document(
                    org_slug=org_slug,
                    project_slug=project_slug,
                    document_dir=output_dir,
                    extracted_md=extracted_md,
                )
            except Exception as idx_exc:
                # Indexing is best-effort: the document files are already on disk.
                print(f"[Ingest] {task_id}: qmd index failed: {idx_exc}")

        # Imported lazily. NOTE(review): presumably this avoids a circular import with backend.queue.
        from backend.queue import _cleanup_upload
        await asyncio.to_thread(_cleanup_upload, file_path)

        tasks[task_id].update({
            "status": "completed",
            "progress": 100,
            "message": "Документ проиндексирован",
            "result": result_data,
            "finished": datetime.now().isoformat(),
        })
        await send_progress(task_id, 100, "Документ проиндексирован", "completed", result=result_data)
    except Exception as exc:
        # Exceptions without a message (e.g. a bare TimeoutError) would otherwise
        # produce an empty error text; fall back to the exception type name.
        error_msg = str(exc) or type(exc).__name__
        print(f"[Ingest] {task_id}: failed: {error_msg}")
        traceback.print_exc()
        tasks[task_id].update({
            "status": "error",
            "message": f"Ошибка: {error_msg}",
            "error": error_msg,
            "finished": datetime.now().isoformat(),
        })
        await send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg)