""" Open API authentication service. Handles API Key + Secret HMAC-SHA256 signature verification and JWT token generation. """ import hmac import hashlib import time import logging import jwt from fastapi import HTTPException, status from config import settings logger = logging.getLogger(__name__) _NONCE_CACHE: dict[str, int] = {} TIMESTAMP_TOLERANCE = 300 # ±5 minutes TOKEN_EXPIRE_SECONDS = 7200 # 2 hours def verify_signature(app_id: str, timestamp: str, nonce: str, signature: str, app_secret: str) -> bool: """Verify HMAC-SHA256 signature.""" message = f"{app_id}{timestamp}{nonce}" expected = hmac.new(app_secret.encode(), message.encode(), hashlib.sha256).hexdigest() return hmac.compare_digest(expected, signature) def check_and_store_nonce(nonce: str) -> bool: """Return True if nonce is duplicate (should reject). Stores nonce with expiry.""" now = int(time.time()) expired = [n for n, exp in _NONCE_CACHE.items() if exp < now] for n in expired: del _NONCE_CACHE[n] if nonce in _NONCE_CACHE: return True _NONCE_CACHE[nonce] = now + TIMESTAMP_TOLERANCE * 2 return False def create_open_api_token(app_id: str, app_name: str) -> str: """Create a JWT access token for Open API access.""" import datetime expire = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=TOKEN_EXPIRE_SECONDS) payload = { "sub": app_id, "app_id": app_id, "app_name": app_name, "iat": int(time.time()), "exp": int(expire.timestamp()), "type": "open_api_access", } return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def validate_timestamp(timestamp_str: str) -> int: """Validate that the request timestamp is within tolerance.""" try: ts = int(timestamp_str) except (ValueError, TypeError): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳格式无效"}, ) if abs(int(time.time()) - ts) > TIMESTAMP_TOLERANCE: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "TIMESTAMP_EXPIRED", "message": "时间戳过期(超过 ±5 分钟)"}, ) return ts def get_application_by_app_id(db_conn, app_id: str) -> dict | None: """Look up an application by app_id.""" cursor = db_conn.cursor() cursor.execute( "SELECT id, app_id, app_name, app_secret, status FROM api_applications WHERE app_id = %s", (app_id,), ) row = cursor.fetchone() return dict(row) if row else None def update_last_used(db_conn, app_id: str): """Update the last_used_at timestamp for an application.""" cursor = db_conn.cursor() cursor.execute("UPDATE api_applications SET last_used_at = NOW() WHERE app_id = %s", (app_id,))