open_auth_view.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
"""
Open API authentication router.

Issues access tokens to clients that authenticate with an API Key and an
HMAC-SHA256 signature computed with the matching Secret.
"""
  5. from fastapi import APIRouter, Header, HTTPException, status
  6. from database import get_db_connection
  7. from schemas.open_auth import TokenResponse, TokenResponseData
  8. from services.api.open_auth_service import (
  9. verify_signature,
  10. check_and_store_nonce,
  11. create_open_api_token,
  12. validate_timestamp,
  13. get_application_by_app_id,
  14. update_last_used,
  15. )
  16. import logging
  17. logger = logging.getLogger(__name__)
  18. router = APIRouter(
  19. prefix="/api/v1/open/auth",
  20. tags=["open-api-auth"],
  21. )
  22. @router.post("/token", response_model=TokenResponse)
  23. async def get_access_token(
  24. x_api_key: str = Header(..., alias="X-Api-Key"),
  25. x_signature: str = Header(..., alias="X-Signature"),
  26. x_timestamp: str = Header(..., alias="X-Timestamp"),
  27. x_nonce: str = Header(..., alias="X-Nonce"),
  28. ):
  29. """获取 Access Token — 通过 API Key + Secret 签名认证"""
  30. with get_db_connection() as conn:
  31. app = get_application_by_app_id(conn, x_api_key)
  32. if not app:
  33. raise HTTPException(
  34. status_code=status.HTTP_401_UNAUTHORIZED,
  35. detail={"error_code": "INVALID_API_KEY", "message": "app_id 不存在"},
  36. )
  37. if app["status"] != "active":
  38. raise HTTPException(
  39. status_code=status.HTTP_401_UNAUTHORIZED,
  40. detail={"error_code": "APP_DISABLED", "message": "应用已被禁用"},
  41. )
  42. validate_timestamp(x_timestamp)
  43. if check_and_store_nonce(x_nonce):
  44. raise HTTPException(
  45. status_code=status.HTTP_401_UNAUTHORIZED,
  46. detail={"error_code": "NONCE_USED", "message": "Nonce 已被使用(重放攻击)"},
  47. )
  48. if not verify_signature(x_api_key, x_timestamp, x_nonce, x_signature, app["app_secret"]):
  49. raise HTTPException(
  50. status_code=status.HTTP_401_UNAUTHORIZED,
  51. detail={"error_code": "INVALID_SIGNATURE", "message": "签名验证失败"},
  52. )
  53. token = create_open_api_token(app["app_id"], app["app_name"])
  54. update_last_used(conn, app["app_id"])
  55. logger.info(f"Open API token issued for app_id={x_api_key}")
  56. return TokenResponse(
  57. code=0,
  58. message="success",
  59. data=TokenResponseData(
  60. access_token=token,
  61. token_type="Bearer",
  62. expires_in=7200,
  63. ),
  64. )