transcription/backend/queue.py

235 lines
8.1 KiB
Python
Raw Normal View History

"""Фоновая очередь обработки аудио/видео."""
import asyncio
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.audio_utils import prepare_audio_input
from src.config import get_profile, load_config, resolve_hf_token
from src.document import build_document
from src.pipeline import run_pipeline
UPLOAD_DIR = Path("uploads")
PROCESSED_DIR = Path("processed")
UPLOAD_DIR.mkdir(exist_ok=True)
PROCESSED_DIR.mkdir(exist_ok=True)
# Глобальное хранилище состояний задач
tasks: Dict[str, Dict[str, Any]] = {}
# Callback для отправки прогресса через WebSocket
_progress_callback: Optional[Callable] = None
def set_progress_callback(callback: Callable):
"""Устанавливает callback для отправки прогресса."""
global _progress_callback
_progress_callback = callback
async def _send_progress(task_id: str, progress: int, message: str, status: str, result=None, error=None):
"""Отправляет прогресс через callback."""
if _progress_callback:
try:
task_info = tasks.get(task_id, {})
await _progress_callback({
"task_id": task_id,
"progress": progress,
"message": message,
"status": status,
"file": task_info.get("file", ""),
"result": result,
"error": error,
})
except Exception:
pass
async def process_file(file_path: Path, task_id: str):
"""Обрабатывает один файл и отправляет прогресс."""
tasks[task_id] = {
"task_id": task_id,
"status": "processing",
"progress": 0,
"message": "Начало обработки...",
"file": str(file_path.name),
"result": None,
"error": None,
"started": datetime.now().isoformat(),
}
await _send_progress(task_id, 5, "Извлечение аудио...", "processing")
try:
# Загружаем конфиг
config = load_config()
profile = get_profile(config)
await _send_progress(task_id, 15, "Загрузка моделей ИИ...", "processing")
# Подготовка аудио (в отдельном потоке, чтобы не блокировать event loop)
audio_path = await asyncio.to_thread(prepare_audio_input, str(file_path))
await _send_progress(task_id, 25, "Транскрибация (распознавание речи)...", "processing")
# Запуск пайплайна (в отдельном потоке)
result = await asyncio.to_thread(
run_pipeline,
input_path=str(file_path),
profile_name=None,
config_path=None,
)
await _send_progress(task_id, 75, "Генерация документов...", "processing")
# Определяем имена выходных файлов (уникальная папка с timestamp)
stem = file_path.stem
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
folder_name = f"{stem}_{timestamp}"
output_dir = PROCESSED_DIR / folder_name
await asyncio.to_thread(output_dir.mkdir, parents=True, exist_ok=True)
# Сохраняем docx и md (в отдельном потоке)
docx_path = str(output_dir / f"{stem}.docx")
md_path = str(output_dir / f"{stem}.md")
await asyncio.to_thread(build_document, result["segments"], docx_path, config)
await asyncio.to_thread(build_document, result["segments"], md_path, config)
# Также сохраняем исходник
src_copy = output_dir / file_path.name
await asyncio.to_thread(shutil.copy2, str(file_path), str(src_copy))
result_data = {
"docx": str(docx_path),
"md": str(md_path),
"dir": str(output_dir),
}
await _send_progress(task_id, 100, "Обработка завершена", "completed", result=result_data)
tasks[task_id].update({
"status": "completed",
"progress": 100,
"message": "Обработка завершена",
"result": result_data,
"finished": datetime.now().isoformat(),
})
except Exception as e:
error_msg = str(e)
await _send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg)
tasks[task_id].update({
"status": "error",
"progress": 0,
"message": f"Ошибка: {error_msg}",
"error": error_msg,
})
# Очередь задач
_queue: asyncio.Queue = asyncio.Queue()
_workers: List[asyncio.Task] = []
async def _worker_loop():
"""Рабочий цикл обработки."""
print("[Worker] Рабочий процесс запущен и ждёт задачи...")
while True:
try:
task_id, file_path = await _queue.get()
print(f"[Worker] Получена задача: {task_id}")
await process_file(file_path, task_id)
_queue.task_done()
print(f"[Worker] Задача завершена: {task_id}")
except asyncio.CancelledError:
print("[Worker] Остановка рабочего процесса")
break
except Exception as e:
print(f"[Worker Error] {e}")
def start_workers(num_workers: int = 1):
"""Запускает рабочих в текущем event loop."""
global _workers
for i in range(num_workers):
task = asyncio.create_task(_worker_loop())
_workers.append(task)
def stop_workers():
"""Останавливает рабочих."""
for w in _workers:
w.cancel()
async def enqueue(file_path: Path) -> str:
"""Добавляет файл в очередь."""
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file_path.stem}"
tasks[task_id] = {
"task_id": task_id,
"status": "queued",
"progress": 0,
"message": "В очереди...",
"file": str(file_path.name),
"result": None,
"error": None,
"started": datetime.now().isoformat(),
}
await _queue.put((task_id, file_path))
return task_id
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает статус задачи."""
return tasks.get(task_id)
def get_all_tasks() -> List[Dict[str, Any]]:
"""Возвращает все задачи."""
return list(tasks.values())
def get_processed_tree() -> List[Dict[str, Any]]:
"""Возвращает дерево обработанных файлов."""
tree = []
if not PROCESSED_DIR.exists():
return tree
for item in sorted(PROCESSED_DIR.iterdir()):
if item.is_dir():
files = []
for f in sorted(item.iterdir()):
if f.is_file():
files.append({
"name": f.name,
"path": str(f.relative_to(PROCESSED_DIR)),
"size": f.stat().st_size,
"ext": f.suffix.lower(),
})
tree.append({
"name": item.name,
"path": str(item.relative_to(PROCESSED_DIR)),
"files": files,
"created": datetime.fromtimestamp(item.stat().st_ctime).isoformat(),
})
return tree
def read_file_content(rel_path: str) -> str:
"""Читает содержимое файла."""
full_path = PROCESSED_DIR / rel_path
if not full_path.exists() or not full_path.is_file():
raise FileNotFoundError(f"Файл не найден: {rel_path}")
with open(full_path, "r", encoding="utf-8") as f:
return f.read()