transcription/backend/queue.py

232 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Фоновая очередь обработки аудио/видео."""
import asyncio
import json
import os
import shutil
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.audio_utils import prepare_audio_input
from src.config import get_profile, load_config, resolve_hf_token
from src.document import build_document
from src.pipeline import run_pipeline
UPLOAD_DIR = Path("uploads")
PROCESSED_DIR = Path("processed")
UPLOAD_DIR.mkdir(exist_ok=True)
PROCESSED_DIR.mkdir(exist_ok=True)
# Глобальное хранилище состояний задач
tasks: Dict[str, Dict[str, Any]] = {}
# Callback для отправки прогресса через WebSocket
_progress_callback: Optional[Callable] = None
def set_progress_callback(callback: Callable):
"""Устанавливает callback для отправки прогресса."""
global _progress_callback
_progress_callback = callback
async def _send_progress(task_id: str, progress: int, message: str, status: str, result=None, error=None):
"""Отправляет прогресс через callback."""
if _progress_callback:
try:
await _progress_callback({
"task_id": task_id,
"progress": progress,
"message": message,
"status": status,
"result": result,
"error": error,
})
except Exception:
pass
async def process_file(file_path: Path, task_id: str):
"""Обрабатывает один файл и отправляет прогресс."""
tasks[task_id] = {
"task_id": task_id,
"status": "processing",
"progress": 0,
"message": "Начало обработки...",
"file": str(file_path.name),
"result": None,
"error": None,
"started": datetime.now().isoformat(),
}
await _send_progress(task_id, 5, "Извлечение аудио...", "processing")
try:
# Загружаем конфиг
config = load_config()
profile = get_profile(config)
await _send_progress(task_id, 15, "Загрузка моделей ИИ...", "processing")
# Подготовка аудио
audio_path = prepare_audio_input(str(file_path))
await _send_progress(task_id, 25, "Транскрибация (распознавание речи)...", "processing")
# Запуск пайплайна
result = run_pipeline(
input_path=str(file_path),
profile_name=None,
config_path=None,
)
await _send_progress(task_id, 75, "Генерация документов...", "processing")
# Определяем имена выходных файлов (уникальная папка с timestamp)
stem = file_path.stem
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
folder_name = f"{stem}_{timestamp}"
output_dir = PROCESSED_DIR / folder_name
output_dir.mkdir(parents=True, exist_ok=True)
# Сохраняем docx и md
docx_path = str(output_dir / f"{stem}.docx")
md_path = str(output_dir / f"{stem}.md")
build_document(result["segments"], docx_path, config)
build_document(result["segments"], md_path, config)
# Также сохраняем исходник (всегда копируем, так как папка уникальная)
src_copy = output_dir / file_path.name
shutil.copy2(str(file_path), str(src_copy))
result_data = {
"docx": str(docx_path),
"md": str(md_path),
"dir": str(output_dir),
}
await _send_progress(task_id, 100, "Обработка завершена", "completed", result=result_data)
tasks[task_id].update({
"status": "completed",
"progress": 100,
"message": "Обработка завершена",
"result": result_data,
"finished": datetime.now().isoformat(),
})
except Exception as e:
error_msg = str(e)
await _send_progress(task_id, 0, f"Ошибка: {error_msg}", "error", error=error_msg)
tasks[task_id].update({
"status": "error",
"progress": 0,
"message": f"Ошибка: {error_msg}",
"error": error_msg,
})
# Очередь задач
_queue: asyncio.Queue = asyncio.Queue()
_workers: List[asyncio.Task] = []
async def _worker_loop():
"""Рабочий цикл обработки."""
print("[Worker] Рабочий процесс запущен и ждёт задачи...")
while True:
try:
task_id, file_path = await _queue.get()
print(f"[Worker] Получена задача: {task_id}")
await process_file(file_path, task_id)
_queue.task_done()
print(f"[Worker] Задача завершена: {task_id}")
except asyncio.CancelledError:
print("[Worker] Остановка рабочего процесса")
break
except Exception as e:
print(f"[Worker Error] {e}")
def start_workers(num_workers: int = 1):
"""Запускает рабочих в текущем event loop."""
global _workers
for i in range(num_workers):
task = asyncio.create_task(_worker_loop())
_workers.append(task)
def stop_workers():
"""Останавливает рабочих."""
for w in _workers:
w.cancel()
async def enqueue(file_path: Path) -> str:
"""Добавляет файл в очередь."""
task_id = f"task_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{file_path.stem}"
tasks[task_id] = {
"task_id": task_id,
"status": "queued",
"progress": 0,
"message": "В очереди...",
"file": str(file_path.name),
"result": None,
"error": None,
"started": datetime.now().isoformat(),
}
await _queue.put((task_id, file_path))
return task_id
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает статус задачи."""
return tasks.get(task_id)
def get_all_tasks() -> List[Dict[str, Any]]:
"""Возвращает все задачи."""
return list(tasks.values())
def get_processed_tree() -> List[Dict[str, Any]]:
"""Возвращает дерево обработанных файлов."""
tree = []
if not PROCESSED_DIR.exists():
return tree
for item in sorted(PROCESSED_DIR.iterdir()):
if item.is_dir():
files = []
for f in sorted(item.iterdir()):
if f.is_file():
files.append({
"name": f.name,
"path": str(f.relative_to(PROCESSED_DIR)),
"size": f.stat().st_size,
"ext": f.suffix.lower(),
})
tree.append({
"name": item.name,
"path": str(item.relative_to(PROCESSED_DIR)),
"files": files,
"created": datetime.fromtimestamp(item.stat().st_ctime).isoformat(),
})
return tree
def read_file_content(rel_path: str) -> str:
"""Читает содержимое файла."""
full_path = PROCESSED_DIR / rel_path
if not full_path.exists() or not full_path.is_file():
raise FileNotFoundError(f"Файл не найден: {rel_path}")
with open(full_path, "r", encoding="utf-8") as f:
return f.read()