from fastapi import Request, status from fastapi.responses import JSONResponse from .logger import logger from .token import verify_token async def auth_middleware(request: Request, call_next): """统一 token 认证中间件。""" whitelist_paths = [ "/", "/health", "/docs", "/redoc", "/openapi.json", "/static", "/assets", "/apiv1/auth/local_login", "/apiv1/auth/register", ] path = request.url.path for whitelist_path in whitelist_paths: if path.startswith(whitelist_path): request.state.user = None return await call_next(request) auth_header = (request.headers.get("Authorization") or "").strip() token = request.headers.get("token") or request.headers.get("Token") or auth_header if auth_header.lower().startswith("bearer "): token = auth_header[7:].strip() logger.info(f"认证中间件 - 路径: {path}") logger.info(f"认证中间件 - Token (前20字符): {token[:20] if token else 'None'}...") if not token: logger.warning("认证中间件 - 未提供token") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"statusCode": 401, "msg": "未提供认证Token"}, ) user_info = await verify_token(token) if not user_info: logger.error("认证中间件 - Token验证失败,返回401") return JSONResponse( status_code=status.HTTP_401_UNAUTHORIZED, content={"statusCode": 401, "msg": "Token验证失败"}, ) logger.info(f"认证中间件 - Token验证成功,用户 {user_info.username} ({user_info.account})") request.state.user = user_info return await call_next(request)