| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- from typing import Optional
- from pydantic import BaseModel
- import httpx
- import jwt
- from .config import settings
- from .logger import logger
- # 本地JWT密钥(与auth.py保持一致)
- LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024"
- class TokenUserInfo(BaseModel):
- user_id: int
- username: str
- account: str
- role: str = "user" # 默认角色为user
- def verify_local_token(token: str) -> Optional[TokenUserInfo]:
- """验证本地JWT Token"""
- try:
- payload = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=["HS256"])
-
- # 检查是否是本地token
- if payload.get("source") != "local":
- return None
-
- logger.info(f"本地Token验证成功: user_id={payload.get('user_id')}, username={payload.get('username')}")
-
- return TokenUserInfo(
- user_id=payload.get("user_id"),
- username=payload.get("username", ""),
- account=payload.get("username", ""), # 本地登录使用username作为account
- role=payload.get("role", "user") # 从JWT中提取角色
- )
- except jwt.ExpiredSignatureError:
- logger.warning("本地Token已过期")
- return None
- except jwt.InvalidTokenError as e:
- logger.debug(f"本地Token验证失败: {e}")
- return None
- async def verify_token(token: str) -> Optional[TokenUserInfo]:
- """验证Token并返回用户信息(支持本地JWT和外部认证)"""
- if not token:
- return None
-
- # 优先尝试本地JWT验证
- local_user = verify_local_token(token)
- if local_user:
- return local_user
-
- # 尝试外部认证服务器验证
- try:
- async with httpx.AsyncClient() as client:
- # 调用 shudao-4Aserver 的验证接口
- response = await client.post(
- settings.auth.api_url,
- json={"token": token},
- timeout=10.0
- )
-
- logger.info(f"Token验证请求: {settings.auth.api_url}")
- logger.info(f"Token验证响应状态: {response.status_code}")
-
- if response.status_code == 200:
- data = response.json()
- logger.info(f"Token验证响应数据: {data}")
-
- # 检查是否有效
- if not data.get("valid", False):
- logger.warning("Token无效")
- return None
-
- # 适配 shudao-4Aserver 的字段名
- return TokenUserInfo(
- user_id=hash(data.get("accountID", "")) % 1000000, # 临时生成数字ID
- username=data.get("name", ""),
- account=data.get("accountID", ""),
- role=data.get("role", "user") # 外部认证的角色
- )
- except Exception as e:
- logger.error(f"外部Token验证失败: {e}")
-
- return None
- async def get_user_info_from_token(token: str) -> Optional[TokenUserInfo]:
- """从Token获取用户信息"""
- return await verify_token(token)
|