| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- """
- Open API authentication service.
- Handles API Key + Secret HMAC-SHA256 signature verification and JWT token generation.
- """
- import hmac
- import hashlib
- import time
- import logging
- import jwt
- from fastapi import HTTPException, status
- from config import settings
- logger = logging.getLogger(__name__)
- _NONCE_CACHE: dict[str, int] = {}
- TIMESTAMP_TOLERANCE = 300 # ±5 minutes
- TOKEN_EXPIRE_SECONDS = 7200 # 2 hours
- def verify_signature(app_id: str, timestamp: str, nonce: str, signature: str, app_secret: str) -> bool:
- """Verify HMAC-SHA256 signature."""
- message = f"{app_id}{timestamp}{nonce}"
- expected = hmac.new(app_secret.encode(), message.encode(), hashlib.sha256).hexdigest()
- return hmac.compare_digest(expected, signature)
- def check_and_store_nonce(nonce: str) -> bool:
- """Return True if nonce is duplicate (should reject). Stores nonce with expiry."""
- now = int(time.time())
- expired = [n for n, exp in _NONCE_CACHE.items() if exp < now]
- for n in expired:
- del _NONCE_CACHE[n]
- if nonce in _NONCE_CACHE:
- return True
- _NONCE_CACHE[nonce] = now + TIMESTAMP_TOLERANCE * 2
- return False
- def create_open_api_token(app_id: str, app_name: str) -> str:
- """Create a JWT access token for Open API access."""
- import datetime
- expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=TOKEN_EXPIRE_SECONDS)
- payload = {
- "sub": app_id,
- "app_id": app_id,
- "app_name": app_name,
- "iat": int(time.time()),
- "exp": int(expire.timestamp()),
- "type": "open_api_access",
- }
- return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
- def validate_timestamp(timestamp_str: str) -> int:
- """Validate that the request timestamp is within tolerance."""
- try:
- ts = int(timestamp_str)
- except (ValueError, TypeError):
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳格式无效"},
- )
- if abs(int(time.time()) - ts) > TIMESTAMP_TOLERANCE:
- raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳过期(超过 ±5 分钟)"},
- )
- return ts
- def get_application_by_app_id(db_conn, app_id: str) -> dict | None:
- """Look up an application by app_id."""
- cursor = db_conn.cursor()
- cursor.execute(
- "SELECT id, app_id, app_name, app_secret, status FROM api_applications WHERE app_id = %s",
- (app_id,),
- )
- row = cursor.fetchone()
- return dict(row) if row else None
- def update_last_used(db_conn, app_id: str):
- """Update the last_used_at timestamp for an application."""
- cursor = db_conn.cursor()
- cursor.execute("UPDATE api_applications SET last_used_at = NOW() WHERE app_id = %s", (app_id,))
|