"""migrate_lightrag_to_qmd.py — перенос индексов из LightRAG в qmd. Идемпотентный скрипт: переиндексирует существующие .md в qmd-коллекции. Повторный запуск безопасен (qmd content-hash проверка + перезапись stub). Использование:: python scripts/migrate_lightrag_to_qmd.py [--org merakom] [--dry-run] Что делает: 1. Находит все ``processed//meetings//.md`` и ``_summary.md``. 2. Находит все ``processed//documents//extracted.md``. 3. По ``.project.json`` определяет проект и вызывает ``src.rag.qmd.indexer.qmd_index_meeting`` или ``qmd_index_document``. 4. После успешной миграции помечает папку через ``.migrated_to_qmd``. Снапшот:: Перед запуском в проде: ``tar -czf processed-pre-qmd.tar.gz processed`` """ from __future__ import annotations import argparse import asyncio import json import sys from pathlib import Path from typing import Iterable, Optional ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) from src.config import load_config # noqa: E402 from src.rag.qmd.indexer import qmd_index_document, qmd_index_meeting # noqa: E402 MIGRATION_MARKER = ".migrated_to_qmd" def _iter_meetings(org_dir: Path) -> Iterable[Path]: meetings = org_dir / "meetings" if not meetings.exists(): return for folder in sorted(meetings.iterdir()): if not folder.is_dir(): continue if (folder / MIGRATION_MARKER).exists(): continue yield folder def _iter_documents(org_dir: Path) -> Iterable[Path]: documents = org_dir / "documents" if not documents.exists(): return for folder in sorted(documents.iterdir()): if not folder.is_dir(): continue if (folder / MIGRATION_MARKER).exists(): continue yield folder def _read_project(folder: Path, fallback: Path) -> Optional[str]: meta_path = folder / ".project.json" if meta_path.exists(): try: return json.loads(meta_path.read_text(encoding="utf-8")).get("project_slug") except json.JSONDecodeError: pass return None def _write_marker(folder: Path) -> None: (folder / MIGRATION_MARKER).write_text( json.dumps( {"migrated_at": _now_iso(), "engine": "qmd"}, ensure_ascii=False, ), encoding="utf-8", ) def _now_iso() -> str: from datetime import datetime return datetime.now().isoformat() async def migrate_org(org_slug: str, dry_run: bool = False) -> dict: config = load_config() rag_cfg = config.get("rag", {}) if not (rag_cfg.get("enabled", True) and rag_cfg.get("auto_index", True)): print(f"[migrate] RAG disabled in config — nothing to do for org='{org_slug}'") return {"meetings": 0, "documents": 0} org_dir = ROOT / "processed" / org_slug if not org_dir.exists(): print(f"[migrate] No processed/{org_slug} — skipping") return {"meetings": 0, "documents": 0} meetings_count = 0 documents_count = 0 errors: list[str] = [] for folder in _iter_meetings(org_dir): project = _read_project(folder, org_dir / "meetings") if not project: print(f"[migrate] skip meeting folder {folder.name}: no project") continue body_path = next(iter(sorted(folder.glob("*.txt"))), None) if body_path is None: md_files = sorted(folder.glob("*.md")) body_path = md_files[0] if md_files else None if body_path is None: print(f"[migrate] skip meeting folder {folder.name}: no .md/.txt") continue summary_path = next(iter(sorted(folder.glob("*_summary.md"))), None) print(f"[migrate] meeting: org={org_slug} project={project} folder={folder.name}") if dry_run: continue try: await qmd_index_meeting( org_slug=org_slug, project_slug=project, body_path=body_path, summary_path=summary_path, txt_path=body_path, ) _write_marker(folder) meetings_count += 1 except Exception as exc: errors.append(f"meeting {folder.name}: {exc}") for folder in _iter_documents(org_dir): project = _read_project(folder, org_dir / "documents") if not project: print(f"[migrate] skip document folder {folder.name}: no project") continue extracted = folder / "extracted.md" print(f"[migrate] document: org={org_slug} project={project} folder={folder.name}") if dry_run: continue try: await qmd_index_document( org_slug=org_slug, project_slug=project, document_dir=folder, extracted_md=extracted, ) _write_marker(folder) documents_count += 1 except Exception as exc: errors.append(f"document {folder.name}: {exc}") if errors: print(f"[migrate] {len(errors)} errors:") for err in errors: print(f" - {err}") return {"meetings": meetings_count, "documents": documents_count, "errors": len(errors)} def main() -> int: parser = argparse.ArgumentParser(description="Migrate LightRAG caches to qmd collections.") parser.add_argument("--org", help="Org slug (default: bootstrap org from config)") parser.add_argument("--dry-run", action="store_true", help="Show what would be done") args = parser.parse_args() org_slug = args.org if not org_slug: config = load_config() org_slug = config.get("auth", {}).get("bootstrap", {}).get("org_slug", "merakom") print(f"[migrate] target org: {org_slug} dry-run: {args.dry_run}") result = asyncio.run(migrate_org(org_slug, dry_run=args.dry_run)) print(f"[migrate] done: {result}") return 0 if not result.get("errors") else 1 if __name__ == "__main__": raise SystemExit(main())