| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- """
- 认证依赖
- 提供JWT令牌验证和用户获取的依赖注入
- 需求: 5.1, 5.4, 6.1, 6.2, 6.3, 6.4
- """
- import json
- import logging
- from fastapi import Depends, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer
- from jose import JWTError
- from sqlalchemy.orm import Session
- from app.database import get_db
- from app.models.user import User
- from app.models.admin import AdminUser
- from app.services.auth_service import AuthService
- from app.services.user_service import UserService
- from app.services.token_revocation_service import token_revocation_service
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bearer-token schemes; tokenUrl points at the user and admin login routes.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
admin_oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/auth/login")

# Redis key prefix and TTL (in seconds) for cached user info. The TTL is kept
# far below the 24 h JWT lifetime; 5 minutes is sufficient.
_USER_CACHE_PREFIX = "auth_user:"
_USER_CACHE_TTL = 300
def _get_user_from_cache(user_id: str):
    """Look up a user's basic info in the synchronous Redis cache.

    Args:
        user_id: Identifier appended to the cache key prefix.

    Returns:
        A ``SimpleNamespace`` built from the cached JSON payload, or ``None``
        when no sync client is available, the key is missing/empty, or any
        error occurs (errors are logged at debug level and swallowed).
    """
    try:
        from datetime import date, datetime
        from types import SimpleNamespace

        from app.core.redis import redis_manager

        client = redis_manager.get_sync_client()
        if not client:
            return None

        payload = client.get(f"{_USER_CACHE_PREFIX}{user_id}")
        if not payload:
            return None

        fields = json.loads(payload)

        # Date-like fields are parsed back from their ISO-format strings;
        # missing or empty values are left as they are.
        iso_parsers = (
            ("created_at", datetime.fromisoformat),
            ("updated_at", datetime.fromisoformat),
            ("registration_date", date.fromisoformat),
        )
        for key, parse in iso_parsers:
            if fields.get(key):
                fields[key] = parse(fields[key])

        # A plain namespace gives attribute access without involving the
        # SQLAlchemy ORM machinery.
        return SimpleNamespace(**fields)
    except Exception as e:
        logger.debug(f"用户缓存读取失败: {e}")
        return None
def _set_user_cache(user: User) -> None:
    """Store a user's basic info in the Redis cache as JSON.

    The entry is written with ``SETEX`` under ``_USER_CACHE_PREFIX + user.id``
    and expires after ``_USER_CACHE_TTL`` seconds. Does nothing when no sync
    client is available; any error is logged at debug level and swallowed.

    Args:
        user: The user whose fields are snapshotted into the cache.
    """
    try:
        from app.core.redis import redis_manager

        client = redis_manager.get_sync_client()
        if not client:
            return

        def _iso_or_none(value):
            # Date-like values are stored as ISO strings; falsy values as null.
            return value.isoformat() if value else None

        plain_fields = (
            "id",
            "username",
            "nickname",
            "email",
            "phone",
            "apikey",
            "status",
            "avatar",
        )
        snapshot = {name: getattr(user, name) for name in plain_fields}
        snapshot["created_at"] = _iso_or_none(user.created_at)
        snapshot["updated_at"] = _iso_or_none(user.updated_at)
        # registration_date may be absent on the object, hence the getattr.
        snapshot["registration_date"] = _iso_or_none(
            getattr(user, "registration_date", None)
        )

        cache_key = f"{_USER_CACHE_PREFIX}{user.id}"
        client.setex(cache_key, _USER_CACHE_TTL, json.dumps(snapshot))
    except Exception as e:
        logger.debug(f"用户缓存写入失败: {e}")
def invalidate_user_cache(user_id: str) -> None:
    """Proactively drop a user's cached info (call when the user is updated).

    Best-effort: this function never raises. A failure is logged rather than
    silently discarded, because a failed invalidation leaves the stale entry
    in place until its TTL expires.

    Args:
        user_id: Identifier appended to the cache key prefix.
    """
    try:
        from app.core.redis import redis_manager

        client = redis_manager.get_sync_client()
        if client:
            client.delete(f"{_USER_CACHE_PREFIX}{user_id}")
    except Exception as exc:
        # Keep the caller's update path working, but leave a trace so stale
        # cache entries can be diagnosed.
        logger.warning("用户缓存清除失败: user_id=%s, error=%s", user_id, exc)
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db),
) -> User:
    """Resolve the current user from a JWT bearer token.

    Verifies the token, rejects revoked payloads, then loads the user named
    by the ``user_id`` claim through ``UserService``.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The user returned by ``UserService.get_user_by_id``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            verification raises ``JWTError``, the payload is revoked, the
            ``user_id`` claim is missing/empty, or no user is found.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = AuthService.verify_token(token)
        revoked = token_revocation_service.is_payload_revoked(claims)
        # A revoked payload is treated exactly like one without a user id.
        user_id = None if revoked else claims.get("user_id")
    except JWTError:
        raise unauthorized

    if not user_id:
        raise unauthorized

    user = UserService(db).get_user_by_id(user_id)
    if not user:
        raise unauthorized
    return user
def get_current_apikey(user: User = Depends(get_current_user)) -> str:
    """Return the current user's API key.

    Args:
        user: The authenticated user supplied by ``get_current_user``.

    Returns:
        The user's ``apikey`` value.

    Raises:
        HTTPException: 403 when the user has no (or an empty) API key.
    """
    apikey = user.apikey
    if apikey:
        return apikey
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="No APIkey configured"
    )
def get_current_admin(
    token: str = Depends(admin_oauth2_scheme),
    db: Session = Depends(get_db),
) -> AdminUser:
    """Resolve the current administrator from a JWT bearer token.

    Verifies the token with ``AdminAuthService``, reads the ``admin_id``
    claim, and loads the matching ``AdminUser`` row.

    Args:
        token: Bearer token supplied by ``admin_oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``AdminUser`` whose status is ``"active"``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            verification raises ``JWTError``, the ``admin_id`` claim is
            missing/empty, no admin is found, or the admin is not active.
    """
    # Imported inside the function, as in the original; presumably to avoid
    # a circular import — TODO confirm.
    from app.services.admin_auth_service import AdminAuthService

    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid admin credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # NOTE(review): unlike get_current_user, no token-revocation check is
    # performed here — confirm this is intentional.
    try:
        admin_id = AdminAuthService.verify_token(token).get("admin_id")
    except JWTError:
        raise unauthorized
    if not admin_id:
        raise unauthorized

    lookup = db.query(AdminUser).filter(AdminUser.id == admin_id)
    admin = lookup.first()
    if not admin or admin.status != "active":
        raise unauthorized
    return admin
|