| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- """
- Open API 认证中间件(依赖项)。
- 验证 Open API 的 Bearer JWT Token,提取 app_id。
- 与现有 AuthMiddleware 独立,不校验 role 字段。
- """
- import jwt
- import logging
- from fastapi import Depends, HTTPException, status, Header
- from config import settings
- logger = logging.getLogger(__name__)
def verify_open_api_token(authorization: str = Header(...)) -> dict:
    """Validate an Open API Bearer JWT and return its decoded payload.

    Written to be used as a FastAPI dependency: the raw ``Authorization``
    header is injected via ``Header(...)``.  Because the header is declared
    as required, a request that omits it is rejected by FastAPI's own
    request validation before this function runs.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``Bearer <jwt>`` (the scheme is matched case-insensitively).

    Returns:
        The decoded JWT payload.  Callers are expected to read ``app_id``,
        ``app_name`` etc. from it; the presence of those claims is NOT
        checked here -- only ``type == "open_api_access"`` is enforced.

    Raises:
        HTTPException: 401 with ``error_code`` ``TOKEN_INVALID`` when the
            header is not of the form ``Bearer <token>`` or the token fails
            signature/format validation; 401 with ``TOKEN_EXPIRED`` when the
            token has expired; 403 with ``TOKEN_INVALID`` when the token is
            valid but its ``type`` claim is not ``open_api_access``.
    """
    # RFC 7235 requires every 401 response to carry a WWW-Authenticate
    # challenge so clients know which authentication scheme to use.
    challenge_headers = {"WWW-Authenticate": "Bearer"}

    parts = authorization.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"error_code": "TOKEN_INVALID", "message": "无效的认证令牌格式"},
            headers=challenge_headers,
        )

    token = parts[1]
    try:
        payload = jwt.decode(
            token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]
        )
    except jwt.ExpiredSignatureError as exc:
        # Must be caught before InvalidTokenError: the expired-signature
        # error is a subclass of it and needs its own error_code.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"error_code": "TOKEN_EXPIRED", "message": "Access Token 已过期"},
            headers=challenge_headers,
        ) from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={"error_code": "TOKEN_INVALID", "message": "无效的 Access Token"},
            headers=challenge_headers,
        ) from exc

    # A correctly signed token of another kind (any "type" other than
    # open_api_access) is authenticated but not allowed here, hence 403.
    if payload.get("type") != "open_api_access":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail={"error_code": "TOKEN_INVALID", "message": "无权访问 Open API"},
        )

    return payload
|