# 在 views/system_view.py 中 import sys import os import uuid import traceback import json import secrets import logging # 添加src目录到Python路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..')) """ 系统管理视图路由 包含:用户管理、角色管理、菜单管理、权限管理、系统日志、仪表盘、应用管理 """ from fastapi import APIRouter, Depends, HTTPException, Request, Response from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from typing import Optional from datetime import datetime, timezone from app.services.jwt_token import verify_token from app.services.auth_service import AuthService from app.utils.auth_decorator import get_current_user_with_sliding_expiration from app.schemas.base import ApiResponse from app.utils.security import hash_password, verify_password from app.services.system_service import SystemService from app.services.system_service_ext import SystemServiceExt from app.utils.auth_dependency import get_current_user_with_refresh # 获取logger logger = logging.getLogger(__name__) router = APIRouter(prefix="/system", tags=["系统管理"]) security = HTTPBearer() @router.get("/dashboard") async def get_dashboard( request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """获取仪表盘数据(支持滑动过期)""" try: # 使用AuthService验证令牌并获取用户信息(支持滑动过期) auth_service = AuthService(db) user, new_token = await auth_service.get_current_user(credentials.credentials) if not user: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 获取仪表盘数据 dashboard_data = { "total_users": 0, "total_apps": 0, "today_logins": 0, "system_status": "正常", "current_user": { "id": user.id, "username": user.username, "email": user.email } } response_data = ApiResponse( code=0, message="获取仪表盘数据成功", data=dashboard_data, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 如果token被刷新,添加提示信息 if new_token: response_data["token_refreshed"] = True response_data["new_token"] = new_token logger.info(f"仪表盘API - Token已刷新,用户: {user.username}") return response_data except Exception as e: logger.error(f"获取仪表盘数据错误: {e}") return ApiResponse( code=500, message=f"获取仪表盘数据失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/info") async def get_system_info_with_sliding_token( user_info: dict = Depends(get_current_user_with_sliding_expiration) ): """获取系统信息(使用滑动过期依赖注入)""" try: user = user_info["user"] new_token = user_info["new_token"] # 获取系统信息 system_info = { "system_name": "LQ后台管理系统", "version": "1.0.0", "current_time": datetime.now(timezone.utc).isoformat(), "current_user": { "id": user.id, "username": user.username, "email": user.email, "is_superuser": user.is_superuser } } response_data = ApiResponse( code=0, message="获取系统信息成功", data=system_info, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 如果token被刷新,添加提示信息 if new_token: response_data["token_refreshed"] = True response_data["new_token"] = new_token logger.info(f"系统信息API - Token已刷新,用户: {user.username}") return response_data except Exception as e: logger.error(f"获取系统信息错误: {e}") return ApiResponse( code=500, message=f"获取系统信息失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/logs") async def get_logs( page: int = 1, page_size: int = 20, log_type: str = "", credentials: dict = Depends(get_current_user_with_refresh) ): """获取系统日志""" return { "code": 0, "message": "获取系统日志成功", "data": { "items": [], "total": 0, "page": page, "page_size": page_size }, "timestamp": datetime.now(timezone.utc).isoformat() } @router.get("/users/profile") async def get_user_profile( request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """获取用户资料""" try: # 验证令牌 # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") if not user_id: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() user_info = await system_service.get_user_profile(user_id) if not user_info: return ApiResponse( code=200001, message="用户不存在", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() return ApiResponse( code=0, message="获取用户资料成功", data=user_info, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取用户资料错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/users/profile") async def update_user_profile( request: Request, profile_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """更新用户资料""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() success, message = await system_service.update_user_profile(user_id, profile_data) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=500001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("更新用户资料错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/users/password") async def change_user_password( request: Request, password_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """修改用户密码""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") old_password = password_data.get('old_password') new_password = password_data.get('new_password') if not old_password or not new_password: return ApiResponse( code=100001, message="缺少必要参数", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() success, message = await system_service.change_user_password(user_id, old_password, new_password) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=200001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("修改密码错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() def hash_password_simple(password): """密码哈希 - 使用bcrypt""" from app.utils.security import hash_password return hash_password(password) def verify_password_simple(plain_password: str, hashed_password: str) -> bool: """ 验证密码 支持两种格式: 1. sha256$salt$hash - 自定义格式 2. bcrypt格式 - 使用bcrypt验证 """ import hashlib try: # 检查是否是自定义sha256格式 if hashed_password.startswith("sha256$"): parts = hashed_password.split("$") if len(parts) == 3: _, salt, stored_hash = parts # 使用相同的盐值计算哈希 computed_hash = hashlib.sha256((plain_password + salt).encode()).hexdigest() return computed_hash == stored_hash # 尝试bcrypt验证 try: from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") return pwd_context.verify(plain_password, hashed_password) except ImportError: # 如果没有passlib,尝试使用bcrypt try: import bcrypt return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8')) except ImportError: pass return False except Exception as e: logger.exception("密码验证错误") return False # RBAC权限管理API @router.get("/user/menus") async def api_get_user_menus( request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """获取用户菜单""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() menu_tree = await system_service.get_user_menus(user_id) return ApiResponse( code=0, message="获取用户菜单成功", data=menu_tree, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取用户菜单错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/admin/menus") async def api_get_all_menus( request: Request, credentials: dict = Depends(get_current_user_with_refresh), page: int = 1, page_size: int = 1000, # 增大默认页面大小,确保返回所有菜单 keyword: Optional[str] = None, ): """获取所有菜单(管理员)""" try: #payload = verify_token(credentials.credentials) payload = credentials logger.info(f"credentials={credentials}") if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() menus, total = await system_service.get_all_menus(page, page_size, keyword) return ApiResponse( code=0, message="获取菜单列表成功", data={ "items": menus, "total": total, "page": page, "page_size": page_size }, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取菜单列表错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/admin/roles") async def api_get_all_roles( request: Request, credentials: dict = Depends(get_current_user_with_refresh), page: int = 1, page_size: int = 20, keyword: Optional[str] = None, ): """获取所有角色""" try: #payload = verify_token(credentials.credentials) payload = credentials logger.info(f"credentials={credentials}") if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() roles, total = await system_service.get_all_roles(page , page_size , keyword) return ApiResponse( code=0, message="获取角色列表成功", data={ "items": roles, "total": total, "page": page, "page_size": page_size }, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取角色列表错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/user/permissions") async def api_get_user_permissions(credentials: dict = Depends(get_current_user_with_refresh)): """获取用户权限""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() permissions = await system_service.get_user_permissions(user_id) return ApiResponse( code=0, message="获取用户权限成功", data=permissions, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取用户权限错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 用户管理API @router.get("/admin/users") async def get_users( request: Request, credentials: dict = Depends(get_current_user_with_refresh), page: int = 1, page_size: int = 20, keyword: Optional[str] = None ): """获取用户列表""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层 system_service_ext = SystemServiceExt() users, total = await system_service_ext.get_users(page, page_size, keyword) return ApiResponse( code=0, message="获取用户列表成功", data={"items": users, "total": total, "page": page, "page_size": page_size}, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取用户列表错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.post("/admin/users") async def create_user( request: Request, user_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """创建用户""" try: #payload = verify_token(credentials.credentials) payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() creator_id = payload.get("sub") # 创建密码哈希 password_hash = hash_password_simple(user_data['password']) # 调用 service 层 system_service_ext = SystemServiceExt() success, message = await system_service_ext.create_user(user_data, password_hash, creator_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("创建用户错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.get("/admin/users/{user_id}") async def get_user_detail( user_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """获取用户详情""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层获取用户详情 system_service_ext = SystemServiceExt() user_detail = await system_service_ext.get_user_detail(user_id) if user_detail: return ApiResponse(code=0, data=user_detail, message="Success", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=404, message="用户不存在", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("获取用户详情错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.put("/admin/users/{user_id}") async def update_user( user_id: str, user_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """更新用户""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() updater_id = payload.get("sub") # 只有当密码不为空时才进行哈希处理 if user_data.get('password'): password_hash = hash_password_simple(user_data['password']) user_data['password'] = password_hash # 调用 service 层 system_service_ext = SystemServiceExt() success, message = await system_service_ext.update_user(user_id, user_data, updater_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("更新用户错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.delete("/admin/users/{user_id}") async def delete_user( user_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """删除用户""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() current_user_id = payload.get("sub") # 不能删除自己 if user_id == current_user_id: return ApiResponse(code=400, message="不能删除自己", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层 system_service_ext = SystemServiceExt() success, message = await system_service_ext.delete_user(user_id, current_user_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("删除用户错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 角色管理API @router.post("/admin/roles") async def create_role( role_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """创建角色""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() current_user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() success, message = await system_service.create_role(role_data , current_user_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("创建角色错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.put("/admin/roles/{role_id}") async def update_role( role_id: str, role_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """更新角色""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() success, message = await system_service.update_role(role_id, role_data , user_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: code = 404 if "不存在" in message else 400 return ApiResponse(code=code, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("更新角色错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.delete("/admin/roles/{role_id}") async def delete_role( role_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """删除角色""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层 system_service = SystemService() success, message = await system_service.delete_role(role_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: code = 404 if "不存在" in message else 400 return ApiResponse(code=code, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("删除角色错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 角色菜单权限管理API @router.get("/admin/roles/{role_id}/menus") async def get_role_menus( role_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """获取角色的菜单权限""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() success, data, message = await system_service.get_role_menus(role_id) if success: return ApiResponse( code=0, message=message, data=data, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: code = 404 if "不存在" in message else 500 return ApiResponse( code=code, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取角色菜单权限错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/admin/roles/{role_id}/menus") async def update_role_menus( role_id: str, request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """更新角色的菜单权限""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 获取请求数据 body = await request.json() menu_ids = body.get("menu_ids", []) if not isinstance(menu_ids, list): return ApiResponse( code=400, message="菜单ID列表格式错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() updater_id = payload.get("sub") success, data, message = await system_service.update_role_menus(role_id, menu_ids, updater_id) if success: return ApiResponse( code=0, message=message, data=data, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: code = 404 if "不存在" in message else 400 return ApiResponse( code=code, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("更新角色菜单权限错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 菜单管理API @router.post("/admin/menus") async def create_menu( request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """创建菜单""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 获取请求数据 body = await request.json() # 验证必填字段 required_fields = ['title', 'name', 'menu_type'] for field in required_fields: if field not in body or not body[field]: return ApiResponse( code=400, message=f"缺少必填字段: {field}", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() creator_id = payload.get("sub") success, message = await system_service.create_menu(body, creator_id) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("创建菜单错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/admin/menus/{menu_id}") async def update_menu( menu_id: str, request: Request, credentials: dict = Depends(get_current_user_with_refresh) ): """更新菜单""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 获取请求数据 body = await request.json() # 调用 service 层 system_service = SystemService() updater_id = payload.get("sub") success, message = await system_service.update_menu(menu_id, body, updater_id) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("更新菜单错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.delete("/admin/menus/{menu_id}") async def delete_menu( menu_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """删除菜单""" try: payload = credentials if not payload: return ApiResponse( code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service = SystemService() success, message = await system_service.delete_menu(menu_id) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("删除菜单错误") return ApiResponse( code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.post("/admin/menus") async def create_menu( menu_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """创建菜单""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() success, message = await system_service.create_menu(menu_data , user_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("创建菜单错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.put("/admin/menus/{menu_id}") async def update_menu( menu_id: str, menu_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """更新菜单""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service = SystemService() success, message = await system_service.update_menu(menu_id, menu_data , user_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("更新菜单错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.delete("/admin/menus/{menu_id}") async def delete_menu( menu_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """删除菜单""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() is_superuser = payload.get("is_superuser", False) if not is_superuser: return ApiResponse(code=403, message="权限不足", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层 system_service = SystemService() success, message = await system_service.delete_menu(menu_id) if success: return ApiResponse(code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() else: return ApiResponse(code=400, message=message, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("删除菜单错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 获取所有角色(用于下拉选择) @router.get("/roles/all") async def get_all_roles_simple(credentials: dict = Depends(get_current_user_with_refresh)): """获取所有角色(简化版,用于下拉选择)""" try: payload = credentials if not payload: return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() # 调用 service 层 system_service = SystemService() roles = await system_service.get_all_roles_simple() return ApiResponse(code=0, message="获取角色列表成功", data=roles, timestamp=datetime.now(timezone.utc).isoformat()).model_dump() except Exception as e: logger.exception("获取角色列表错误") return ApiResponse(code=500, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat()).model_dump() @router.get("/apps") async def get_apps( page: int = 1, page_size: int = 20, keyword: str = "", status: str = "", credentials: dict = Depends(get_current_user_with_refresh) ): """获取应用列表""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层检查用户角色 system_service_ext = SystemServiceExt() is_app_manager = await system_service_ext.check_user_app_manager_role(user_id) # 调用 service 层获取应用列表 apps, total = await system_service_ext.get_apps(page, page_size, user_id, is_app_manager, keyword, status) return ApiResponse( code=0, message="获取应用列表成功", data={ "items": apps, "total": total, "page": page, "page_size": page_size }, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取应用列表错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.get("/apps/{app_id}") async def get_app_detail( app_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """获取应用详情(包含密钥)""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service_ext = SystemServiceExt() app_detail = await system_service_ext.get_app_detail(app_id, user_id) if not app_detail: return ApiResponse( code=200001, message="应用不存在或无权限", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() return ApiResponse( code=0, message="获取应用详情成功", data=app_detail, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("获取应用详情错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.post("/apps") async def create_app( request: Request, app_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """创建应用""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 验证必要字段 if not app_data.get('name') or not app_data.get('redirect_uris'): return ApiResponse( code=100001, message="缺少必要参数", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service_ext = SystemServiceExt() success, message, app_info = await system_service_ext.create_app(app_data, user_id) if success: return ApiResponse( code=0, message=message, data=app_info, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=500001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("创建应用错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/apps/{app_id}/status") async def toggle_app_status( app_id: str, status_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """切换应用状态""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") is_active = status_data.get('is_active') if is_active is None: return ApiResponse( code=100001, message="缺少必要参数", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service_ext = SystemServiceExt() success, message = await system_service_ext.toggle_app_status(app_id, is_active, user_id) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=200001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("切换应用状态错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.put("/apps/{app_id}") async def update_app( app_id: str, app_data: dict, credentials: dict = Depends(get_current_user_with_refresh) ): """更新应用信息""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 验证必要参数 name = app_data.get('name', '').strip() if not name: return ApiResponse( code=100001, message="应用名称不能为空", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() # 调用 service 层 system_service_ext = SystemServiceExt() success, message, app_result = await system_service_ext.update_app(app_id, app_data, user_id) if success: return ApiResponse( code=0, message=message, data=app_result, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=200001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("更新应用错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.delete("/apps/{app_id}") async def delete_app( app_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """删除应用""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service_ext = SystemServiceExt() success, message = await system_service_ext.delete_app_by_id(app_id, user_id) if success: return ApiResponse( code=0, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=200001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("删除应用错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("删除应用错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() @router.post("/apps/{app_id}/reset-secret") async def reset_app_secret( app_id: str, credentials: dict = Depends(get_current_user_with_refresh) ): """重置应用密钥""" try: # 验证令牌 payload = credentials if not payload: return ApiResponse( code=200002, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() user_id = payload.get("sub") # 调用 service 层 system_service_ext = SystemServiceExt() success, message, new_secret = await system_service_ext.reset_app_secret(app_id, user_id) if success: return ApiResponse( code=0, message=message, data={"app_secret": new_secret}, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() else: return ApiResponse( code=200001, message=message, timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() except Exception as e: logger.exception("重置应用密钥错误") return ApiResponse( code=500001, message="服务器内部错误", timestamp=datetime.now(timezone.utc).isoformat() ).model_dump() def generate_random_string(length=32): """生成随机字符串""" import secrets import string alphabet = string.ascii_letters + string.digits return ''.join(secrets.choice(alphabet) for _ in range(length))