"""Фоновая очередь обработки аудио/видео.""" import asyncio import json import os import shutil import sys from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional sys.path.insert(0, str(Path(__file__).parent.parent)) from src.audio_utils import prepare_audio_input from src.config import get_profile, load_config, resolve_hf_token from src.document import build_document from src.pipeline import run_pipeline UPLOAD_DIR = Path("uploads") PROCESSED_DIR = Path("processed") UPLOAD_DIR.mkdir(exist_ok=True) PROCESSED_DIR.mkdir(exist_ok=True) # Глобальное хранилище состояний задач tasks: Dict[str, Dict[str, Any]] = {} # Callback для отправки прогресса через WebSocket _progress_callback: Optional[Callable] = None def set_progress_callback(callback: Callable): """Устанавливает callback для отправки прогресса.""" global _progress_callback _progress_callback = callback async def _send_progress(task_id: str, progress: int, message: str, status: str, result=None, error=None): """Отправляет прогресс через callback.""" if _progress_callback: try: await _progress_callback({ "task_id": task_id, "progress": progress, "message": message, "status": status, "result": result, "error": error, }) except Exception: pass async def process_file(file_path: Path, task_id: str): """Обрабатывает один файл и отправляет прогресс.""" tasks[task_id] = { "task_id": task_id, "status": "processing", "progress": 0, "message": "Начало обработки...", "file": str(file_path.name), "result": None, "error": None, "started": datetime.now().isoformat(), } await _send_progress(task_id, 5, "Извлечение аудио...", "processing") try: # Загружаем конфиг config = load_config() profile = get_profile(config) await _send_progress(task_id, 15, "Загрузка моделей ИИ...", "processing") # Подготовка аудио audio_path = prepare_audio_input(str(file_path)) await _send_progress(task_id, 25, "Транскрибация (распознавание речи)...", "processing") # Запуск пайплайна result = run_pipeline( input_path=str(file_path), profile_name=None, config_path=None, ) await _send_progress(task_id, 75, "Генерация документов...", "processing") # Определяем имена выходных файлов stem = file_path.stem output_dir = PROCESSED_DIR / stem output_dir.mkdir(parents=True, exist_ok=True) # Сохраняем docx и md docx_path = str(output_dir / f"{stem}.docx") md_path = str(output_dir / f"{stem}.md") build_document(result["segments"], docx_path, config) build_document(result["segments"], md_path, config) # Также сохраняем исходник src_copy = output_dir / file_path.name if not src_copy.exists(): shutil.copy2(str(file_path), str(src_copy)) result_data = { "docx": str(docx_path), "md": str(md_path), "dir": str(output_dir), } await _send_progress(task_id, 100, "Обработка завершена", "completed", result=result_data) tasks[task_id].update({ "status": "completed", "progress": 100, "message": "Обработка завершена", "result": result_data, "finished": datetime.now().isoformat(), }) except Exception as e: error_msg = str(e) await _send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg) tasks[task_id].update({ "status": "error", "progress": 0, "message": f"Ошибка: {error_msg}", "error": error_msg, }) # Очередь задач _queue: asyncio.Queue = asyncio.Queue() _workers: List[asyncio.Task] = [] async def _worker_loop(): """Рабочий цикл обработки.""" while True: try: task_id, file_path = await _queue.get() await process_file(file_path, task_id) _queue.task_done() except asyncio.CancelledError: break except Exception as e: print(f"[Worker Error] {e}") def start_workers(num_workers: int = 1): """Запускает рабочих.""" global _workers loop = asyncio.get_event_loop() for i in range(num_workers): task = loop.create_task(_worker_loop()) _workers.append(task) def stop_workers(): """Останавливает рабочих.""" for w in _workers: w.cancel() async def enqueue(file_path: Path) -> str: """Добавляет файл в очередь.""" task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file_path.stem}" tasks[task_id] = { "task_id": task_id, "status": "queued", "progress": 0, "message": "В очереди...", "file": str(file_path.name), "result": None, "error": None, "started": datetime.now().isoformat(), } await _queue.put((task_id, file_path)) return task_id def get_task_status(task_id: str) -> Optional[Dict[str, Any]]: """Возвращает статус задачи.""" return tasks.get(task_id) def get_all_tasks() -> List[Dict[str, Any]]: """Возвращает все задачи.""" return list(tasks.values()) def get_processed_tree() -> List[Dict[str, Any]]: """Возвращает дерево обработанных файлов.""" tree = [] if not PROCESSED_DIR.exists(): return tree for item in sorted(PROCESSED_DIR.iterdir()): if item.is_dir(): files = [] for f in sorted(item.iterdir()): if f.is_file(): files.append({ "name": f.name, "path": str(f.relative_to(PROCESSED_DIR)), "size": f.stat().st_size, "ext": f.suffix.lower(), }) tree.append({ "name": item.name, "path": str(item.relative_to(PROCESSED_DIR)), "files": files, "created": datetime.fromtimestamp(item.stat().st_ctime).isoformat(), }) return tree def read_file_content(rel_path: str) -> str: """Читает содержимое файла.""" full_path = PROCESSED_DIR / rel_path if not full_path.exists() or not full_path.is_file(): raise FileNotFoundError(f"Файл не найден: {rel_path}") with open(full_path, "r", encoding="utf-8") as f: return f.read()