open_middleware.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. """
  2. Open API 认证中间件(依赖项)。
  3. 验证 Open API 的 Bearer JWT Token,提取 app_id。
  4. 与现有 AuthMiddleware 独立,不校验 role 字段。
  5. """
  6. import jwt
  7. import logging
  8. from fastapi import Depends, HTTPException, status, Header
  9. from config import settings
  10. logger = logging.getLogger(__name__)
  11. def verify_open_api_token(authorization: str = Header(...)) -> dict:
  12. """
  13. 验证 Open API Bearer Token。
  14. 从 Authorization Header 中解析 JWT,确认 type=open_api_access。
  15. 返回 decoded payload,包含 app_id, app_name 等。
  16. """
  17. parts = authorization.split()
  18. if len(parts) != 2 or parts[0].lower() != "bearer":
  19. raise HTTPException(
  20. status_code=status.HTTP_401_UNAUTHORIZED,
  21. detail={"error_code": "TOKEN_INVALID", "message": "无效的认证令牌格式"},
  22. )
  23. token = parts[1]
  24. try:
  25. payload = jwt.decode(
  26. token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
  27. )
  28. except jwt.ExpiredSignatureError:
  29. raise HTTPException(
  30. status_code=status.HTTP_401_UNAUTHORIZED,
  31. detail={"error_code": "TOKEN_EXPIRED", "message": "Access Token 已过期"},
  32. )
  33. except jwt.InvalidTokenError:
  34. raise HTTPException(
  35. status_code=status.HTTP_401_UNAUTHORIZED,
  36. detail={"error_code": "TOKEN_INVALID", "message": "无效的 Access Token"},
  37. )
  38. if payload.get("type") != "open_api_access":
  39. raise HTTPException(
  40. status_code=status.HTTP_403_FORBIDDEN,
  41. detail={"error_code": "TOKEN_INVALID", "message": "无权访问 Open API"},
  42. )
  43. return payload