token.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from typing import Optional
  2. from pydantic import BaseModel
  3. import httpx
  4. import jwt
  5. from .config import settings
  6. from .logger import logger
  7. # 本地JWT密钥(与auth.py保持一致)
  8. LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
  9. class TokenUserInfo(BaseModel):
  10. user_id: int
  11. username: str
  12. account: str
  13. role: str = "user" # 默认角色为user
  14. def verify_local_token(token: str) -> Optional[TokenUserInfo]:
  15. """验证本地JWT Token"""
  16. try:
  17. payload = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=["HS256"])
  18. # 检查是否是本地token
  19. if payload.get("source") != "local":
  20. return None
  21. logger.info(f"本地Token验证成功: user_id={payload.get('user_id')}, username={payload.get('username')}")
  22. return TokenUserInfo(
  23. user_id=payload.get("user_id"),
  24. username=payload.get("username", ""),
  25. account=payload.get("username", ""), # 本地登录使用username作为account
  26. role=payload.get("role", "user") # 从JWT中提取角色
  27. )
  28. except jwt.ExpiredSignatureError:
  29. logger.warning("本地Token已过期")
  30. return None
  31. except jwt.InvalidTokenError as e:
  32. logger.debug(f"本地Token验证失败: {e}")
  33. return None
  34. async def verify_token(token: str) -> Optional[TokenUserInfo]:
  35. """验证Token并返回用户信息(支持本地JWT和外部认证)"""
  36. if not token:
  37. return None
  38. # 优先尝试本地JWT验证
  39. local_user = verify_local_token(token)
  40. if local_user:
  41. return local_user
  42. # 尝试外部认证服务器验证
  43. try:
  44. async with httpx.AsyncClient() as client:
  45. # 调用 shudao-4Aserver 的验证接口
  46. response = await client.post(
  47. settings.auth.api_url,
  48. json={"token": token},
  49. timeout=10.0
  50. )
  51. logger.info(f"Token验证请求: {settings.auth.api_url}")
  52. logger.info(f"Token验证响应状态: {response.status_code}")
  53. if response.status_code == 200:
  54. data = response.json()
  55. logger.info(f"Token验证响应数据: {data}")
  56. # 检查是否有效
  57. if not data.get("valid", False):
  58. logger.warning("Token无效")
  59. return None
  60. # 适配 shudao-4Aserver 的字段名
  61. return TokenUserInfo(
  62. user_id=hash(data.get("accountID", "")) % 1000000, # 临时生成数字ID
  63. username=data.get("name", ""),
  64. account=data.get("accountID", ""),
  65. role=data.get("role", "user") # 外部认证的角色
  66. )
  67. except Exception as e:
  68. logger.error(f"外部Token验证失败: {e}")
  69. return None
  70. async def get_user_info_from_token(token: str) -> Optional[TokenUserInfo]:
  71. """从Token获取用户信息"""
  72. return await verify_token(token)