71 lines
2.3 KiB
Python
71 lines
2.3 KiB
Python
|
|
"""FastAPI auth dependencies."""
|
||
|
|
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
from fastapi import Depends, HTTPException, Query, status
|
||
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
||
|
|
|
||
|
|
from backend.auth.models import UserContext
|
||
|
|
from backend.auth.security import safe_decode_token
|
||
|
|
from backend.auth.service import get_user_context
|
||
|
|
from src.config import load_config
|
||
|
|
|
||
|
|
# auto_error=False makes the scheme yield None instead of raising when the
# Authorization header is missing or unusable, so get_current_user can fall
# back to the ``token`` query parameter and emit its own 401 response.
bearer_scheme = HTTPBearer(auto_error=False)
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_token(
    credentials: Optional[HTTPAuthorizationCredentials],
    token: Optional[str],
) -> Optional[str]:
    """Pick the raw token string to authenticate with.

    Bearer credentials take precedence over the fallback ``token`` value.

    Args:
        credentials: Parsed authorization credentials, or ``None``.
        token: Fallback token value, or ``None``.

    Returns:
        The chosen token string, or ``None`` when neither source supplies one.
    """
    has_bearer = bool(credentials) and credentials.scheme.lower() == "bearer"
    if has_bearer:
        return credentials.credentials
    # An empty fallback token is treated the same as a missing one.
    return token if token else None
|
||
|
|
|
||
|
|
|
||
|
|
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme),
    token: Optional[str] = Query(None, alias="token"),
) -> UserContext:
    """Authenticate the request and return the caller's ``UserContext``.

    The token is taken from bearer ``Authorization`` credentials or, failing
    that, from the ``token`` query parameter.

    Args:
        credentials: Credentials produced by ``bearer_scheme``, or ``None``.
        token: Value of the ``token`` query parameter, or ``None``.

    Returns:
        The user context looked up from the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when no token is supplied, when the token cannot
            be decoded, when its ``sub`` claim is not an integer, or when no
            user context is found for that id.
    """
    # Every 401 carries the same challenge header; HTTP requires a
    # WWW-Authenticate header on 401 responses.
    challenge = {"WWW-Authenticate": "Bearer"}

    raw_token = _resolve_token(credentials, token)
    if not raw_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Требуется авторизация",
            headers=challenge,
        )

    # NOTE(review): load_config() runs on every request - confirm it is cached.
    config = load_config()
    payload = safe_decode_token(raw_token, config)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Недействительный или просроченный токен",
            headers=challenge,
        )

    try:
        user_id = int(payload.get("sub", 0))
    except (TypeError, ValueError):
        # A malformed ``sub`` claim is an invalid token, not a server error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Недействительный или просроченный токен",
            headers=challenge,
        ) from None

    ctx = get_user_context(user_id)
    if not ctx:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Пользователь не найден или деактивирован",
            headers=challenge,
        )
    return ctx
|
||
|
|
|
||
|
|
|
||
|
|
async def require_admin(user: UserContext = Depends(get_current_user)) -> UserContext:
    """Dependency that lets only administrators through.

    Args:
        user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same ``user`` object when ``user.is_admin`` is truthy.

    Raises:
        HTTPException: 403 when the user is not an administrator.
    """
    if user.is_admin:
        return user
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Требуются права администратора",
    )
|
||
|
|
|
||
|
|
|
||
|
|
def get_user_from_token(token: Optional[str]) -> Optional[UserContext]:
    """Return the user context for *token*, or ``None`` if it is unusable.

    Unlike ``get_current_user`` this helper signals failure with ``None``
    rather than an ``HTTPException``.

    Args:
        token: Raw token string, or ``None``.

    Returns:
        The result of ``get_user_context`` for the token's ``sub`` claim, or
        ``None`` when the token is empty, cannot be decoded, or carries a
        ``sub`` claim that is not an integer.
    """
    if not token:
        return None

    payload = safe_decode_token(token, load_config())
    if not payload:
        return None

    try:
        user_id = int(payload.get("sub", 0))
    except (TypeError, ValueError):
        # A malformed ``sub`` claim is treated like any other invalid token.
        return None
    return get_user_context(user_id)
|