transcription/backend/main.py
keboss-m fee9b9acb1 Add RAG, summary pipeline, and split transcribe/postprocess queue.
Separate ASR (2 workers) from summary/RAG post-processing, add LightRAG chat API, batch upload fixes, and local model mounts for Docker deployment.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 17:40:58 +03:00

433 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FastAPI backend для сервиса транскрибации."""
import json
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, File, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, PlainTextResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from backend.queue import (
UPLOAD_DIR,
PROCESSED_DIR,
save_upload,
get_all_tasks,
get_queue_info,
get_task_status,
get_processed_tree,
read_file_content,
set_progress_callback,
start_workers,
stop_workers,
)
# Добавляем корень проекта в путь, чтобы импортировать src.rag
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config import load_config, resolve_opencode_credentials
from src.rag.indexer import get_project_names
from src.rag.parser import parse_project_from_filename
from src.rag.query import rag_chat, retrieve_context
# WebSocket connection manager
class ConnectionManager:
    """Track accepted WebSocket clients and fan JSON messages out to them."""

    def __init__(self):
        # Sockets that were accepted via connect() and not yet removed.
        self.active_connections: List["WebSocket"] = []

    async def connect(self, websocket: "WebSocket"):
        """Accept the handshake on ``websocket`` and register it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: "WebSocket"):
        """Unregister ``websocket``; does nothing if it is not registered."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every registered client.

        Delivery is best-effort: a client whose send fails is unregistered
        and the remaining clients are still served. No exception from an
        individual send propagates to the caller.
        """
        # Iterate over a snapshot: every ``await`` hands control back to the
        # event loop, so connect()/disconnect() may mutate the live list while
        # the broadcast is in flight, which would make the loop skip clients.
        for conn in list(self.active_connections):
            try:
                await conn.send_json(message)
            except Exception:
                # A failed send means the peer is unusable; drop it so later
                # broadcasts do not keep retrying a dead socket.
                self.disconnect(conn)
# Module-wide connection manager shared by the WebSocket endpoint.
manager = ConnectionManager()
# Register the manager's broadcast coroutine as the queue's progress callback
# so progress updates are sent out over WebSocket.
set_progress_callback(manager.broadcast)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the application lifecycle: start queue workers, stop them on exit.

    Worker counts come from the ``queue`` section of the loaded config;
    ``transcribe_workers`` defaults to 2 and ``postprocess_workers`` to 1.
    """
    config = load_config()
    queue_cfg = config.get("queue", {})
    transcribe_workers = int(queue_cfg.get("transcribe_workers", 2))
    postprocess_workers = int(queue_cfg.get("postprocess_workers", 1))
    print("🚀 Запуск рабочих процессов...")
    start_workers(
        transcribe_workers=transcribe_workers,
        postprocess_workers=postprocess_workers,
    )
    try:
        yield
    finally:
        # ``finally`` guarantees the workers are stopped even when an
        # exception or cancellation is thrown into the generator at ``yield``.
        print("🛑 Остановка рабочих процессов...")
        stop_workers()
# ASGI application object; startup/shutdown work is delegated to ``lifespan``.
app = FastAPI(
    title="Transcription Service",
    version="1.0.0",
    lifespan=lifespan,
)
# CORS: every origin, method and header is allowed, with credentials enabled.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended outside of local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# === API Endpoints ===
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve the front-end entry page, or a stub when it is not built."""
    page = Path(__file__).parent.joinpath("static", "index.html")
    if not page.exists():
        # No index.html next to this module: answer with a placeholder page.
        return "<h1>Transcription Service</h1><p>Frontend not built</p>"
    return page.read_text(encoding="utf-8")
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Store one uploaded file and enqueue it for processing.

    Returns the new task id, the client-supplied filename, a ``queued``
    status, a human-readable message and the current queue info.
    """
    payload = await file.read()
    # Uploads without a filename are saved under a generic name.
    stored_name = file.filename or "upload.bin"
    task_id, _ = await save_upload(payload, stored_name)
    response = {
        "task_id": task_id,
        "file": file.filename,
        "status": "queued",
        "message": "Файл добавлен в очередь обработки",
    }
    response["queue"] = get_queue_info()
    return response
@app.post("/upload-batch")
async def upload_batch(files: List[UploadFile] = File(...)):
    """Store several uploaded files in one request and enqueue each of them.

    Returns the number of queued files, one entry per created task, the
    current queue info and a summary message. An empty ``files`` list yields
    an error payload with zero uploads.
    """
    if not files:
        return {"error": "Не переданы файлы", "uploaded": 0, "tasks": []}

    queued = []
    for upload in files:
        body = await upload.read()
        # Uploads without a filename are saved under a generic name.
        task_id, _ = await save_upload(body, upload.filename or "upload.bin")
        entry = {"task_id": task_id, "file": upload.filename, "status": "queued"}
        queued.append(entry)

    total = len(queued)
    return {
        "uploaded": total,
        "tasks": queued,
        "queue": get_queue_info(),
        "message": f"{total} файл(ов) добавлено в очередь",
    }
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint for progress updates and client-driven requests.

    The socket is registered with ``manager`` for the lifetime of the
    session. Incoming text frames are parsed as JSON objects and dispatched
    on their ``action`` field:

    * ``get_tasks`` -> replies with a ``tasks_list`` message;
    * ``get_tree`` -> replies with a ``file_tree`` message;
    * ``rag_query`` / ``rag_query_global`` -> handled by ``_handle_rag_query``.

    Frames that are not valid JSON objects (for example plain-text keepalive
    pings) and unknown actions are ignored instead of ending the session.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Wait for a client message (ping/keepalive or a request).
            data = await websocket.receive_text()
            try:
                msg = json.loads(data)
            except json.JSONDecodeError:
                # Non-JSON keepalive frame: nothing to dispatch.
                continue
            if not isinstance(msg, dict):
                # Valid JSON but not an object, so it carries no "action".
                continue

            action = msg.get("action")
            if action == "get_tasks":
                tasks = get_all_tasks()
                await websocket.send_json({
                    "type": "tasks_list",
                    "tasks": tasks,
                    "queue": get_queue_info(),
                })
            elif action == "get_tree":
                tree = get_processed_tree()
                await websocket.send_json({
                    "type": "file_tree",
                    "tree": tree,
                })
            elif action in ("rag_query", "rag_query_global"):
                # Both actions share one handler.
                await _handle_rag_query(websocket, msg)
    except WebSocketDisconnect:
        # Normal client departure.
        pass
    except Exception:
        # Best-effort, as before: any other failure ends this session quietly.
        pass
    finally:
        # Always unregister, including on task cancellation.
        manager.disconnect(websocket)
@app.get("/api/tasks")
async def api_tasks():
    """Return every known task together with the current queue info."""
    tasks = get_all_tasks()
    queue = get_queue_info()
    return {"tasks": tasks, "queue": queue}
@app.get("/api/tasks/{task_id}")
async def api_task(task_id: str):
    """Return the status of one task, or an error payload when it is unknown."""
    found = get_task_status(task_id)
    # A falsy status is reported as "not found".
    return found if found else {"error": "Task not found"}
@app.get("/api/files")
async def api_files():
    """Return the tree of processed files."""
    tree = get_processed_tree()
    return {"tree": tree}
@app.get("/api/files/content")
async def api_file_content(path: str):
    """Return the content of the file at ``path``.

    Any exception raised while reading is reported as ``{"error": <text>}``.
    """
    try:
        text = read_file_content(path)
    except Exception as exc:
        return {"error": str(exc)}
    return {"content": text, "path": path}
@app.get("/api/files/download")
async def api_download(path: str):
    """Send a processed file to the client as a download.

    ``path`` is a client-supplied path relative to ``PROCESSED_DIR``. It is
    untrusted, so the resolved target must lie inside ``PROCESSED_DIR``;
    ``..`` segments or absolute paths that escape it are answered exactly
    like a missing file. Directories are also reported as not found.
    """
    not_found = {"error": "File not found"}
    file_path = PROCESSED_DIR / path
    try:
        base = PROCESSED_DIR.resolve()
        # relative_to() raises ValueError when the target is outside ``base``.
        file_path.resolve().relative_to(base)
    except (OSError, RuntimeError, ValueError):
        # Escapes the processed directory or cannot be resolved at all.
        return not_found
    if not file_path.is_file():
        return not_found
    return FileResponse(file_path, filename=file_path.name)
import shutil
@app.delete("/api/folders/{folder_name}")
async def api_delete_folder(folder_name: str):
    """Delete a folder of processed files.

    ``folder_name`` is client-supplied, so the resolved target must be a
    directory strictly inside ``PROCESSED_DIR``. Names such as ``..`` or
    ``.`` would otherwise remove the parent or the whole processed
    directory; they are answered like a missing folder.

    Returns ``{"deleted": folder_name}`` on success, otherwise
    ``{"error": <text>}``.
    """
    not_found = {"error": "Folder not found"}
    folder_path = PROCESSED_DIR / folder_name
    try:
        base = PROCESSED_DIR.resolve()
        # ``parents`` never contains the path itself, so this also rejects
        # a target that resolves to PROCESSED_DIR.
        inside = base in folder_path.resolve().parents
    except (OSError, RuntimeError, ValueError):
        return not_found
    if not inside or not folder_path.is_dir():
        return not_found
    try:
        shutil.rmtree(folder_path)
        return {"deleted": folder_name}
    except Exception as e:
        return {"error": str(e)}
# === RAG / Chat API ===
@app.get("/api/rag/projects")
async def api_rag_projects():
    """Return the names of projects found under the RAG index directory.

    On any failure the response carries the error text and an empty list.
    """
    try:
        rag_settings = load_config().get("rag", {})
        index_root = Path(
            rag_settings.get("project_index_dir", "./processed/lightrag_caches")
        )
        names = await get_project_names(index_root)
    except Exception as exc:
        return {"error": str(exc), "projects": []}
    return {"projects": names}
@app.post("/api/rag/query")
async def api_rag_query(payload: dict):
    """Answer a chat question, optionally scoped to one project.

    Reads ``question``, ``history``, ``project`` and ``mode`` (default
    ``hybrid``) from ``payload`` and returns the answer, its context and the
    project reported by ``rag_chat``. Any failure is returned as
    ``{"error": <text>}``.
    """
    try:
        config = load_config()
        rag_settings = config.get("rag", {})
        index_root = Path(
            rag_settings.get("project_index_dir", "./processed/lightrag_caches")
        )
        api_key, base_url = resolve_opencode_credentials(config)
        # Collect the call arguments first, then make a single call.
        chat_args = {
            "question": payload.get("question", ""),
            "working_dir_base": index_root,
            "history": payload.get("history", []),
            "api_key": api_key,
            "project_name": payload.get("project"),
            "base_url": base_url,
            "chat_model": rag_settings.get("chat_model", "deepseek-v4-flash-free"),
            "mode": payload.get("mode", "hybrid"),
            "index_model": rag_settings.get("index_model", "mimo-v2.5-free"),
        }
        reply = await rag_chat(**chat_args)
        return {key: reply[key] for key in ("answer", "context", "project")}
    except Exception as exc:
        return {"error": str(exc)}
@app.post("/api/rag/query-global")
async def api_rag_query_global(payload: dict):
    """Answer a chat question with no project selected.

    Reads ``question``, ``history`` and ``mode`` (default ``hybrid``) from
    ``payload``; ``rag_chat`` is always called with ``project_name=None`` and
    the returned ``project`` is ``None``. Any failure is returned as
    ``{"error": <text>}``.
    """
    try:
        config = load_config()
        rag_settings = config.get("rag", {})
        index_root = Path(
            rag_settings.get("project_index_dir", "./processed/lightrag_caches")
        )
        api_key, base_url = resolve_opencode_credentials(config)
        # Collect the call arguments first, then make a single call.
        chat_args = {
            "question": payload.get("question", ""),
            "working_dir_base": index_root,
            "history": payload.get("history", []),
            "api_key": api_key,
            "project_name": None,
            "base_url": base_url,
            "chat_model": rag_settings.get("chat_model", "deepseek-v4-flash-free"),
            "mode": payload.get("mode", "hybrid"),
            "index_model": rag_settings.get("index_model", "mimo-v2.5-free"),
        }
        reply = await rag_chat(**chat_args)
        return {
            "answer": reply["answer"],
            "context": reply["context"],
            "project": None,
        }
    except Exception as exc:
        return {"error": str(exc)}
@app.get("/api/rag/tasks")
async def api_rag_tasks(project: Optional[str] = None):
    """Ask the RAG chat for action items, optionally for a single project.

    A fixed prompt is sent through ``rag_chat`` in ``hybrid`` mode with an
    empty history; when ``project`` is given the prompt names that project.
    The answer text is returned under ``tasks``. Any failure is returned as
    ``{"error": <text>}``.
    """
    try:
        config = load_config()
        rag_settings = config.get("rag", {})
        index_root = Path(
            rag_settings.get("project_index_dir", "./processed/lightrag_caches")
        )
        api_key, base_url = resolve_opencode_credentials(config)
        chat_model = rag_settings.get("chat_model", "deepseek-v4-flash-free")
        index_model = rag_settings.get("index_model", "mimo-v2.5-free")
        # Project-specific wording only when a project was requested.
        prompt = (
            f"Перечисли все action items, задачи и ответственных по проекту {project}."
            if project
            else "Перечисли все action items, задачи и ответственных из протоколов."
        )
        reply = await rag_chat(
            question=prompt,
            working_dir_base=index_root,
            history=[],
            api_key=api_key,
            project_name=project,
            base_url=base_url,
            chat_model=chat_model,
            mode="hybrid",
            index_model=index_model,
        )
        return {"tasks": reply["answer"], "project": project}
    except Exception as exc:
        return {"error": str(exc)}
@app.post("/api/rag/index/{folder_name}")
async def api_rag_index_folder(folder_name: str):
    """Force re-indexing of one processed meeting folder.

    ``folder_name`` is client-supplied, so the resolved target must be a
    directory strictly inside ``PROCESSED_DIR``; anything else is reported
    as a missing folder. The protocol is the first ``*.txt`` file of the
    folder in sorted name order, which keeps the choice deterministic when
    several are present.

    Returns ``{"indexed": folder_name, "project": <name>}`` on success,
    otherwise ``{"error": <text>}``.
    """
    try:
        folder_path = PROCESSED_DIR / folder_name
        base = PROCESSED_DIR.resolve()
        # ``parents`` never contains the path itself, so ``.`` and ``..``
        # are rejected along with anything else outside the directory.
        if base not in folder_path.resolve().parents or not folder_path.is_dir():
            return {"error": "Folder not found"}
        # Look for the .txt protocol file.
        txt_files = sorted(folder_path.glob("*.txt"))
        if not txt_files:
            return {"error": "No .txt protocol found in folder"}
        txt_path = txt_files[0]
        doc_text = txt_path.read_text(encoding="utf-8")
        # The project is derived from the folder name.
        project = parse_project_from_filename(folder_name)
        config = load_config()
        rag_cfg = config.get("rag", {})
        index_dir = Path(rag_cfg.get("project_index_dir", "./processed/lightrag_caches"))
        index_model = rag_cfg.get("index_model", "mimo-v2.5-free")
        api_key, base_url = resolve_opencode_credentials(config)
        # Imported inside the handler, as in the original code.
        from src.rag.indexer import index_meeting
        from src.rag.formatter import format_global_document
        # Re-indexing uses a simple placeholder for the metadata.
        metadata = {"project": project, "section": "Общие вопросы", "topic": "Переиндексация"}
        global_doc_text = format_global_document(doc_text, metadata)
        await index_meeting(
            doc_text=doc_text,
            global_doc_text=global_doc_text,
            project_name=project,
            working_dir_base=index_dir,
            model=index_model,
            api_key=api_key,
            base_url=base_url,
        )
        return {"indexed": folder_name, "project": project}
    except Exception as e:
        return {"error": str(e)}
# === WebSocket Chat Actions ===
async def _handle_rag_query(websocket: WebSocket, msg: dict):
    """Serve a RAG chat request that arrived over the WebSocket.

    Reads ``question``, ``history``, ``project`` and ``mode`` (default
    ``hybrid``) from ``msg`` and replies with a ``rag_response`` message.
    Any exception is reported to the client as a ``rag_error`` message.
    """
    try:
        config = load_config()
        rag_settings = config.get("rag", {})
        index_root = Path(
            rag_settings.get("project_index_dir", "./processed/lightrag_caches")
        )
        api_key, base_url = resolve_opencode_credentials(config)
        # Collect the call arguments first, then make a single call.
        chat_args = {
            "question": msg.get("question", ""),
            "working_dir_base": index_root,
            "history": msg.get("history", []),
            "api_key": api_key,
            "project_name": msg.get("project"),
            "base_url": base_url,
            "chat_model": rag_settings.get("chat_model", "deepseek-v4-flash-free"),
            "mode": msg.get("mode", "hybrid"),
            "index_model": rag_settings.get("index_model", "mimo-v2.5-free"),
        }
        reply = await rag_chat(**chat_args)
        response = {
            "type": "rag_response",
            "answer": reply["answer"],
            "context": reply["context"],
            "project": reply["project"],
        }
        await websocket.send_json(response)
    except Exception as exc:
        failure = {"type": "rag_error", "error": str(exc)}
        await websocket.send_json(failure)
# Static files. The directory is resolved relative to this module, the same
# way the "/" handler locates index.html, so the mount does not depend on the
# process working directory.
app.mount(
    "/static",
    StaticFiles(directory=str(Path(__file__).parent / "static")),
    name="static",
)