token.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. """
  2. Token 解析与认证工具
  3. 支持本地登录 token 和 4A 外部 token。
  4. """
  5. from dataclasses import dataclass, field
  6. import time
  7. from typing import Any, Dict, Optional, Tuple
  8. import httpx
  9. import jwt
  10. from sqlalchemy.exc import OperationalError
  11. from utils.config import settings
  12. from utils.logger import logger
  13. LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
  14. LOCAL_JWT_ALGORITHM = "HS256"
  15. EXTERNAL_TOKEN_CACHE_TTL = 60
  16. USER_ID_CACHE_TTL = 300
  17. _external_token_cache: Dict[str, Tuple[float, "TokenUserInfo"]] = {}
  18. _external_user_id_cache: Dict[str, Tuple[float, int]] = {}
  19. @dataclass
  20. class TokenUserInfo:
  21. """统一的 request.state.user 对象。"""
  22. user_id: int
  23. account: str
  24. username: str
  25. role: str = "user"
  26. source: str = "external"
  27. name: str = ""
  28. account_id: str = ""
  29. user_code: str = ""
  30. contact_number: str = ""
  31. raw: Dict[str, Any] = field(default_factory=dict)
  32. @property
  33. def id(self) -> int:
  34. return self.user_id
  35. @property
  36. def userCode(self) -> str:
  37. """兼容旧代码中的 camelCase 字段访问。"""
  38. return (
  39. self.user_code
  40. or self.account_id
  41. or self.account
  42. or self.username
  43. or str(self.user_id)
  44. )
  45. @property
  46. def accountID(self) -> str:
  47. """兼容旧代码中的 camelCase 字段访问。"""
  48. return self.account_id or self.account or self.username
  49. @property
  50. def contactNumber(self) -> str:
  51. """兼容旧代码中的 camelCase 字段访问。"""
  52. return self.contact_number
  53. def _normalize_token(token: str) -> str:
  54. token = (token or "").strip()
  55. if token.lower().startswith("bearer "):
  56. return token[7:].strip()
  57. return token
  58. def _get_cached_external_user(token: str) -> Optional["TokenUserInfo"]:
  59. cache_item = _external_token_cache.get(token)
  60. if not cache_item:
  61. return None
  62. expires_at, user_info = cache_item
  63. if expires_at <= time.time():
  64. _external_token_cache.pop(token, None)
  65. return None
  66. return user_info
  67. def _cache_external_user(token: str, user_info: "TokenUserInfo", exp: Any = None) -> None:
  68. ttl = EXTERNAL_TOKEN_CACHE_TTL
  69. try:
  70. if exp:
  71. ttl = max(1, min(EXTERNAL_TOKEN_CACHE_TTL, int(exp) - int(time.time())))
  72. except Exception:
  73. ttl = EXTERNAL_TOKEN_CACHE_TTL
  74. _external_token_cache[token] = (time.time() + ttl, user_info)
  75. def _get_cached_external_user_id(account_id: str) -> Optional[int]:
  76. cache_item = _external_user_id_cache.get(account_id)
  77. if not cache_item:
  78. return None
  79. expires_at, user_id = cache_item
  80. if expires_at <= time.time():
  81. _external_user_id_cache.pop(account_id, None)
  82. return None
  83. return user_id
  84. def _cache_external_user_id(account_id: str, user_id: int) -> None:
  85. _external_user_id_cache[account_id] = (time.time() + USER_ID_CACHE_TTL, user_id)
  86. def verify_local_token(token: str) -> Optional[TokenUserInfo]:
  87. """
  88. 验证是否为本地登录生成的 token。
  89. """
  90. token = _normalize_token(token)
  91. if not token:
  92. return None
  93. try:
  94. decoded = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=[LOCAL_JWT_ALGORITHM])
  95. except jwt.InvalidTokenError:
  96. return None
  97. except Exception as e:
  98. logger.warning(f"[Token验证] 本地 token 校验异常: {e}")
  99. return None
  100. if decoded.get("source") != "local" and not (
  101. "user_id" in decoded and "username" in decoded
  102. ):
  103. return None
  104. username = str(decoded.get("username") or decoded.get("account") or "")
  105. user_id = int(decoded.get("user_id") or 0)
  106. logger.info(f"[Token验证] 识别为本地 token: {username or 'unknown'}")
  107. return TokenUserInfo(
  108. user_id=user_id,
  109. account=str(decoded.get("account") or username),
  110. username=username,
  111. role=str(decoded.get("role") or "user"),
  112. source="local",
  113. name=str(decoded.get("name") or username),
  114. raw=decoded,
  115. )
  116. async def _resolve_external_user_id(account_id: str) -> int:
  117. """将外部账号映射到本地 UserData 主键,便于复用现有业务表。"""
  118. if not account_id:
  119. return 0
  120. cached_user_id = _get_cached_external_user_id(account_id)
  121. if cached_user_id is not None:
  122. return cached_user_id
  123. try:
  124. from database import SessionLocal
  125. from models.user_data import UserData
  126. last_error = None
  127. for attempt in range(2):
  128. db = SessionLocal()
  129. try:
  130. user_data = db.query(UserData).filter(UserData.accountID == account_id).first()
  131. user_id = int(user_data.id) if user_data else 0
  132. _cache_external_user_id(account_id, user_id)
  133. return user_id
  134. except OperationalError as e:
  135. last_error = e
  136. logger.warning(
  137. f"[Token验证] 外部用户ID映射查询失败,准备重试: "
  138. f"account={account_id}, attempt={attempt + 1}/2, detail={repr(e)}")
  139. if attempt == 1:
  140. raise
  141. finally:
  142. db.close()
  143. if last_error:
  144. raise last_error
  145. except Exception as e:
  146. logger.warning(f"[Token验证] 外部用户ID映射失败: {e}")
  147. return 0
  148. async def verify_external_token(token: str) -> Optional[TokenUserInfo]:
  149. """
  150. 调用 4A 验证外部 token。
  151. """
  152. token = _normalize_token(token)
  153. if not token:
  154. return None
  155. cached_user = _get_cached_external_user(token)
  156. if cached_user:
  157. return cached_user
  158. verify_url = getattr(settings.auth, "api_url", "")
  159. if not verify_url:
  160. logger.warning("[Token验证] 未配置外部 token 验证地址")
  161. return None
  162. try:
  163. async with httpx.AsyncClient(timeout=10.0) as client:
  164. response = await client.post(
  165. verify_url,
  166. json={"token": token},
  167. headers={"Content-Type": "application/json"},
  168. )
  169. response.raise_for_status()
  170. result = response.json()
  171. except Exception as e:
  172. logger.warning(f"[Token验证] 外部 token 验证失败: {e}")
  173. return None
  174. if not result.get("valid"):
  175. logger.warning("[Token验证] 外部 token 无效或已过期")
  176. return None
  177. account_id = str(result.get("accountID") or "")
  178. username = str(result.get("name") or account_id or "external_user")
  179. user_id = await _resolve_external_user_id(account_id)
  180. logger.info(f"[Token验证] 识别为外部 token: {account_id or username}")
  181. user_info = TokenUserInfo(
  182. user_id=user_id,
  183. account=account_id,
  184. username=username,
  185. role=str(result.get("role") or "user"),
  186. source="external",
  187. name=username,
  188. account_id=account_id,
  189. user_code=str(result.get("userCode") or ""),
  190. contact_number=str(result.get("contactNumber") or ""),
  191. raw=result,
  192. )
  193. _cache_external_user(token, user_info, result.get("exp"))
  194. return user_info
  195. async def verify_token(token: str) -> Optional[TokenUserInfo]:
  196. """
  197. 统一验证 token,优先走本地 token,再尝试外部 4A token。
  198. """
  199. normalized = _normalize_token(token)
  200. if not normalized:
  201. return None
  202. local_user = verify_local_token(normalized)
  203. if local_user:
  204. return local_user
  205. return await verify_external_token(normalized)
  206. def is_local_token(token: str) -> bool:
  207. """
  208. 判断是否为本地 token。
  209. """
  210. return verify_local_token(token) is not None