""" Token 解析与认证工具 支持本地登录 token 和 4A 外部 token。 """ from dataclasses import dataclass, field import time from typing import Any, Dict, Optional, Tuple import httpx import jwt from utils.config import settings from utils.logger import logger LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024" LOCAL_JWT_ALGORITHM = "HS256" EXTERNAL_TOKEN_CACHE_TTL = 60 USER_ID_CACHE_TTL = 300 _external_token_cache: Dict[str, Tuple[float, "TokenUserInfo"]] = {} _external_user_id_cache: Dict[str, Tuple[float, int]] = {} @dataclass class TokenUserInfo: """统一的 request.state.user 对象。""" user_id: int account: str username: str role: str = "user" source: str = "external" name: str = "" account_id: str = "" user_code: str = "" contact_number: str = "" raw: Dict[str, Any] = field(default_factory=dict) @property def id(self) -> int: return self.user_id @property def userCode(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return ( self.user_code or self.account_id or self.account or self.username or str(self.user_id) ) @property def accountID(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return self.account_id or self.account or self.username @property def contactNumber(self) -> str: """兼容旧代码中的 camelCase 字段访问。""" return self.contact_number def _normalize_token(token: str) -> str: token = (token or "").strip() if token.lower().startswith("bearer "): return token[7:].strip() return token def _get_cached_external_user(token: str) -> Optional["TokenUserInfo"]: cache_item = _external_token_cache.get(token) if not cache_item: return None expires_at, user_info = cache_item if expires_at <= time.time(): _external_token_cache.pop(token, None) return None return user_info def _cache_external_user(token: str, user_info: "TokenUserInfo", exp: Any = None) -> None: ttl = EXTERNAL_TOKEN_CACHE_TTL try: if exp: ttl = max(1, min(EXTERNAL_TOKEN_CACHE_TTL, int(exp) - int(time.time()))) except Exception: ttl = EXTERNAL_TOKEN_CACHE_TTL _external_token_cache[token] = (time.time() + ttl, user_info) def _get_cached_external_user_id(account_id: str) -> Optional[int]: cache_item = _external_user_id_cache.get(account_id) if not cache_item: return None expires_at, user_id = cache_item if expires_at <= time.time(): _external_user_id_cache.pop(account_id, None) return None return user_id def _cache_external_user_id(account_id: str, user_id: int) -> None: _external_user_id_cache[account_id] = (time.time() + USER_ID_CACHE_TTL, user_id) def verify_local_token(token: str) -> Optional[TokenUserInfo]: """ 验证是否为本地登录生成的 token。 """ token = _normalize_token(token) if not token: return None try: decoded = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=[LOCAL_JWT_ALGORITHM]) except jwt.InvalidTokenError: return None except Exception as e: logger.warning(f"[Token验证] 本地 token 校验异常: {e}") return None if decoded.get("source") != "local" and not ( "user_id" in decoded and "username" in decoded ): return None username = str(decoded.get("username") or decoded.get("account") or "") user_id = int(decoded.get("user_id") or 0) logger.info(f"[Token验证] 识别为本地 token: {username or 'unknown'}") return TokenUserInfo( user_id=user_id, account=str(decoded.get("account") or username), username=username, role=str(decoded.get("role") or "user"), source="local", name=str(decoded.get("name") or username), raw=decoded, ) async def _resolve_external_user_id(account_id: str) -> int: """将外部账号映射到本地 UserData 主键,便于复用现有业务表。""" if not account_id: return 0 cached_user_id = _get_cached_external_user_id(account_id) if cached_user_id is not None: return cached_user_id try: from database import SessionLocal from models.user_data import UserData db = SessionLocal() try: user_data = db.query(UserData).filter(UserData.accountID == account_id).first() user_id = int(user_data.id) if user_data else 0 _cache_external_user_id(account_id, user_id) return user_id finally: db.close() except Exception as e: logger.warning(f"[Token验证] 外部用户ID映射失败: {e}") return 0 async def verify_external_token(token: str) -> Optional[TokenUserInfo]: """ 调用 4A 验证外部 token。 """ token = _normalize_token(token) if not token: return None cached_user = _get_cached_external_user(token) if cached_user: return cached_user verify_url = getattr(settings.auth, "api_url", "") if not verify_url: logger.warning("[Token验证] 未配置外部 token 验证地址") return None try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( verify_url, json={"token": token}, headers={"Content-Type": "application/json"}, ) response.raise_for_status() result = response.json() except Exception as e: logger.warning(f"[Token验证] 外部 token 验证失败: {e}") return None if not result.get("valid"): logger.warning("[Token验证] 外部 token 无效或已过期") return None account_id = str(result.get("accountID") or "") username = str(result.get("name") or account_id or "external_user") user_id = await _resolve_external_user_id(account_id) logger.info(f"[Token验证] 识别为外部 token: {account_id or username}") user_info = TokenUserInfo( user_id=user_id, account=account_id, username=username, role=str(result.get("role") or "user"), source="external", name=username, account_id=account_id, user_code=str(result.get("userCode") or ""), contact_number=str(result.get("contactNumber") or ""), raw=result, ) _cache_external_user(token, user_info, result.get("exp")) return user_info async def verify_token(token: str) -> Optional[TokenUserInfo]: """ 统一验证 token,优先走本地 token,再尝试外部 4A token。 """ normalized = _normalize_token(token) if not normalized: return None local_user = verify_local_token(normalized) if local_user: return local_user return await verify_external_token(normalized) def is_local_token(token: str) -> bool: """ 判断是否为本地 token。 """ return verify_local_token(token) is not None