transcription/backend/auth/deps.py
keboss-m 8df14e3102 Add multi-tenant auth with org projects, roles, and personal workspaces.
JWT login, org-scoped storage and RAG, admin/director/user roles, user-owned projects, login UI, and legacy data migration.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-01 18:54:25 +03:00

71 lines
2.3 KiB
Python

"""FastAPI auth dependencies."""
from typing import Optional
from fastapi import Depends, HTTPException, Query, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from backend.auth.models import UserContext
from backend.auth.security import safe_decode_token
from backend.auth.service import get_user_context
from src.config import load_config
bearer_scheme = HTTPBearer(auto_error=False)
def _resolve_token(
credentials: Optional[HTTPAuthorizationCredentials],
token: Optional[str],
) -> Optional[str]:
if credentials and credentials.scheme.lower() == "bearer":
return credentials.credentials
if token:
return token
return None
async def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
token: Optional[str] = Query(None, alias="token"),
) -> UserContext:
raw_token = _resolve_token(credentials, token)
if not raw_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Требуется авторизация",
headers={"WWW-Authenticate": "Bearer"},
)
config = load_config()
payload = safe_decode_token(raw_token, config)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Недействительный или просроченный токен",
)
user_id = int(payload.get("sub", 0))
ctx = get_user_context(user_id)
if not ctx:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Пользователь не найден или деактивирован",
)
return ctx
async def require_admin(user: UserContext = Depends(get_current_user)) -> UserContext:
if not user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Требуются права администратора")
return user
def get_user_from_token(token: Optional[str]) -> Optional[UserContext]:
if not token:
return None
config = load_config()
payload = safe_decode_token(token, config)
if not payload:
return None
return get_user_context(int(payload.get("sub", 0)))