""" Open API authentication router. Provides token acquisition via API Key + Secret HMAC-SHA256 signature. """ from fastapi import APIRouter, Header, HTTPException, status from database import get_db_connection from schemas.open_auth import TokenResponse, TokenResponseData from services.api.open_auth_service import ( verify_signature, check_and_store_nonce, create_open_api_token, validate_timestamp, get_application_by_app_id, update_last_used, ) import logging logger = logging.getLogger(__name__) router = APIRouter( prefix="/api/v1/open/auth", tags=["open-api-auth"], ) @router.post("/token", response_model=TokenResponse) async def get_access_token( x_api_key: str = Header(..., alias="X-Api-Key"), x_signature: str = Header(..., alias="X-Signature"), x_timestamp: str = Header(..., alias="X-Timestamp"), x_nonce: str = Header(..., alias="X-Nonce"), ): """获取 Access Token — 通过 API Key + Secret 签名认证""" with get_db_connection() as conn: app = get_application_by_app_id(conn, x_api_key) if not app: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "INVALID_API_KEY", "message": "app_id 不存在"}, ) if app["status"] != "active": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "APP_DISABLED", "message": "应用已被禁用"}, ) validate_timestamp(x_timestamp) if check_and_store_nonce(x_nonce): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "NONCE_USED", "message": "Nonce 已被使用(重放攻击)"}, ) if not verify_signature(x_api_key, x_timestamp, x_nonce, x_signature, app["app_secret"]): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail={"error_code": "INVALID_SIGNATURE", "message": "签名验证失败"}, ) token = create_open_api_token(app["app_id"], app["app_name"]) update_last_used(conn, app["app_id"]) logger.info(f"Open API token issued for app_id={x_api_key}") return TokenResponse( code=0, message="success", data=TokenResponseData( access_token=token, token_type="Bearer", expires_in=7200, ), )