transcription/backend/auth/service.py
keboss-m 8df14e3102 Add multi-tenant auth with org projects, roles, and personal workspaces.
JWT login, org-scoped storage and RAG, admin/director/user roles, user-owned projects, login UI, and legacy data migration.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 18:54:25 +03:00

181 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Auth business logic."""
import re
from typing import Any, Dict, List, Optional
from backend.auth import database as db
from backend.auth.models import UserContext
from backend.auth.security import create_access_token, verify_password
from src.config import load_config
def normalize_project_slug(slug: str) -> str:
value = slug.strip().lower()
value = re.sub(r"[^\w\-]", "-", value)
value = re.sub(r"-+", "-", value).strip("-_")
if not value:
raise ValueError("Некорректный slug проекта")
return value
def _build_user_context(row: Dict[str, Any], config: Optional[dict] = None) -> UserContext:
user_id = row["id"]
return UserContext(
user_id=user_id,
username=row["username"],
role=row["role"],
org_id=row["org_id"],
org_slug=row["org_slug"],
org_name=row["org_name"],
project_slugs=db.get_user_project_slugs(user_id, config=config),
owned_project_slugs=db.get_owned_project_slugs(user_id, config=config),
)
def _enrich_project(project: Dict[str, Any], ctx: UserContext) -> Dict[str, Any]:
item = dict(project)
item["is_owner"] = project.get("owner_user_id") == ctx.user_id
item["scope"] = project.get("scope") or ("personal" if project.get("owner_user_id") else "org")
return item
def authenticate(org_slug: str, username: str, password: str) -> Optional[Dict[str, Any]]:
config = load_config()
org = db.get_org_by_slug(org_slug.strip().lower(), config)
if not org:
return None
user = db.get_user_by_username(org["id"], username.strip(), config)
if not user or not user.get("is_active"):
return None
if not verify_password(password, user["password_hash"]):
return None
ctx = _build_user_context(user, config)
token = create_access_token(
user_id=ctx.user_id,
org_id=ctx.org_id,
role=ctx.role,
username=ctx.username,
org_slug=ctx.org_slug,
config=config,
)
return {
"access_token": token,
"token_type": "bearer",
"user": user_to_dict(ctx),
}
def get_user_context(user_id: int) -> Optional[UserContext]:
config = load_config()
row = db.get_user_by_id(user_id, config)
if not row or not row.get("is_active"):
return None
return _build_user_context(row, config)
def user_to_dict(ctx: UserContext) -> Dict[str, Any]:
return {
"id": ctx.user_id,
"username": ctx.username,
"role": ctx.role,
"org_id": ctx.org_id,
"org_slug": ctx.org_slug,
"org_name": ctx.org_name,
"shared_projects": ctx.project_slugs,
"owned_projects": ctx.owned_project_slugs,
"projects": list(ctx.accessible_project_slugs) if not ctx.has_all_projects_access else [],
"is_admin": ctx.is_admin,
"is_director": ctx.is_director,
"all_projects_access": ctx.has_all_projects_access,
}
def list_accessible_projects(ctx: UserContext) -> List[Dict[str, Any]]:
config = load_config()
all_projects = db.list_projects(ctx.org_id, config)
if ctx.has_all_projects_access:
return [_enrich_project(p, ctx) for p in all_projects]
result = []
assigned = set(ctx.project_slugs)
owned = set(ctx.owned_project_slugs)
for project in all_projects:
slug = project["slug"]
if project.get("owner_user_id") == ctx.user_id or slug in owned:
result.append(_enrich_project(project, ctx))
elif not project.get("owner_user_id") and slug in assigned:
result.append(_enrich_project(project, ctx))
return sorted(result, key=lambda p: (p["scope"] != "personal", p["slug"]))
def ensure_project_access(ctx: UserContext, project_slug: str) -> None:
slug = project_slug.strip().lower()
if not ctx.can_access_project(slug):
raise PermissionError(f"Нет доступа к проекту: {slug}")
def create_personal_project(ctx: UserContext, slug: str, name: str) -> Dict[str, Any]:
config = load_config()
normalized = normalize_project_slug(slug)
display_name = name.strip() or normalized
if db.get_project_by_slug(ctx.org_id, normalized, config):
raise ValueError("Проект с таким slug уже существует")
project = db.create_project(
ctx.org_id,
normalized,
display_name,
owner_user_id=ctx.user_id,
config=config,
)
return _enrich_project(project, ctx)
def delete_personal_project(ctx: UserContext, slug: str) -> None:
config = load_config()
normalized = normalize_project_slug(slug)
project = db.get_project_by_slug(ctx.org_id, normalized, config)
if not project:
raise ValueError("Проект не найден")
if ctx.is_admin:
db.delete_project(project["id"], config)
return
if project.get("owner_user_id") != ctx.user_id:
raise PermissionError("Можно удалять только свои личные проекты")
db.delete_project(project["id"], config)
def create_org_user(
ctx: UserContext,
username: str,
password: str,
role: str,
project_slugs: List[str],
) -> Dict[str, Any]:
config = load_config()
user = db.create_user(ctx.org_id, username, password, role=role, config=config)
if role == "user" and project_slugs:
projects = db.list_org_projects(ctx.org_id, config)
slug_to_id = {p["slug"]: p["id"] for p in projects}
project_ids = [slug_to_id[s] for s in project_slugs if s in slug_to_id]
db.set_user_projects(user["id"], project_ids, config)
user_row = db.get_user_by_id(user["id"], config)
return user_to_dict(_build_user_context(user_row, config))
def update_user_projects(ctx: UserContext, user_id: int, project_slugs: List[str]) -> Dict[str, Any]:
config = load_config()
target = db.get_user_by_id(user_id, config)
if not target or target["org_id"] != ctx.org_id:
raise ValueError("Пользователь не найден")
if target["role"] in ("admin", "director"):
return user_to_dict(_build_user_context(target, config))
projects = db.list_org_projects(ctx.org_id, config)
slug_to_id = {p["slug"]: p["id"] for p in projects}
project_ids = [slug_to_id[s] for s in project_slugs if s in slug_to_id]
db.set_user_projects(user_id, project_ids, config)
updated = db.get_user_by_id(user_id, config)
return user_to_dict(_build_user_context(updated, config))