# transcription/backend/auth/service.py

"""Auth business logic."""
import re
from typing import Any, Dict, List, Optional
from backend.auth import database as db
from backend.auth.models import UserContext
from backend.auth.security import create_access_token, verify_password
from src.config import load_config
def normalize_project_slug(slug: str) -> str:
    """Convert a raw project slug into its canonical form.

    The input is trimmed and lower-cased, every character that is neither a
    regex word character nor a hyphen is replaced with a hyphen, runs of
    hyphens are collapsed into one, and leading/trailing hyphens and
    underscores are removed.

    Args:
        slug: Raw slug text.

    Returns:
        The normalized slug.

    Raises:
        ValueError: If nothing is left after normalization.
    """
    lowered = slug.strip().lower()
    dashed = re.sub(r"[^\w\-]", "-", lowered)
    collapsed = re.sub(r"-+", "-", dashed)
    normalized = collapsed.strip("-_")
    if normalized:
        return normalized
    raise ValueError("Некорректный slug проекта")
def _build_user_context(row: Dict[str, Any], config: Optional[dict] = None) -> UserContext:
    """Assemble a ``UserContext`` from a user row.

    Args:
        row: Mapping that must contain ``id``, ``username``, ``role``,
            ``org_id``, ``org_slug`` and ``org_name``.
        config: Configuration forwarded to the database helpers.

    Returns:
        A ``UserContext`` populated from ``row`` plus the project slug lists
        returned by ``db.get_user_project_slugs`` and
        ``db.get_owned_project_slugs`` for the user's id.
    """
    uid = row["id"]
    # Read the plain columns before touching the database so that a missing
    # key fails before any lookup is made.
    copied_columns = ("username", "role", "org_id", "org_slug", "org_name")
    fields = {column: row[column] for column in copied_columns}
    return UserContext(
        user_id=uid,
        **fields,
        project_slugs=db.get_user_project_slugs(uid, config=config),
        owned_project_slugs=db.get_owned_project_slugs(uid, config=config),
    )
def _enrich_project(project: Dict[str, Any], ctx: UserContext) -> Dict[str, Any]:
    """Return a copy of ``project`` with ``is_owner`` and ``scope`` filled in.

    ``is_owner`` is true when the project's ``owner_user_id`` equals
    ``ctx.user_id``. ``scope`` keeps the project's own truthy ``scope`` value;
    otherwise it is ``"personal"`` when an owner is set and ``"org"`` when not.
    The input mapping is not modified.
    """
    owner_id = project.get("owner_user_id")
    owned_by_caller = owner_id == ctx.user_id
    scope = project.get("scope")
    if not scope:
        scope = "personal" if owner_id else "org"
    return {**project, "is_owner": owned_by_caller, "scope": scope}
def authenticate(org_slug: str, username: str, password: str) -> Optional[Dict[str, Any]]:
    """Check credentials and issue an access token.

    The organization is looked up by the trimmed, lower-cased ``org_slug`` and
    the user by the trimmed ``username`` within that organization.

    Returns:
        ``None`` when the organization is unknown, the user is missing or not
        active, or ``verify_password`` rejects the password. Otherwise a dict
        with ``access_token``, ``token_type`` (``"bearer"``) and ``user`` (the
        ``user_to_dict`` form of the user's context).
    """
    config = load_config()

    organization = db.get_org_by_slug(org_slug.strip().lower(), config)
    if not organization:
        return None

    account = db.get_user_by_username(organization["id"], username.strip(), config)
    if not (account and account.get("is_active")):
        return None

    password_ok = verify_password(password, account["password_hash"])
    if not password_ok:
        return None

    ctx = _build_user_context(account, config)
    token = create_access_token(
        user_id=ctx.user_id,
        org_id=ctx.org_id,
        role=ctx.role,
        username=ctx.username,
        org_slug=ctx.org_slug,
        config=config,
    )
    session: Dict[str, Any] = {"access_token": token, "token_type": "bearer"}
    session["user"] = user_to_dict(ctx)
    return session
def get_user_context(user_id: int) -> Optional[UserContext]:
    """Load the context for an active user.

    Returns:
        The ``UserContext`` built from the user's row, or ``None`` when no
        row is found or the row's ``is_active`` value is falsy.
    """
    config = load_config()
    record = db.get_user_by_id(user_id, config)
    if record and record.get("is_active"):
        return _build_user_context(record, config)
    return None
def user_to_dict(ctx: UserContext) -> Dict[str, Any]:
    """Serialize a ``UserContext`` into a plain dict.

    ``projects`` is a list copy of ``ctx.accessible_project_slugs``, except
    when ``ctx.has_all_projects_access`` is truthy, in which case it is an
    empty list and ``all_projects_access`` carries the flag instead.
    """
    if ctx.has_all_projects_access:
        visible_slugs = []
    else:
        visible_slugs = list(ctx.accessible_project_slugs)

    payload: Dict[str, Any] = {}
    payload["id"] = ctx.user_id
    payload["username"] = ctx.username
    payload["role"] = ctx.role
    payload["org_id"] = ctx.org_id
    payload["org_slug"] = ctx.org_slug
    payload["org_name"] = ctx.org_name
    payload["shared_projects"] = ctx.project_slugs
    payload["owned_projects"] = ctx.owned_project_slugs
    payload["projects"] = visible_slugs
    payload["is_admin"] = ctx.is_admin
    payload["is_director"] = ctx.is_director
    payload["all_projects_access"] = ctx.has_all_projects_access
    return payload
def list_accessible_projects(ctx: UserContext) -> List[Dict[str, Any]]:
    """List the organization's projects that ``ctx`` may see.

    When ``ctx.has_all_projects_access`` is truthy, every project returned by
    ``db.list_projects`` is included, in the order it was returned. Otherwise
    a project is included when the user owns it (its ``owner_user_id`` matches
    or its slug is in ``ctx.owned_project_slugs``), or when it has no owner
    and its slug is in ``ctx.project_slugs``. That filtered list is sorted
    with ``"personal"`` scope first, then by slug.

    Each returned item is the ``_enrich_project`` form of the project.
    """
    config = load_config()
    org_projects = db.list_projects(ctx.org_id, config)

    if ctx.has_all_projects_access:
        return [_enrich_project(entry, ctx) for entry in org_projects]

    shared_slugs = set(ctx.project_slugs)
    owned_slugs = set(ctx.owned_project_slugs)

    def _is_visible(entry: Dict[str, Any]) -> bool:
        slug = entry["slug"]
        owner_id = entry.get("owner_user_id")
        if owner_id == ctx.user_id or slug in owned_slugs:
            return True
        # Shared assignments only apply to projects that have no owner.
        return not owner_id and slug in shared_slugs

    visible = [_enrich_project(entry, ctx) for entry in org_projects if _is_visible(entry)]
    visible.sort(key=lambda p: (p["scope"] != "personal", p["slug"]))
    return visible
def ensure_project_access(ctx: UserContext, project_slug: str) -> None:
    """Raise unless ``ctx`` may access the given project.

    The slug is trimmed and lower-cased before being passed to
    ``ctx.can_access_project``.

    Raises:
        PermissionError: If ``ctx.can_access_project`` returns a falsy value.
    """
    slug = project_slug.strip().lower()
    if ctx.can_access_project(slug):
        return
    raise PermissionError(f"Нет доступа к проекту: {slug}")
def create_personal_project(ctx: UserContext, slug: str, name: str) -> Dict[str, Any]:
    """Create a project owned by the calling user.

    Args:
        ctx: Context of the calling user; supplies the organization and owner.
        slug: Raw slug, normalized with ``normalize_project_slug``.
        name: Display name; when blank after trimming, the normalized slug is
            used instead.

    Returns:
        The created project in its ``_enrich_project`` form.

    Raises:
        ValueError: If the slug is invalid or a project with the normalized
            slug already exists in the organization.
    """
    config = load_config()
    project_slug = normalize_project_slug(slug)

    title = name.strip()
    if not title:
        title = project_slug

    existing = db.get_project_by_slug(ctx.org_id, project_slug, config)
    if existing:
        raise ValueError("Проект с таким slug уже существует")

    created = db.create_project(
        ctx.org_id,
        project_slug,
        title,
        owner_user_id=ctx.user_id,
        config=config,
    )
    return _enrich_project(created, ctx)
def delete_personal_project(ctx: UserContext, slug: str) -> None:
    """Delete a project by slug within the caller's organization.

    Admins (``ctx.is_admin``) may delete any project found by the slug; other
    users may delete only projects whose ``owner_user_id`` is their own id.

    Raises:
        ValueError: If the slug is invalid or no such project exists.
        PermissionError: If a non-admin caller does not own the project.
    """
    config = load_config()
    project_slug = normalize_project_slug(slug)

    found = db.get_project_by_slug(ctx.org_id, project_slug, config)
    if not found:
        raise ValueError("Проект не найден")

    may_delete = ctx.is_admin or found.get("owner_user_id") == ctx.user_id
    if not may_delete:
        raise PermissionError("Можно удалять только свои личные проекты")

    db.delete_project(found["id"], config)
def create_org_user(
    ctx: UserContext,
    username: str,
    password: str,
    role: str,
    project_slugs: List[str],
) -> Dict[str, Any]:
    """Create a user in the caller's organization and return it as a dict.

    Args:
        ctx: Context of the calling user; supplies the organization id.
        username: Login name; surrounding whitespace is removed before it is
            stored.
        password: Password passed through to ``db.create_user``.
        role: Role for the new user.
        project_slugs: Slugs of projects to assign. They are applied only
            when ``role`` is ``"user"`` and the list is non-empty; slugs that
            match no project from ``db.list_org_projects`` are ignored.

    Returns:
        The ``user_to_dict`` form of the freshly re-read user.
    """
    config = load_config()
    # authenticate() looks users up by the stripped username, so store the
    # stripped form; otherwise an account created with stray whitespace could
    # never be matched at login (unless db.create_user already normalizes the
    # name, which is not visible from here).
    user = db.create_user(ctx.org_id, username.strip(), password, role=role, config=config)
    if role == "user" and project_slugs:
        projects = db.list_org_projects(ctx.org_id, config)
        slug_to_id = {p["slug"]: p["id"] for p in projects}
        project_ids = [slug_to_id[s] for s in project_slugs if s in slug_to_id]
        db.set_user_projects(user["id"], project_ids, config)
    # Re-read the row so the context is built from what is actually stored.
    user_row = db.get_user_by_id(user["id"], config)
    return user_to_dict(_build_user_context(user_row, config))
def update_user_projects(ctx: UserContext, user_id: int, project_slugs: List[str]) -> Dict[str, Any]:
    """Replace the project assignments of a user in the caller's organization.

    Users whose role is ``"admin"`` or ``"director"`` are returned unchanged.
    For everyone else the given slugs are mapped to ids via
    ``db.list_org_projects`` (unknown slugs are ignored) and stored with
    ``db.set_user_projects``.

    Returns:
        The ``user_to_dict`` form of the user after the update.

    Raises:
        ValueError: If the user does not exist or belongs to a different
            organization than ``ctx``.
    """
    config = load_config()
    current = db.get_user_by_id(user_id, config)
    if not (current and current["org_id"] == ctx.org_id):
        raise ValueError("Пользователь не найден")

    if current["role"] not in ("admin", "director"):
        id_by_slug = {
            entry["slug"]: entry["id"]
            for entry in db.list_org_projects(ctx.org_id, config)
        }
        wanted_ids = [id_by_slug[slug] for slug in project_slugs if slug in id_by_slug]
        db.set_user_projects(user_id, wanted_ids, config)
        # Re-read so the returned data reflects the stored assignments.
        current = db.get_user_by_id(user_id, config)

    return user_to_dict(_build_user_context(current, config))