from fastapi import APIRouter, Request from fastapi.responses import JSONResponse from database import SessionLocal from models.total import User from utils.logger import logger import bcrypt import jwt import time router = APIRouter() # 本地JWT密钥(与Go版本保持一致,可配置到config.yaml) LOCAL_JWT_SECRET = "shudao-local-jwt-secret-2024" LOCAL_JWT_EXPIRE = 24 * 3600 # 24小时 def generate_local_token(user_id: int, username: str, role: str) -> str: """生成本地JWT Token""" payload = { "user_id": user_id, "username": username, "role": role, "source": "local", "exp": int(time.time()) + LOCAL_JWT_EXPIRE, "iat": int(time.time()), } return jwt.encode(payload, LOCAL_JWT_SECRET, algorithm="HS256") @router.post("/local_login") async def local_login(request: Request): """ 本地用户登录接口 - 支持 account + password 方式 - 返回 JWT token - 路径: POST /apiv1/auth/local_login (与Go版本对齐) """ try: body = await request.json() except Exception: return JSONResponse(content={"statusCode": 400, "msg": "请求参数解析失败"}) username = body.get("username", "").strip() password = body.get("password", "") if not username or not password: return JSONResponse(content={"statusCode": 400, "msg": "用户名和密码不能为空"}) logger.info(f"[用户登录] 用户 {username} 尝试登录") db = SessionLocal() try: user = db.query(User).filter( User.username == username, User.is_deleted == 0 ).first() if not user: logger.warning(f"[用户登录] 用户不存在: {username}") return JSONResponse(content={"statusCode": 401, "msg": "用户名或密码错误"}) if user.status != 1: logger.warning(f"[用户登录] 用户已被禁用: {username}") return JSONResponse(content={"statusCode": 403, "msg": "用户已被禁用"}) # 验证bcrypt密码 try: if not bcrypt.checkpw(password.encode("utf-8"), user.password.encode("utf-8")): logger.warning(f"[用户登录] 密码错误: {username}") return JSONResponse(content={"statusCode": 401, "msg": "用户名或密码错误"}) except Exception: logger.warning(f"[用户登录] 密码验证异常: {username}") return JSONResponse(content={"statusCode": 401, "msg": "用户名或密码错误"}) token = generate_local_token(user.id, user.username, user.role or "user") logger.info(f"[用户登录] 用户 {username} 登录成功") return JSONResponse(content={ "statusCode": 200, "msg": "登录成功", "token": token, "userInfo": { "id": user.id, "username": user.username, "nickname": user.nickname or "", "role": user.role or "user", "email": user.email or "" } }) except Exception as e: logger.error(f"[用户登录] 异常: {e}") return JSONResponse(content={"statusCode": 500, "msg": f"登录失败: {str(e)}"}) finally: db.close() @router.post("/register") async def register(request: Request): """ 用户注册接口 - 创建新用户账号 - 使用 bcrypt 加密密码 - 支持 account 和 username 两个字段名(向后兼容) """ try: body = await request.json() except Exception: return JSONResponse(content={"statusCode": 400, "msg": "请求参数解析失败"}) # 同时支持 account 和 username 字段 account = body.get("account") or body.get("username", "") account = account.strip() if account else "" password = body.get("password", "") name = body.get("name", "").strip() if not account or not password: return JSONResponse(content={"statusCode": 400, "msg": "账号和密码不能为空"}) logger.info(f"[用户注册] 账号 {account} 尝试注册") db = SessionLocal() try: # 检查账号是否已存在 existing_user = db.query(User).filter( User.username == account, User.is_deleted == 0 ).first() if existing_user: logger.warning(f"[用户注册] 账号已存在: {account}") return JSONResponse(content={"statusCode": 400, "msg": "账号已存在"}) # 加密密码 hashed_password = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") # 创建新用户 new_user = User( username=account, password=hashed_password, nickname=name or account, role="user", status=1, is_deleted=0 ) db.add(new_user) db.commit() db.refresh(new_user) logger.info(f"[用户注册] 用户 {account} 注册成功") return JSONResponse(content={ "statusCode": 200, "msg": "注册成功", "data": { "user_id": new_user.id, "account": new_user.username } }) except Exception as e: db.rollback() logger.error(f"[用户注册] 异常: {e}") return JSONResponse(content={"statusCode": 500, "msg": f"注册失败: {str(e)}"}) finally: db.close() @router.get("/user/info") async def get_user_info(request: Request): """ 获取用户信息接口 - 需要认证 - 返回当前登录用户的详细信息 """ user_info = request.state.user if not user_info: return JSONResponse(status_code=401, content={"statusCode": 401, "msg": "未认证"}) db = SessionLocal() try: # 优先查询本地用户表 user = db.query(User).filter( User.id == user_info.user_id, User.is_deleted == 0 ).first() if user: # 本地用户 logger.info(f"[获取用户信息] 本地用户: {user.username}") return JSONResponse(content={ "statusCode": 200, "msg": "success", "data": { "user_id": user.id, "account": user.username, "name": user.nickname or user.username, "points": 0, # 本地用户默认积分为0 "created_at": user.created_at or 0 } }) # 查询外部用户数据 from models.user_data import UserData user_data = db.query(UserData).filter( UserData.accountID == user_info.account ).first() if not user_data: logger.warning(f"[获取用户信息] 用户数据不存在: {user_info.account}") return JSONResponse(content={"statusCode": 404, "msg": "用户数据不存在"}) logger.info(f"[获取用户信息] 外部用户: {user_info.account}") return JSONResponse(content={ "statusCode": 200, "msg": "success", "data": { "user_id": user_data.id, "account": user_data.accountID, "name": user_data.name or "", "points": user_data.points or 0, "created_at": user_data.created_at or 0 } }) except Exception as e: logger.error(f"[获取用户信息] 异常: {e}") return JSONResponse(content={"statusCode": 500, "msg": f"获取用户信息失败: {str(e)}"}) finally: db.close()