"""
Authentication dependencies.

Provides FastAPI dependencies for JWT token verification and for resolving
the current user / administrator from a bearer token.

Requirements: 5.1, 5.4, 6.1, 6.2, 6.3, 6.4
"""
import json
import logging
from datetime import date, datetime
from types import SimpleNamespace
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.user import User
from app.models.admin import AdminUser
from app.services.auth_service import AuthService
from app.services.user_service import UserService
from app.services.token_revocation_service import token_revocation_service

logger = logging.getLogger(__name__)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
admin_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/auth/login")

# TTL (in seconds) for cached user info. Deliberately much shorter than the
# JWT validity period (the original note cites 24h); 5 minutes is enough.
_USER_CACHE_TTL = 300
_USER_CACHE_PREFIX = "auth_user:"

# NOTE(review): nothing in this module calls _get_user_from_cache or
# _set_user_cache; presumably they are used elsewhere or are pending
# integration into get_current_user -- confirm.


def _get_sync_redis():
    """Return the synchronous Redis client from the Redis manager.

    The result may be falsy; callers treat that as "cache unavailable".
    """
    # Imported at call time rather than at module import time, exactly as the
    # original per-function imports did.
    from app.core.redis import redis_manager

    return redis_manager.get_sync_client()


def _get_user_from_cache(user_id: str) -> Optional[SimpleNamespace]:
    """Fetch basic user info from the synchronous Redis cache.

    Args:
        user_id: ID of the user; combined with ``_USER_CACHE_PREFIX`` to form
            the cache key.

    Returns:
        A ``SimpleNamespace`` carrying the cached fields, or ``None`` on a
        cache miss, when no Redis client is available, or when any error
        occurs while reading/decoding the entry (best-effort, never raises).
    """
    try:
        client = _get_sync_redis()
        if not client:
            return None

        raw = client.get(f"{_USER_CACHE_PREFIX}{user_id}")
        if not raw:
            return None

        data = json.loads(raw)

        # Restore the date/datetime fields that were stored as ISO strings.
        for dt_field in ("created_at", "updated_at"):
            if data.get(dt_field):
                data[dt_field] = datetime.fromisoformat(data[dt_field])
        if data.get("registration_date"):
            data["registration_date"] = date.fromisoformat(data["registration_date"])

        # Build a lightweight attribute object instead of a User instance so
        # that no SQLAlchemy ORM machinery is triggered.
        return SimpleNamespace(**data)
    except Exception as e:
        logger.debug("用户缓存读取失败: %s", e)
        return None


def _set_user_cache(user: User) -> None:
    """Write the user's basic info to the Redis cache.

    The entry is stored as JSON under ``_USER_CACHE_PREFIX + user.id`` with a
    lifetime of ``_USER_CACHE_TTL`` seconds. Failures are logged at debug
    level and otherwise ignored (best-effort, never raises).

    Args:
        user: The user whose fields are serialized into the cache.
    """
    try:
        client = _get_sync_redis()
        if not client:
            return

        # date/datetime values are stored as ISO-8601 strings so that the
        # payload is JSON-serializable; missing values are stored as None.
        data = {
            "id": user.id,
            "username": user.username,
            "nickname": user.nickname,
            "email": user.email,
            "phone": user.phone,
            "apikey": user.apikey,
            "status": user.status,
            "avatar": user.avatar,
            "created_at": user.created_at.isoformat() if user.created_at else None,
            "updated_at": user.updated_at.isoformat() if user.updated_at else None,
            "registration_date": (
                user.registration_date.isoformat()
                if getattr(user, "registration_date", None)
                else None
            ),
        }
        client.setex(f"{_USER_CACHE_PREFIX}{user.id}", _USER_CACHE_TTL, json.dumps(data))
    except Exception as e:
        logger.debug("用户缓存写入失败: %s", e)


def invalidate_user_cache(user_id: str) -> None:
    """Proactively delete a user's cache entry (call when user info changes).

    Best-effort: never raises. A failure is logged as a warning because the
    stale entry can otherwise survive until its TTL expires.

    Args:
        user_id: ID of the user whose cache entry should be removed.
    """
    try:
        client = _get_sync_redis()
        if client:
            client.delete(f"{_USER_CACHE_PREFIX}{user_id}")
    except Exception as e:
        # Previously swallowed silently; surface it so stale-cache incidents
        # can be diagnosed. Message: "user cache invalidation failed".
        logger.warning("用户缓存清除失败: %s", e)


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the current user from a JWT bearer token.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The user loaded via ``UserService.get_user_by_id``.

    Raises:
        HTTPException: 401 when token verification raises ``JWTError``, the
            payload is reported as revoked, the payload has no ``user_id``,
            or no matching user is found.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = AuthService.verify_token(token)
        if token_revocation_service.is_payload_revoked(payload):
            raise credentials_exception
        user_id = payload.get("user_id")
        if not user_id:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # NOTE(review): unlike get_current_admin, no account-status check is made
    # here -- confirm that is intentional.
    user_service = UserService(db)
    user = user_service.get_user_by_id(user_id)
    if not user:
        raise credentials_exception

    return user


def get_current_apikey(user: User = Depends(get_current_user)) -> str:
    """Return the current user's API key.

    Args:
        user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The user's ``apikey`` value.

    Raises:
        HTTPException: 403 when the user has no API key configured.
    """
    if not user.apikey:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="No APIkey configured"
        )
    return user.apikey


def get_current_admin(
    token: str = Depends(admin_oauth2_scheme),
    db: Session = Depends(get_db)
) -> AdminUser:
    """Resolve the current administrator from a JWT bearer token.

    Args:
        token: Bearer token supplied by ``admin_oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The matching ``AdminUser`` whose status is ``"active"``.

    Raises:
        HTTPException: 401 when token verification raises ``JWTError``, the
            payload has no ``admin_id``, no matching admin exists, or the
            admin's status is not ``"active"``.
    """
    # Imported at call time, as in the original code.
    from app.services.admin_auth_service import AdminAuthService

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid admin credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = AdminAuthService.verify_token(token)
        # NOTE(review): unlike get_current_user, no token-revocation check is
        # performed here -- confirm that is intentional.
        admin_id = payload.get("admin_id")
        if not admin_id:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    admin = db.query(AdminUser).filter(AdminUser.id == admin_id).first()
    if not admin or admin.status != "active":
        raise credentials_exception

    return admin