auth_middleware.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from fastapi import Request, status
  2. from fastapi.responses import JSONResponse
  3. from .logger import logger
  4. from .token import verify_token
  5. async def auth_middleware(request: Request, call_next):
  6. """统一 token 认证中间件。"""
  7. whitelist_paths = [
  8. "/",
  9. "/health",
  10. "/docs",
  11. "/redoc",
  12. "/openapi.json",
  13. "/static",
  14. "/assets",
  15. "/apiv1/auth/local_login",
  16. "/apiv1/auth/register",
  17. ]
  18. path = request.url.path
  19. for whitelist_path in whitelist_paths:
  20. if path.startswith(whitelist_path):
  21. request.state.user = None
  22. return await call_next(request)
  23. auth_header = (request.headers.get("Authorization") or "").strip()
  24. token = request.headers.get("token") or request.headers.get("Token") or auth_header
  25. if auth_header.lower().startswith("bearer "):
  26. token = auth_header[7:].strip()
  27. logger.info(f"认证中间件 - 路径: {path}")
  28. logger.info(f"认证中间件 - Token (前20字符): {token[:20] if token else 'None'}...")
  29. if not token:
  30. logger.warning("认证中间件 - 未提供token")
  31. return JSONResponse(
  32. status_code=status.HTTP_401_UNAUTHORIZED,
  33. content={"statusCode": 401, "msg": "未提供认证Token"},
  34. )
  35. user_info = await verify_token(token)
  36. if not user_info:
  37. logger.error("认证中间件 - Token验证失败,返回401")
  38. return JSONResponse(
  39. status_code=status.HTTP_401_UNAUTHORIZED,
  40. content={"statusCode": 401, "msg": "Token验证失败"},
  41. )
  42. logger.info(f"认证中间件 - Token验证成功,用户 {user_info.username} ({user_info.account})")
  43. request.state.user = user_info
  44. return await call_next(request)