open_auth_service.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. """
  2. Open API authentication service.
  3. Handles API Key + Secret HMAC-SHA256 signature verification and JWT token generation.
  4. """
  5. import hmac
  6. import hashlib
  7. import time
  8. import logging
  9. import jwt
  10. from fastapi import HTTPException, status
  11. from config import settings
  12. logger = logging.getLogger(__name__)
  13. _NONCE_CACHE: dict[str, int] = {}
  14. TIMESTAMP_TOLERANCE = 300 # ±5 minutes
  15. TOKEN_EXPIRE_SECONDS = 7200 # 2 hours
  16. def verify_signature(app_id: str, timestamp: str, nonce: str, signature: str, app_secret: str) -> bool:
  17. """Verify HMAC-SHA256 signature."""
  18. message = f"{app_id}{timestamp}{nonce}"
  19. expected = hmac.new(app_secret.encode(), message.encode(), hashlib.sha256).hexdigest()
  20. return hmac.compare_digest(expected, signature)
  21. def check_and_store_nonce(nonce: str) -> bool:
  22. """Return True if nonce is duplicate (should reject). Stores nonce with expiry."""
  23. now = int(time.time())
  24. expired = [n for n, exp in _NONCE_CACHE.items() if exp < now]
  25. for n in expired:
  26. del _NONCE_CACHE[n]
  27. if nonce in _NONCE_CACHE:
  28. return True
  29. _NONCE_CACHE[nonce] = now + TIMESTAMP_TOLERANCE * 2
  30. return False
  31. def create_open_api_token(app_id: str, app_name: str) -> str:
  32. """Create a JWT access token for Open API access."""
  33. import datetime
  34. expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=TOKEN_EXPIRE_SECONDS)
  35. payload = {
  36. "sub": app_id,
  37. "app_id": app_id,
  38. "app_name": app_name,
  39. "iat": int(time.time()),
  40. "exp": int(expire.timestamp()),
  41. "type": "open_api_access",
  42. }
  43. return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
  44. def validate_timestamp(timestamp_str: str) -> int:
  45. """Validate that the request timestamp is within tolerance."""
  46. try:
  47. ts = int(timestamp_str)
  48. except (ValueError, TypeError):
  49. raise HTTPException(
  50. status_code=status.HTTP_401_UNAUTHORIZED,
  51. detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳格式无效"},
  52. )
  53. if abs(int(time.time()) - ts) > TIMESTAMP_TOLERANCE:
  54. raise HTTPException(
  55. status_code=status.HTTP_401_UNAUTHORIZED,
  56. detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳过期(超过 ±5 分钟)"},
  57. )
  58. return ts
  59. def get_application_by_app_id(db_conn, app_id: str) -> dict | None:
  60. """Look up an application by app_id."""
  61. cursor = db_conn.cursor()
  62. cursor.execute(
  63. "SELECT id, app_id, app_name, app_secret, status FROM api_applications WHERE app_id = %s",
  64. (app_id,),
  65. )
  66. row = cursor.fetchone()
  67. return dict(row) if row else None
  68. def update_last_used(db_conn, app_id: str):
  69. """Update the last_used_at timestamp for an application."""
  70. cursor = db_conn.cursor()
  71. cursor.execute("UPDATE api_applications SET last_used_at = NOW() WHERE app_id = %s", (app_id,))