| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- """
- Open API authentication router.
- Provides token acquisition via API Key + Secret HMAC-SHA256 signature.
- """
- from fastapi import APIRouter, Header, HTTPException, status
- from database import get_db_connection
- from schemas.open_auth import TokenResponse, TokenResponseData
- from services.api.open_auth_service import (
- verify_signature,
- check_and_store_nonce,
- create_open_api_token,
- validate_timestamp,
- get_application_by_app_id,
- update_last_used,
- )
- import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Every route registered on this router is mounted under /api/v1/open/auth
# and carries the "open-api-auth" tag.
router = APIRouter(prefix="/api/v1/open/auth", tags=["open-api-auth"])
@router.post("/token", response_model=TokenResponse)
async def get_access_token(
    x_api_key: str = Header(..., alias="X-Api-Key"),
    x_signature: str = Header(..., alias="X-Signature"),
    x_timestamp: str = Header(..., alias="X-Timestamp"),
    x_nonce: str = Header(..., alias="X-Nonce"),
):
    """Issue an access token for a request signed with API Key + Secret.

    The checks run in this order: application lookup, application status,
    timestamp validation, signature verification, nonce replay check. The
    signature is verified *before* the nonce is recorded so that a request
    which cannot prove possession of the application secret never writes
    into the nonce store.

    Args:
        x_api_key: Value of the ``X-Api-Key`` header; used as the app_id for
            the application lookup.
        x_signature: Value of the ``X-Signature`` header; checked by
            ``verify_signature`` against the key, timestamp, nonce and the
            application's stored secret.
        x_timestamp: Value of the ``X-Timestamp`` header; passed to
            ``validate_timestamp``.
        x_nonce: Value of the ``X-Nonce`` header; passed to
            ``check_and_store_nonce`` for replay detection.

    Returns:
        A ``TokenResponse`` with ``code=0``, ``message="success"`` and a
        ``TokenResponseData`` holding the Bearer access token.

    Raises:
        HTTPException: 401 with ``error_code`` ``INVALID_API_KEY`` (no such
            application), ``APP_DISABLED`` (status is not ``"active"``),
            ``INVALID_SIGNATURE`` (signature check failed) or ``NONCE_USED``
            (nonce was seen before). ``validate_timestamp`` presumably
            raises on an unacceptable timestamp as well; its failure mode is
            defined in ``open_auth_service`` and not visible here.
    """
    # NOTE(review): get_db_connection() is used as a synchronous context
    # manager inside an async endpoint. If it performs blocking I/O it will
    # stall the event loop for the duration of the request - confirm, and
    # consider a plain ``def`` endpoint or a threadpool hop.
    with get_db_connection() as conn:
        app = get_application_by_app_id(conn, x_api_key)
        if not app:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail={"error_code": "INVALID_API_KEY", "message": "app_id 不存在"},
            )
        if app["status"] != "active":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail={"error_code": "APP_DISABLED", "message": "应用已被禁用"},
            )

        validate_timestamp(x_timestamp)

        # Authenticate the request first: only a caller holding the secret
        # may consume a nonce, so unsigned traffic cannot fill the nonce
        # store.
        if not verify_signature(
            x_api_key, x_timestamp, x_nonce, x_signature, app["app_secret"]
        ):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail={"error_code": "INVALID_SIGNATURE", "message": "签名验证失败"},
            )

        # A truthy result means this nonce has already been recorded, i.e.
        # the (validly signed) request is a replay.
        if check_and_store_nonce(x_nonce):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail={"error_code": "NONCE_USED", "message": "Nonce 已被使用(重放攻击)"},
            )

        token = create_open_api_token(app["app_id"], app["app_name"])
        update_last_used(conn, app["app_id"])

        # Lazy %-style arguments: the message is only formatted when the
        # INFO level is actually enabled.
        logger.info("Open API token issued for app_id=%s", x_api_key)

        return TokenResponse(
            code=0,
            message="success",
            data=TokenResponseData(
                access_token=token,
                token_type="Bearer",
                # NOTE(review): 7200 is hard-coded here; it should match the
                # lifetime create_open_api_token gives the token - confirm.
                expires_in=7200,
            ),
        )
|