| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- """
- Token 解析与认证工具
- 支持本地登录 token 和 4A 外部 token。
- """
- from dataclasses import dataclass, field
- import time
- from typing import Any, Dict, Optional, Tuple
- import httpx
- import jwt
- from utils.config import settings
- from utils.logger import logger
# Key used to verify the signature of locally issued JWTs.
# NOTE(review): the secret is hard-coded in source; consider loading it from
# configuration or the environment so it is not committed with the code.
LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
# Signing algorithm accepted when decoding local tokens.
LOCAL_JWT_ALGORITHM = "HS256"
# Upper bound, in seconds, on how long a verified external token stays cached.
EXTERNAL_TOKEN_CACHE_TTL = 60
# Seconds an external-account -> local-user-ID mapping stays cached.
USER_ID_CACHE_TTL = 300
# token -> (expiry timestamp on the time.time() clock, verified user info)
_external_token_cache: Dict[str, Tuple[float, "TokenUserInfo"]] = {}
# account ID -> (expiry timestamp on the time.time() clock, local user ID)
_external_user_id_cache: Dict[str, Tuple[float, int]] = {}
@dataclass
class TokenUserInfo:
    """Unified user object stored on ``request.state.user``.

    Holds the identity fields produced by token verification together with
    the raw payload they were built from. The camelCase properties exist for
    older code that still reads those attribute names.
    """

    user_id: int
    account: str
    username: str
    role: str = "user"
    source: str = "external"
    name: str = ""
    account_id: str = ""
    user_code: str = ""
    contact_number: str = ""
    raw: Dict[str, Any] = field(default_factory=dict)

    @property
    def id(self) -> int:
        """Alias for ``user_id``."""
        return self.user_id

    @property
    def userCode(self) -> str:
        """Legacy camelCase accessor for the user code.

        Returns the first non-empty value among ``user_code``,
        ``account_id``, ``account`` and ``username``; when all are empty the
        stringified ``user_id`` is returned.
        """
        for candidate in (
            self.user_code,
            self.account_id,
            self.account,
            self.username,
        ):
            if candidate:
                return candidate
        return str(self.user_id)

    @property
    def accountID(self) -> str:
        """Legacy camelCase accessor for the account identifier.

        Prefers ``account_id``, then ``account``, and otherwise returns
        ``username`` as-is.
        """
        if self.account_id:
            return self.account_id
        if self.account:
            return self.account
        return self.username

    @property
    def contactNumber(self) -> str:
        """Legacy camelCase accessor for ``contact_number``."""
        return self.contact_number
- def _normalize_token(token: str) -> str:
- token = (token or "").strip()
- if token.lower().startswith("bearer "):
- return token[7:].strip()
- return token
def _get_cached_external_user(token: str) -> Optional["TokenUserInfo"]:
    """Look up a previously verified external token.

    Returns the cached ``TokenUserInfo`` while its entry is still valid.
    Returns ``None`` when the token is not cached or its entry has expired;
    an expired entry is evicted as a side effect.
    """
    entry = _external_token_cache.get(token)
    if entry:
        deadline, cached_user = entry
        if deadline > time.time():
            return cached_user
        # Past its deadline: drop the stale entry before reporting a miss.
        _external_token_cache.pop(token, None)
    return None
def _cache_external_user(token: str, user_info: "TokenUserInfo", exp: Any = None) -> None:
    """Store a verified external user in the token cache.

    The entry lives for ``EXTERNAL_TOKEN_CACHE_TTL`` seconds by default.
    When ``exp`` is truthy and converts to an integer timestamp, the lifetime
    becomes the time remaining until ``exp``, clamped to the range
    ``1 .. EXTERNAL_TOKEN_CACHE_TTL``. Any failure while interpreting ``exp``
    falls back to the default lifetime.
    """
    lifetime = EXTERNAL_TOKEN_CACHE_TTL
    try:
        remaining = int(exp) - int(time.time()) if exp else None
    except Exception:
        # Unusable expiry value: keep the default lifetime.
        remaining = None
    if remaining is not None:
        lifetime = max(1, min(EXTERNAL_TOKEN_CACHE_TTL, remaining))
    _external_token_cache[token] = (time.time() + lifetime, user_info)
def _get_cached_external_user_id(account_id: str) -> Optional[int]:
    """Look up the cached local user ID for an external account.

    Returns the cached ID (which may be ``0``) while its entry is still
    valid. Returns ``None`` when the account is not cached or its entry has
    expired; an expired entry is evicted as a side effect.
    """
    entry = _external_user_id_cache.get(account_id)
    if entry:
        deadline, cached_id = entry
        if deadline > time.time():
            return cached_id
        # Past its deadline: drop the stale entry before reporting a miss.
        _external_user_id_cache.pop(account_id, None)
    return None
def _cache_external_user_id(account_id: str, user_id: int) -> None:
    """Remember ``user_id`` for ``account_id`` for ``USER_ID_CACHE_TTL`` seconds."""
    deadline = time.time() + USER_ID_CACHE_TTL
    _external_user_id_cache[account_id] = (deadline, user_id)
def verify_local_token(token: str) -> Optional[TokenUserInfo]:
    """Validate a locally issued JWT and build its user object.

    The token is normalized, then decoded with ``LOCAL_JWT_SECRET`` and
    ``LOCAL_JWT_ALGORITHM``. A decoded payload is accepted when its
    ``source`` claim is ``"local"`` or when it carries both ``user_id`` and
    ``username`` claims.

    Args:
        token: Raw token, optionally prefixed with ``Bearer``.

    Returns:
        A ``TokenUserInfo`` with ``source="local"``, or ``None`` when the
        token is empty, fails to decode, is not recognised as a local token,
        or carries a ``user_id`` claim that is not an integer.
    """
    token = _normalize_token(token)
    if not token:
        return None

    try:
        decoded = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=[LOCAL_JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        # The token did not decode as a valid local JWT; stay quiet so the
        # caller can try another verification path.
        return None
    except Exception as e:
        logger.warning(f"[Token验证] 本地 token 校验异常: {e}")
        return None

    marked_local = decoded.get("source") == "local"
    has_identity_claims = "user_id" in decoded and "username" in decoded
    if not (marked_local or has_identity_claims):
        return None

    username = str(decoded.get("username") or decoded.get("account") or "")
    try:
        # A missing or empty claim maps to 0; a non-numeric one is rejected
        # instead of letting int() raise out of the verifier.
        user_id = int(decoded.get("user_id") or 0)
    except (TypeError, ValueError, OverflowError):
        logger.warning(
            f"[Token验证] 本地 token 的 user_id 无效: {decoded.get('user_id')!r}"
        )
        return None

    logger.info(f"[Token验证] 识别为本地 token: {username or 'unknown'}")
    return TokenUserInfo(
        user_id=user_id,
        account=str(decoded.get("account") or username),
        username=username,
        role=str(decoded.get("role") or "user"),
        source="local",
        name=str(decoded.get("name") or username),
        raw=decoded,
    )
async def _resolve_external_user_id(account_id: str) -> int:
    """Map an external account ID to the local ``UserData`` primary key.

    This lets external users reuse the existing business tables. The result
    is served from the user-ID cache when present; otherwise ``UserData`` is
    queried by ``accountID`` and the outcome is cached.

    Returns:
        The matching row's ``id`` as an int, or ``0`` when ``account_id`` is
        empty, no row matches, or the lookup raises. A "no match" result of
        ``0`` is cached as well; a failed lookup is not.
    """
    if not account_id:
        return 0

    hit = _get_cached_external_user_id(account_id)
    if hit is not None:
        return hit

    try:
        # NOTE(review): imported at call time — presumably to avoid an import
        # cycle with the database layer; confirm.
        from database import SessionLocal
        from models.user_data import UserData

        # NOTE(review): this query runs synchronously inside a coroutine, so
        # it occupies the event loop for its duration.
        session = SessionLocal()
        try:
            row = (
                session.query(UserData)
                .filter(UserData.accountID == account_id)
                .first()
            )
            if row:
                resolved = int(row.id)
            else:
                resolved = 0
            _cache_external_user_id(account_id, resolved)
            return resolved
        finally:
            session.close()
    except Exception as e:
        logger.warning(f"[Token验证] 外部用户ID映射失败: {e}")
        return 0
async def verify_external_token(token: str) -> Optional[TokenUserInfo]:
    """Verify an external token against the 4A verification endpoint.

    The normalized token is looked up in the external-token cache first. On a
    miss it is POSTed as ``{"token": ...}`` to ``settings.auth.api_url``. A
    response whose ``valid`` field is truthy is turned into a
    ``TokenUserInfo`` using the ``accountID``, ``name``, ``role``,
    ``userCode`` and ``contactNumber`` fields, and cached with the response's
    ``exp`` value as an expiry hint.

    Args:
        token: Raw token, optionally prefixed with ``Bearer``.

    Returns:
        A ``TokenUserInfo`` with ``source="external"``, or ``None`` when the
        token is empty, no verification URL is configured, the request
        fails, the response is not a JSON object, or the token is reported
        invalid.
    """
    token = _normalize_token(token)
    if not token:
        return None

    cached_user = _get_cached_external_user(token)
    if cached_user is not None:
        return cached_user

    verify_url = getattr(settings.auth, "api_url", "")
    if not verify_url:
        logger.warning("[Token验证] 未配置外部 token 验证地址")
        return None

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                verify_url,
                json={"token": token},
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            result = response.json()
    except Exception as e:
        logger.warning(f"[Token验证] 外部 token 验证失败: {e}")
        return None

    # The body may be valid JSON without being an object (list, null,
    # string). Reject it here instead of failing on ``.get`` below.
    if not isinstance(result, dict):
        logger.warning(
            f"[Token验证] 外部 token 验证响应格式异常: {type(result).__name__}"
        )
        return None

    if not result.get("valid"):
        logger.warning("[Token验证] 外部 token 无效或已过期")
        return None

    account_id = str(result.get("accountID") or "")
    username = str(result.get("name") or account_id or "external_user")
    user_id = await _resolve_external_user_id(account_id)

    logger.info(f"[Token验证] 识别为外部 token: {account_id or username}")
    user_info = TokenUserInfo(
        user_id=user_id,
        account=account_id,
        username=username,
        role=str(result.get("role") or "user"),
        source="external",
        name=username,
        account_id=account_id,
        user_code=str(result.get("userCode") or ""),
        contact_number=str(result.get("contactNumber") or ""),
        raw=result,
    )
    _cache_external_user(token, user_info, result.get("exp"))
    return user_info
async def verify_token(token: str) -> Optional[TokenUserInfo]:
    """Verify a token through the unified entry point.

    Local verification is tried first; the external 4A check runs only when
    the token is not accepted as a local one. Returns ``None`` for an empty
    token or when both checks reject it.
    """
    normalized = _normalize_token(token)
    if normalized:
        # ``or`` short-circuits, so the external call is skipped whenever the
        # local check already produced a user.
        return verify_local_token(normalized) or await verify_external_token(normalized)
    return None
def is_local_token(token: str) -> bool:
    """Return ``True`` when ``token`` passes local token verification."""
    local_user = verify_local_token(token)
    return local_user is not None
|