from typing import Optional from pydantic import BaseModel import httpx import jwt from .config import settings from .logger import logger # 本地JWT密钥(与auth.py保持一致) LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024" class TokenUserInfo(BaseModel): user_id: int username: str account: str role: str = "user" # 默认角色为user def verify_local_token(token: str) -> Optional[TokenUserInfo]: """验证本地JWT Token""" try: payload = jwt.decode(token, LOCAL_JWT_SECRET, algorithms=["HS256"]) # 检查是否是本地token if payload.get("source") != "local": return None logger.info(f"本地Token验证成功: user_id={payload.get('user_id')}, username={payload.get('username')}") return TokenUserInfo( user_id=payload.get("user_id"), username=payload.get("username", ""), account=payload.get("username", ""), # 本地登录使用username作为account role=payload.get("role", "user") # 从JWT中提取角色 ) except jwt.ExpiredSignatureError: logger.warning("本地Token已过期") return None except jwt.InvalidTokenError as e: logger.debug(f"本地Token验证失败: {e}") return None async def verify_token(token: str) -> Optional[TokenUserInfo]: """验证Token并返回用户信息(支持本地JWT和外部认证)""" if not token: return None # 优先尝试本地JWT验证 local_user = verify_local_token(token) if local_user: return local_user # 尝试外部认证服务器验证 try: async with httpx.AsyncClient() as client: # 调用 shudao-4Aserver 的验证接口 response = await client.post( settings.auth.api_url, json={"token": token}, timeout=10.0 ) logger.info(f"Token验证请求: {settings.auth.api_url}") logger.info(f"Token验证响应状态: {response.status_code}") if response.status_code == 200: data = response.json() logger.info(f"Token验证响应数据: {data}") # 检查是否有效 if not data.get("valid", False): logger.warning("Token无效") return None # 适配 shudao-4Aserver 的字段名 return TokenUserInfo( user_id=hash(data.get("accountID", "")) % 1000000, # 临时生成数字ID username=data.get("name", ""), account=data.get("accountID", ""), role=data.get("role", "user") # 外部认证的角色 ) except Exception as e: logger.error(f"外部Token验证失败: {e}") return None async def get_user_info_from_token(token: str) -> Optional[TokenUserInfo]: """从Token获取用户信息""" return await verify_token(token)