transcription/backend/auth/deps.py

71 lines
2.3 KiB
Python
Raw Permalink Normal View History

"""FastAPI auth dependencies."""
from typing import Optional
from fastapi import Depends, HTTPException, Query, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from backend.auth.models import UserContext
from backend.auth.security import safe_decode_token
from backend.auth.service import get_user_context
from src.config import load_config
# Shared extractor for the ``Authorization: Bearer <token>`` header.
# auto_error=False: per FastAPI's documented behaviour the dependency then
# yields None instead of raising when the header is absent, which lets
# get_current_user fall back to the ``token`` query parameter.
bearer_scheme = HTTPBearer(auto_error=False)
def _resolve_token(
    credentials: Optional[HTTPAuthorizationCredentials],
    token: Optional[str],
) -> Optional[str]:
    """Choose the raw token string to authenticate with.

    Header credentials win when their scheme is ``bearer`` (compared
    case-insensitively); otherwise a non-empty ``token`` value is used.

    Args:
        credentials: Parsed ``Authorization`` header, or None.
        token: Token supplied by other means (e.g. a query parameter), or None.

    Returns:
        The token string, or None when neither source provides one.
    """
    # Guard first: anything that is not a bearer credential defers to ``token``.
    if not credentials or credentials.scheme.lower() != "bearer":
        return token or None
    return credentials.credentials
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    token: Optional[str] = Query(None, alias="token"),
) -> UserContext:
    """Resolve the authenticated user for the current request.

    The token is taken from the ``Authorization: Bearer`` header or, failing
    that, from the ``token`` query parameter. It is decoded with
    ``safe_decode_token`` and its ``sub`` claim is used as the user id passed
    to ``get_user_context``.

    Args:
        credentials: Bearer credentials extracted from the request header,
            or None when the header is absent.
        token: Value of the ``token`` query parameter, or None.

    Returns:
        The ``UserContext`` returned by ``get_user_context``.

    Raises:
        HTTPException: 401 when no token is supplied, when the token does not
            decode to a payload, when its ``sub`` claim is not an integer, or
            when ``get_user_context`` returns nothing for that user id.
    """
    # Every 401 carries the challenge header so clients get a consistent
    # response regardless of which check rejected the request.
    challenge = {"WWW-Authenticate": "Bearer"}

    raw_token = _resolve_token(credentials, token)
    if not raw_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Требуется авторизация",
            headers=challenge,
        )

    config = load_config()
    payload = safe_decode_token(raw_token, config)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Недействительный или просроченный токен",
            headers=challenge,
        )

    try:
        user_id = int(payload.get("sub", 0))
    except (TypeError, ValueError):
        # A non-numeric or null "sub" means the token is malformed; report it
        # as an invalid token instead of letting int() surface as a 500.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Недействительный или просроченный токен",
            headers=challenge,
        ) from None

    ctx = get_user_context(user_id)
    if not ctx:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Пользователь не найден или деактивирован",
            headers=challenge,
        )
    return ctx
async def require_admin(user: UserContext = Depends(get_current_user)) -> UserContext:
    """Let the request through only when the current user is an administrator.

    Args:
        user: The authenticated user, injected via ``get_current_user``.

    Returns:
        The same ``user`` object when ``user.is_admin`` is truthy.

    Raises:
        HTTPException: 403 when ``user.is_admin`` is falsy.
    """
    if user.is_admin:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Требуются права администратора",
    )
def get_user_from_token(token: Optional[str]) -> Optional[UserContext]:
    """Look up the user for a raw token without raising on bad input.

    Non-raising counterpart of ``get_current_user`` for callers that hold a
    token string directly rather than going through request dependencies.

    Args:
        token: Raw token string, or None.

    Returns:
        The result of ``get_user_context`` for the token's ``sub`` claim, or
        None when the token is empty, does not decode to a payload, or carries
        a ``sub`` claim that is not an integer.
    """
    if not token:
        return None

    payload = safe_decode_token(token, load_config())
    if not payload:
        return None

    try:
        user_id = int(payload.get("sub", 0))
    except (TypeError, ValueError):
        # A malformed "sub" claim cannot identify a user; treat it like any
        # other invalid token rather than propagating the conversion error.
        return None
    return get_user_context(user_id)