"""Password hashing and JWT tokens.""" import os from datetime import datetime, timedelta, timezone from typing import Any, Dict, Optional import bcrypt from jose import JWTError, jwt ALGORITHM = "HS256" def get_jwt_secret(config: Optional[dict] = None) -> str: env_secret = os.getenv("JWT_SECRET") if env_secret: return env_secret if config: auth_cfg = config.get("auth", {}) secret = auth_cfg.get("jwt_secret") if secret: return secret return "dev-insecure-change-me" def get_jwt_expire_hours(config: Optional[dict] = None) -> int: if config: return int(config.get("auth", {}).get("jwt_expire_hours", 168)) return 168 def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: try: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) except ValueError: return False def create_access_token( user_id: int, org_id: int, role: str, username: str, org_slug: str, config: Optional[dict] = None, ) -> str: expire_hours = get_jwt_expire_hours(config) expire = datetime.now(timezone.utc) + timedelta(hours=expire_hours) payload = { "sub": str(user_id), "org_id": org_id, "org_slug": org_slug, "role": role, "username": username, "exp": expire, } return jwt.encode(payload, get_jwt_secret(config), algorithm=ALGORITHM) def decode_access_token(token: str, config: Optional[dict] = None) -> Dict[str, Any]: return jwt.decode(token, get_jwt_secret(config), algorithms=[ALGORITHM]) def safe_decode_token(token: str, config: Optional[dict] = None) -> Optional[Dict[str, Any]]: try: return decode_access_token(token, config) except JWTError: return None