""" Token 解析与认证工具 支持本地登录 token 和 4A 外部 token。 """ from dataclasses import dataclass, field import time from typing import Any, Dict, Optional, Tuple import httpx import jwt from sqlalchemy.exc import OperationalError from utils.config import settings from utils.logger import logger LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024" LOCAL_JWT_ALGORITHM = "HS256" EXTERNAL_TOKEN_CACHE_TTL = 60 USER_ID_CACHE_TTL = 300 _external_token_cache: Dict[str, Tuple[float, "TokenUserInfo"]] = {} _external_user_id_cache: Dict[str, Tuple[float, int]] = {} @dataclass class TokenUserInfo: """统一的 request.state.user 对象。""" user_id: int account: str username: str role: str = "user" source: str = "external" name: str = "" account_id: str = "" user_code: str = "" contact_number: str = "" raw: Dict[str, Any] = field(default_factory=dict) @property def id(self) -> int: return self.user_id @property def userCode(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return ( self.user_code or self.account_id or self.account or self.username or str(self.user_id) ) @property def accountID(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return self.account_id or self.account or self.username @property def contactNumber(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return self.contact_number def _normalize_token(token: str) -> str: token = (token or "").strip() if token.lower().startswith("bearer "): return token[7:].strip() return token def _get_cached_external_user(token: str) -> Optional["TokenUserInfo"]: cache_item = _external_token_cache.get(token) if not cache_item: return None expires_at, user_info = cache_item if expires_at <= time.time(): _external_token_cache.pop(token, None) return None return user_info def _cache_external_user(token: str, user_info: "TokenUserInfo", exp: Any = None) -> None: ttl = EXTERNAL_TOKEN_CACHE_TTL try: if exp: ttl = max(1, min(EXTERNAL_TOKEN_CACHE_TTL, int(exp) - int(time.time()))) except Exception: ttl = EXTERNAL_TOKEN_CACHE_TTL _external_token_cache[token] = (time.time() + ttl, user_info) def _get_cached_external_user_id(account_id: str) -> Optional[int]: cache_item = _external_user_id_cache.get(account_id) if not cache_item: return None expires_at, user_id = cache_item if expires_at <= time.time(): _external_user_id_cache.pop(account_id, None) return None return user_id def _cache_external_user_id(account_id: str, user_id: int) -> None: _external_user_id_cache[account_id] = (time.time() + USER_ID_CACHE_TTL, user_id) def verify_local_token(token: str) -> Optional[TokenUserInfo]: """ 验证是否为本地登录生成的 token。 """ token = _normalize_token(token) if not token: return None try: decoded = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=[LOCAL_JWT_ALGORITHM]) except jwt.InvalidTokenError: return None except Exception as e: logger.warning(f"[Token验证] 本地 token 校验异常: {e}") return None if decoded.get("source") != "local" and not ( "user_id" in decoded and "username" in decoded ): return None username = str(decoded.get("username") or decoded.get("account") or "") user_id = int(decoded.get("user_id") or 0) logger.info(f"[Token验证] 识别为本地 token: {username or 'unknown'}") return TokenUserInfo( user_id=user_id, account=str(decoded.get("account") or username), username=username, role=str(decoded.get("role") or "user"), source="local", name=str(decoded.get("name") or username), raw=decoded, ) async def _resolve_external_user_id(account_id: str) -> int: """将外部账号映射到本地 UserData 主键,便于复用现有业务表。""" if not account_id: return 0 cached_user_id = _get_cached_external_user_id(account_id) if cached_user_id is not None: return cached_user_id try: from database import SessionLocal from models.user_data import UserData last_error = None for attempt in range(2): db = SessionLocal() try: user_data = db.query(UserData).filter(UserData.accountID == account_id).first() user_id = int(user_data.id) if user_data else 0 _cache_external_user_id(account_id, user_id) return user_id except OperationalError as e: last_error = e logger.warning( f"[Token验证] 外部用户ID映射查询失败,准备重试: " f"account={account_id}, attempt={attempt + 1}/2, detail={repr(e)}") if attempt == 1: raise finally: db.close() if last_error: raise last_error except Exception as e: logger.warning(f"[Token验证] 外部用户ID映射失败: {e}") return 0 async def verify_external_token(token: str) -> Optional[TokenUserInfo]: """ 调用 4A 验证外部 token。 """ token = _normalize_token(token) if not token: return None cached_user = _get_cached_external_user(token) if cached_user: return cached_user verify_url = getattr(settings.auth, "api_url", "") if not verify_url: logger.warning("[Token验证] 未配置外部 token 验证地址") return None try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( verify_url, json={"token": token}, headers={"Content-Type": "application/json"}, ) response.raise_for_status() result = response.json() except Exception as e: logger.warning(f"[Token验证] 外部 token 验证失败: {e}") return None if not result.get("valid"): logger.warning("[Token验证] 外部 token 无效或已过期") return None account_id = str(result.get("accountID") or "") username = str(result.get("name") or account_id or "external_user") user_id = await _resolve_external_user_id(account_id) logger.info(f"[Token验证] 识别为外部 token: {account_id or username}") user_info = TokenUserInfo( user_id=user_id, account=account_id, username=username, role=str(result.get("role") or "user"), source="external", name=username, account_id=account_id, user_code=str(result.get("userCode") or ""), contact_number=str(result.get("contactNumber") or ""), raw=result, ) _cache_external_user(token, user_info, result.get("exp")) return user_info async def verify_token(token: str) -> Optional[TokenUserInfo]: """ 统一验证 token,优先走本地 token,再尝试外部 4A token。 """ normalized = _normalize_token(token) if not normalized: return None local_user = verify_local_token(normalized) if local_user: return local_user return await verify_external_token(normalized) def is_local_token(token: str) -> bool: """ 判断是否为本地 token。 """ return verify_local_token(token) is not None