token.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. """
  2. Token 解析与认证工具
  3. 支持本地登录 token 和 4A 外部 token。
  4. """
  5. from dataclasses import dataclass, field
  6. import time
  7. from typing import Any, Dict, Optional, Tuple
  8. import httpx
  9. import jwt
  10. from utils.config import settings
  11. from utils.logger import logger
  12. LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
  13. LOCAL_JWT_ALGORITHM = "HS256"
  14. EXTERNAL_TOKEN_CACHE_TTL = 60
  15. USER_ID_CACHE_TTL = 300
  16. _external_token_cache: Dict[str, Tuple[float, "TokenUserInfo"]] = {}
  17. _external_user_id_cache: Dict[str, Tuple[float, int]] = {}
  18. @dataclass
  19. class TokenUserInfo:
  20. """统一的 request.state.user 对象。"""
  21. user_id: int
  22. account: str
  23. username: str
  24. role: str = "user"
  25. source: str = "external"
  26. name: str = ""
  27. account_id: str = ""
  28. user_code: str = ""
  29. contact_number: str = ""
  30. raw: Dict[str, Any] = field(default_factory=dict)
  31. @property
  32. def id(self) -> int:
  33. return self.user_id
  34. @property
  35. def userCode(self) -> str:
  36. """兼容旧代码中的 camelCase 字段访问。"""
  37. return (
  38. self.user_code
  39. or self.account_id
  40. or self.account
  41. or self.username
  42. or str(self.user_id)
  43. )
  44. @property
  45. def accountID(self) -> str:
  46. """兼容旧代码中的 camelCase 字段访问。"""
  47. return self.account_id or self.account or self.username
  48. @property
  49. def contactNumber(self) -> str:
  50. """兼容旧代码中的 camelCase 字段访问。"""
  51. return self.contact_number
  52. def _normalize_token(token: str) -> str:
  53. token = (token or "").strip()
  54. if token.lower().startswith("bearer "):
  55. return token[7:].strip()
  56. return token
  57. def _get_cached_external_user(token: str) -> Optional["TokenUserInfo"]:
  58. cache_item = _external_token_cache.get(token)
  59. if not cache_item:
  60. return None
  61. expires_at, user_info = cache_item
  62. if expires_at <= time.time():
  63. _external_token_cache.pop(token, None)
  64. return None
  65. return user_info
  66. def _cache_external_user(token: str, user_info: "TokenUserInfo", exp: Any = None) -> None:
  67. ttl = EXTERNAL_TOKEN_CACHE_TTL
  68. try:
  69. if exp:
  70. ttl = max(1, min(EXTERNAL_TOKEN_CACHE_TTL, int(exp) - int(time.time())))
  71. except Exception:
  72. ttl = EXTERNAL_TOKEN_CACHE_TTL
  73. _external_token_cache[token] = (time.time() + ttl, user_info)
  74. def _get_cached_external_user_id(account_id: str) -> Optional[int]:
  75. cache_item = _external_user_id_cache.get(account_id)
  76. if not cache_item:
  77. return None
  78. expires_at, user_id = cache_item
  79. if expires_at <= time.time():
  80. _external_user_id_cache.pop(account_id, None)
  81. return None
  82. return user_id
  83. def _cache_external_user_id(account_id: str, user_id: int) -> None:
  84. _external_user_id_cache[account_id] = (time.time() + USER_ID_CACHE_TTL, user_id)
  85. def verify_local_token(token: str) -> Optional[TokenUserInfo]:
  86. """
  87. 验证是否为本地登录生成的 token。
  88. """
  89. token = _normalize_token(token)
  90. if not token:
  91. return None
  92. try:
  93. decoded = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=[LOCAL_JWT_ALGORITHM])
  94. except jwt.InvalidTokenError:
  95. return None
  96. except Exception as e:
  97. logger.warning(f"[Token验证] 本地 token 校验异常: {e}")
  98. return None
  99. if decoded.get("source") != "local" and not (
  100. "user_id" in decoded and "username" in decoded
  101. ):
  102. return None
  103. username = str(decoded.get("username") or decoded.get("account") or "")
  104. user_id = int(decoded.get("user_id") or 0)
  105. logger.info(f"[Token验证] 识别为本地 token: {username or 'unknown'}")
  106. return TokenUserInfo(
  107. user_id=user_id,
  108. account=str(decoded.get("account") or username),
  109. username=username,
  110. role=str(decoded.get("role") or "user"),
  111. source="local",
  112. name=str(decoded.get("name") or username),
  113. raw=decoded,
  114. )
  115. async def _resolve_external_user_id(account_id: str) -> int:
  116. """将外部账号映射到本地 UserData 主键,便于复用现有业务表。"""
  117. if not account_id:
  118. return 0
  119. cached_user_id = _get_cached_external_user_id(account_id)
  120. if cached_user_id is not None:
  121. return cached_user_id
  122. try:
  123. from database import SessionLocal
  124. from models.user_data import UserData
  125. db = SessionLocal()
  126. try:
  127. user_data = db.query(UserData).filter(UserData.accountID == account_id).first()
  128. user_id = int(user_data.id) if user_data else 0
  129. _cache_external_user_id(account_id, user_id)
  130. return user_id
  131. finally:
  132. db.close()
  133. except Exception as e:
  134. logger.warning(f"[Token验证] 外部用户ID映射失败: {e}")
  135. return 0
  136. async def verify_external_token(token: str) -> Optional[TokenUserInfo]:
  137. """
  138. 调用 4A 验证外部 token。
  139. """
  140. token = _normalize_token(token)
  141. if not token:
  142. return None
  143. cached_user = _get_cached_external_user(token)
  144. if cached_user:
  145. return cached_user
  146. verify_url = getattr(settings.auth, "api_url", "")
  147. if not verify_url:
  148. logger.warning("[Token验证] 未配置外部 token 验证地址")
  149. return None
  150. try:
  151. async with httpx.AsyncClient(timeout=10.0) as client:
  152. response = await client.post(
  153. verify_url,
  154. json={"token": token},
  155. headers={"Content-Type": "application/json"},
  156. )
  157. response.raise_for_status()
  158. result = response.json()
  159. except Exception as e:
  160. logger.warning(f"[Token验证] 外部 token 验证失败: {e}")
  161. return None
  162. if not result.get("valid"):
  163. logger.warning("[Token验证] 外部 token 无效或已过期")
  164. return None
  165. account_id = str(result.get("accountID") or "")
  166. username = str(result.get("name") or account_id or "external_user")
  167. user_id = await _resolve_external_user_id(account_id)
  168. logger.info(f"[Token验证] 识别为外部 token: {account_id or username}")
  169. user_info = TokenUserInfo(
  170. user_id=user_id,
  171. account=account_id,
  172. username=username,
  173. role=str(result.get("role") or "user"),
  174. source="external",
  175. name=username,
  176. account_id=account_id,
  177. user_code=str(result.get("userCode") or ""),
  178. contact_number=str(result.get("contactNumber") or ""),
  179. raw=result,
  180. )
  181. _cache_external_user(token, user_info, result.get("exp"))
  182. return user_info
  183. async def verify_token(token: str) -> Optional[TokenUserInfo]:
  184. """
  185. 统一验证 token,优先走本地 token,再尝试外部 4A token。
  186. """
  187. normalized = _normalize_token(token)
  188. if not normalized:
  189. return None
  190. local_user = verify_local_token(normalized)
  191. if local_user:
  192. return local_user
  193. return await verify_external_token(normalized)
  194. def is_local_token(token: str) -> bool:
  195. """
  196. 判断是否为本地 token。
  197. """
  198. return verify_local_token(token) is not None