"""System administration service layer.

SQL query logic extracted from system_view.py.
"""
import hmac
import json
import logging
import secrets
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from app.base.async_mysql_connection import get_db_connection

logger = logging.getLogger(__name__)


def generate_random_string(length: int) -> str:
    """Return a URL-safe random string truncated to ``length`` characters."""
    return secrets.token_urlsafe(length)[:length]


def _iso(value: Any) -> Optional[str]:
    """Return ``value.isoformat()``, or ``None`` when ``value`` is falsy."""
    return value.isoformat() if value else None


def _constant_time_equal(left: Any, right: Any) -> bool:
    """Compare two hashes without leaking the mismatch position via timing.

    Both values must be ``str`` or both bytes-like; any other combination
    (including ``None``) is treated as "not equal".
    """
    if isinstance(left, str) and isinstance(right, str):
        # Encode first: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))
    if isinstance(left, (bytes, bytearray)) and isinstance(right, (bytes, bytearray)):
        return hmac.compare_digest(left, right)
    return False


class SystemService:
    """System administration service implemented with raw SQL queries.

    Every method obtains its own connection from ``get_db_connection()``,
    reads rows by column name, and closes the cursor and connection before
    returning.

    NOTE(review): the methods are declared ``async`` but never ``await`` the
    connection or cursor calls -- confirm that ``get_db_connection`` returns a
    synchronous connection.
    """

    def __init__(self):
        """Initialize the service; no instance state is kept."""
        pass

    # ==================== User profile management ====================

    async def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the profile of ``user_id`` together with its active role names.

        Returns:
            A dict of user/profile columns plus ``roles`` (list of role
            names), or ``None`` when no connection is available or the user
            does not exist. Timestamp columns are ISO-8601 strings or ``None``.
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = conn.cursor()
        try:
            # Basic user columns joined with the optional profile row.
            cursor.execute("""
                SELECT u.id, u.username, u.email, u.phone, u.avatar_url,
                       u.is_active, u.is_superuser, u.last_login_at,
                       u.created_at, u.updated_at,
                       p.real_name, p.company, p.department, p.position
                FROM users u
                LEFT JOIN user_profiles p ON u.id = p.user_id
                WHERE u.id = %s
            """, (user_id,))
            user_data = cursor.fetchone()
            if not user_data:
                return None

            # Names of the roles currently active for this user.
            cursor.execute("""
                SELECT r.name
                FROM user_roles ur
                JOIN roles r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND ur.is_active = 1
            """, (user_id,))
            roles = [row['name'] for row in cursor.fetchall()]

            return {
                "id": user_data["id"],
                "username": user_data["username"],
                "email": user_data["email"],
                "phone": user_data["phone"],
                "avatar_url": user_data["avatar_url"],
                "is_active": user_data["is_active"],
                "is_superuser": user_data["is_superuser"],
                "last_login_at": _iso(user_data["last_login_at"]),
                "created_at": _iso(user_data["created_at"]),
                "updated_at": _iso(user_data["updated_at"]),
                "real_name": user_data["real_name"],
                "company": user_data["company"],
                "department": user_data["department"],
                "position": user_data["position"],
                "roles": roles,
            }
        finally:
            cursor.close()
            conn.close()

    async def update_user_profile(self, user_id: str, profile_data: Dict[str, Any]) -> bool:
        """Update the user's contact fields and profile details.

        Only the keys ``email``/``phone`` (``users`` table) and
        ``real_name``/``company``/``department``/``position``
        (``user_profiles`` table) are honoured; other keys are ignored.

        Returns:
            ``True`` when the transaction committed, ``False`` when no
            connection is available or any statement failed (rolled back).
        """
        conn = get_db_connection()
        if not conn:
            return False

        cursor = conn.cursor()
        try:
            # Column names interpolated below come from fixed whitelists,
            # never from caller-supplied keys; values are always bound.
            update_fields = []
            update_values = []
            for field in ('email', 'phone'):
                if field in profile_data:
                    update_fields.append(f'{field} = %s')
                    update_values.append(profile_data[field])

            if update_fields:
                update_values.append(user_id)
                cursor.execute(f"""
                    UPDATE users
                    SET {', '.join(update_fields)}, updated_at = NOW()
                    WHERE id = %s
                """, update_values)

            # Update or insert the profile detail row.
            profile_fields = ['real_name', 'company', 'department', 'position']
            profile_updates = {k: v for k, v in profile_data.items() if k in profile_fields}

            if profile_updates:
                cursor.execute("SELECT id FROM user_profiles WHERE user_id = %s", (user_id,))
                profile_exists = cursor.fetchone()

                if profile_exists:
                    update_fields = [f'{field} = %s' for field in profile_updates]
                    update_values = list(profile_updates.values())
                    update_values.append(user_id)
                    cursor.execute(f"""
                        UPDATE user_profiles
                        SET {', '.join(update_fields)}, updated_at = NOW()
                        WHERE user_id = %s
                    """, update_values)
                else:
                    fields = ['user_id'] + list(profile_updates.keys())
                    values = [user_id] + list(profile_updates.values())
                    placeholders = ', '.join(['%s'] * len(values))
                    cursor.execute(f"""
                        INSERT INTO user_profiles ({', '.join(fields)}, created_at, updated_at)
                        VALUES ({placeholders}, NOW(), NOW())
                    """, values)

            conn.commit()
            return True
        except Exception:
            logger.exception("更新用户资料错误")
            conn.rollback()
            return False
        finally:
            cursor.close()
            conn.close()

    async def verify_and_update_password(self, user_id: str, old_password_hash: str,
                                         new_password_hash: str) -> Tuple[bool, str]:
        """Check the current password hash and, if it matches, store the new one.

        Returns:
            ``(success, message)``. Fails when no connection is available,
            the user does not exist, the stored hash differs from
            ``old_password_hash``, or the update raised (rolled back).
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            cursor.execute("SELECT password_hash FROM users WHERE id = %s", (user_id,))
            result = cursor.fetchone()
            if not result:
                return False, "用户不存在"

            # Constant-time comparison so response timing does not reveal how
            # much of the hash matched; a NULL stored hash never matches.
            if not _constant_time_equal(result["password_hash"], old_password_hash):
                return False, "当前密码错误"

            cursor.execute("""
                UPDATE users
                SET password_hash = %s, updated_at = NOW()
                WHERE id = %s
            """, (new_password_hash, user_id))
            conn.commit()
            return True, "密码修改成功"
        except Exception:
            logger.exception("修改密码错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    # ==================== Menu management ====================

    async def get_user_menus(self, user_id: str) -> List[Dict[str, Any]]:
        """Return the active menus visible to ``user_id``, ordered by ``sort_order``.

        A user holding an active ``super_admin`` role receives every active
        menu; other users receive the menus granted through their active
        roles. Each entry carries an empty ``children`` list. Returns ``[]``
        when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return []

        cursor = conn.cursor()
        try:
            # Does the user hold an active super_admin role?
            cursor.execute("""
                SELECT COUNT(*) as count
                FROM user_roles ur
                JOIN roles r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
            """, (user_id,))
            result = cursor.fetchone()
            is_super_admin = bool(result) and result['count'] > 0

            if is_super_admin:
                cursor.execute("""
                    SELECT m.id, m.parent_id, m.name, m.title, m.path, m.component,
                           m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active
                    FROM menus m
                    WHERE m.is_active = 1
                    ORDER BY m.sort_order
                """)
            else:
                # GROUP BY collapses menus reachable through several roles.
                cursor.execute("""
                    SELECT m.id, m.parent_id, m.name, m.title, m.path, m.component,
                           m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active
                    FROM menus m
                    JOIN role_menus rm ON m.id = rm.menu_id
                    JOIN user_roles ur ON rm.role_id = ur.role_id
                    WHERE ur.user_id = %s AND ur.is_active = 1 AND m.is_active = 1
                    GROUP BY m.id, m.parent_id, m.name, m.title, m.path, m.component,
                             m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active
                    ORDER BY m.sort_order
                """, (user_id,))

            return [
                {
                    "id": row['id'],
                    "parent_id": row['parent_id'],
                    "name": row['name'],
                    "title": row['title'],
                    "path": row['path'],
                    "component": row['component'],
                    "icon": row['icon'],
                    "sort_order": row['sort_order'],
                    "menu_type": row['menu_type'],
                    "is_hidden": bool(row['is_hidden']),
                    "is_active": bool(row['is_active']),
                    "children": [],
                }
                for row in cursor.fetchall()
            ]
        finally:
            cursor.close()
            conn.close()

    async def get_all_menus(self, page: int, page_size: int,
                            keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of menus (admin listing) and the total match count.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            keyword: Optional substring matched against ``title`` and ``name``.

        Returns:
            ``(menus, total)``; ``([], 0)`` when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()
        try:
            where_conditions = []
            params = []
            if keyword:
                where_conditions.append("(m.title LIKE %s OR m.name LIKE %s)")
                params.extend([f"%{keyword}%", f"%{keyword}%"])
            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            cursor.execute(f"SELECT COUNT(*) as count FROM menus m WHERE {where_clause}", params)
            total = cursor.fetchone()['count']

            # A negative LIMIT/OFFSET is a SQL error, so clamp both.
            limit = max(page_size, 0)
            offset = max(page - 1, 0) * limit
            cursor.execute(f"""
                SELECT m.id, m.parent_id, m.name, m.title, m.path, m.component,
                       m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active,
                       m.description, m.created_at, m.updated_at,
                       pm.title as parent_title
                FROM menus m
                LEFT JOIN menus pm ON m.parent_id = pm.id
                WHERE {where_clause}
                ORDER BY CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END,
                         m.sort_order,
                         CASE WHEN m.menu_type = 'menu' THEN 0 ELSE 1 END,
                         m.created_at
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            menus = [
                {
                    "id": row['id'],
                    "parent_id": row['parent_id'],
                    "name": row['name'],
                    "title": row['title'],
                    "path": row['path'],
                    "component": row['component'],
                    "icon": row['icon'],
                    "sort_order": row['sort_order'],
                    "menu_type": row['menu_type'],
                    "is_hidden": bool(row['is_hidden']),
                    "is_active": bool(row['is_active']),
                    "description": row['description'],
                    "created_at": _iso(row['created_at']),
                    "updated_at": _iso(row['updated_at']),
                    "parent_title": row['parent_title'],
                }
                for row in cursor.fetchall()
            ]
            return menus, total
        finally:
            cursor.close()
            conn.close()

    async def create_menu(self, menu_data: Dict[str, Any]) -> Tuple[bool, str]:
        """Insert a new menu with a freshly generated UUID.

        ``menu_data`` must contain ``name`` and ``title``; other columns fall
        back to defaults (``sort_order`` 0, ``menu_type`` ``'menu'``,
        ``is_hidden`` False, ``is_active`` True, the rest NULL).

        Returns:
            ``(success, message)``; fails when the name already exists, no
            connection is available, or the insert raised (rolled back).
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            # Menu names must be unique.
            cursor.execute("SELECT id FROM menus WHERE name = %s", (menu_data['name'],))
            if cursor.fetchone():
                return False, "菜单标识已存在"

            menu_id = str(uuid.uuid4())
            cursor.execute("""
                INSERT INTO menus (id, parent_id, name, title, path, component, icon,
                                   sort_order, menu_type, is_hidden, is_active,
                                   description, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (
                menu_id,
                menu_data.get('parent_id'),
                menu_data['name'],
                menu_data['title'],
                menu_data.get('path'),
                menu_data.get('component'),
                menu_data.get('icon'),
                menu_data.get('sort_order', 0),
                menu_data.get('menu_type', 'menu'),
                menu_data.get('is_hidden', False),
                menu_data.get('is_active', True),
                menu_data.get('description'),
            ))
            conn.commit()
            return True, "菜单创建成功"
        except Exception:
            logger.exception("创建菜单错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def update_menu(self, menu_id: str, menu_data: Dict[str, Any]) -> Tuple[bool, str]:
        """Update the whitelisted columns of a menu that appear in ``menu_data``.

        The menu ``name`` is not updatable here. When ``menu_data`` contains
        none of the updatable keys, no statement is issued and success is
        still reported.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            updatable = ['parent_id', 'title', 'path', 'component', 'icon', 'sort_order',
                         'menu_type', 'is_hidden', 'is_active', 'description']
            present = [field for field in updatable if field in menu_data]

            if present:
                update_fields = [f'{field} = %s' for field in present]
                update_values = [menu_data[field] for field in present]
                update_values.append(menu_id)
                cursor.execute(f"""
                    UPDATE menus
                    SET {', '.join(update_fields)}, updated_at = NOW()
                    WHERE id = %s
                """, update_values)

            conn.commit()
            return True, "菜单更新成功"
        except Exception:
            logger.exception("更新菜单错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def delete_menu(self, menu_id: str) -> Tuple[bool, str]:
        """Delete a menu and its role assignments; refused if it has children.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            # Refuse to orphan child menus.
            cursor.execute("SELECT COUNT(*) as count FROM menus WHERE parent_id = %s", (menu_id,))
            if cursor.fetchone()['count'] > 0:
                return False, "该菜单下有子菜单,无法删除"

            # Remove role grants first, then the menu itself.
            cursor.execute("DELETE FROM role_menus WHERE menu_id = %s", (menu_id,))
            cursor.execute("DELETE FROM menus WHERE id = %s", (menu_id,))
            conn.commit()
            return True, "菜单删除成功"
        except Exception:
            logger.exception("删除菜单错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    # ==================== Role management ====================

    async def get_all_roles(self, page: int, page_size: int,
                            keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of roles with their active-user counts, plus the total.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            keyword: Optional substring matched against ``display_name`` and
                ``name``.

        Returns:
            ``(roles, total)``; ``([], 0)`` when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()
        try:
            where_conditions = []
            params = []
            if keyword:
                where_conditions.append("(r.display_name LIKE %s OR r.name LIKE %s)")
                params.extend([f"%{keyword}%", f"%{keyword}%"])
            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            cursor.execute(f"SELECT COUNT(*) as count FROM roles r WHERE {where_clause}", params)
            total = cursor.fetchone()['count']

            # A negative LIMIT/OFFSET is a SQL error, so clamp both.
            limit = max(page_size, 0)
            offset = max(page - 1, 0) * limit
            cursor.execute(f"""
                SELECT r.id, r.name, r.display_name, r.description, r.is_active,
                       r.is_system, r.created_at, r.updated_at,
                       COUNT(ur.user_id) as user_count
                FROM roles r
                LEFT JOIN user_roles ur ON r.id = ur.role_id AND ur.is_active = 1
                WHERE {where_clause}
                GROUP BY r.id
                ORDER BY r.is_system DESC, r.created_at
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            roles = [
                {
                    "id": row['id'],
                    "name": row['name'],
                    "display_name": row['display_name'],
                    "description": row['description'],
                    "is_active": bool(row['is_active']),
                    "is_system": bool(row['is_system']),
                    "created_at": _iso(row['created_at']),
                    "updated_at": _iso(row['updated_at']),
                    "user_count": row['user_count'],
                }
                for row in cursor.fetchall()
            ]
            return roles, total
        finally:
            cursor.close()
            conn.close()

    async def get_all_roles_simple(self) -> List[Dict[str, Any]]:
        """Return all active roles in a reduced form (for drop-down selection).

        Returns ``[]`` when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return []

        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT id, name, display_name, is_system, is_active
                FROM roles
                WHERE is_active = 1
                ORDER BY is_system DESC, display_name
            """)
            return [
                {
                    "id": row['id'],
                    "name": row['name'],
                    "display_name": row['display_name'],
                    "is_system": bool(row['is_system']),
                    "is_active": bool(row['is_active']),
                }
                for row in cursor.fetchall()
            ]
        finally:
            cursor.close()
            conn.close()

    async def create_role(self, role_data: Dict[str, Any]) -> Tuple[bool, str]:
        """Insert a new non-system role with a freshly generated UUID.

        ``role_data`` must contain ``name`` and ``display_name``;
        ``description`` defaults to NULL and ``is_active`` to True.

        Returns:
            ``(success, message)``; fails when the name already exists.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            # Role names must be unique.
            cursor.execute("SELECT id FROM roles WHERE name = %s", (role_data['name'],))
            if cursor.fetchone():
                return False, "角色名已存在"

            role_id = str(uuid.uuid4())
            cursor.execute("""
                INSERT INTO roles (id, name, display_name, description, is_active,
                                   is_system, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (
                role_id,
                role_data['name'],
                role_data['display_name'],
                role_data.get('description'),
                role_data.get('is_active', True),
                False,  # roles created through this path are never system roles
            ))
            conn.commit()
            return True, "角色创建成功"
        except Exception:
            logger.exception("创建角色错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def update_role(self, role_id: str, role_data: Dict[str, Any]) -> Tuple[bool, str]:
        """Update ``display_name``/``description``/``is_active`` of a non-system role.

        Returns:
            ``(success, message)``; fails when the role does not exist or is
            a system role.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            # System roles are read-only.
            cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
            role = cursor.fetchone()
            if not role:
                return False, "角色不存在"
            if role["is_system"]:
                return False, "不能修改系统角色"

            present = [field for field in ('display_name', 'description', 'is_active')
                       if field in role_data]
            if present:
                update_fields = [f'{field} = %s' for field in present]
                update_values = [role_data[field] for field in present]
                update_values.append(role_id)
                cursor.execute(f"""
                    UPDATE roles
                    SET {', '.join(update_fields)}, updated_at = NOW()
                    WHERE id = %s
                """, update_values)

            conn.commit()
            return True, "角色更新成功"
        except Exception:
            logger.exception("更新角色错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def delete_role(self, role_id: str) -> Tuple[bool, str]:
        """Delete a non-system role that no user is assigned to.

        Also removes the role's rows from ``role_permissions`` and
        ``role_menus``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            # System roles cannot be deleted.
            cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
            role = cursor.fetchone()
            if not role:
                return False, "角色不存在"
            if role["is_system"]:
                return False, "不能删除系统角色"

            # Any user_roles row (active or not) blocks deletion.
            cursor.execute("SELECT COUNT(*) as count FROM user_roles WHERE role_id = %s", (role_id,))
            if cursor.fetchone()['count'] > 0:
                return False, "该角色正在被使用,无法删除"

            # Remove dependent rows first, then the role itself.
            cursor.execute("DELETE FROM role_permissions WHERE role_id = %s", (role_id,))
            cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
            cursor.execute("DELETE FROM roles WHERE id = %s", (role_id,))
            conn.commit()
            return True, "角色删除成功"
        except Exception:
            logger.exception("删除角色错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def get_role_menus(
        self, role_id: str
    ) -> Tuple[Optional[Dict[str, Any]], List[str], List[Dict[str, Any]]]:
        """Return the menu grants of a role.

        The ``super_admin`` role implicitly owns every active menu; other
        roles get the active menus listed in ``role_menus``.

        Returns:
            ``(role_info, menu_ids, menu_details)``, or ``(None, [], [])``
            when no connection is available or the role does not exist.
        """
        conn = get_db_connection()
        if not conn:
            return None, [], []

        cursor = conn.cursor()
        try:
            cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
            role = cursor.fetchone()
            if not role:
                return None, [], []

            if role["name"] == "super_admin":
                # Super admin owns all active menus by default.
                cursor.execute("""
                    SELECT id, name, title, parent_id, menu_type
                    FROM menus
                    WHERE is_active = 1
                    ORDER BY sort_order
                """)
            else:
                # Ordinary roles: only the explicitly granted active menus.
                cursor.execute("""
                    SELECT m.id, m.name, m.title, m.parent_id, m.menu_type
                    FROM role_menus rm
                    JOIN menus m ON rm.menu_id = m.id
                    WHERE rm.role_id = %s AND m.is_active = 1
                    ORDER BY m.sort_order
                """, (role_id,))
            menu_permissions = cursor.fetchall()

            menu_ids = [menu["id"] for menu in menu_permissions]
            menu_details = [
                {
                    "id": menu["id"],
                    "name": menu["name"],
                    "title": menu["title"],
                    "parent_id": menu["parent_id"],
                    "menu_type": menu["menu_type"],
                }
                for menu in menu_permissions
            ]
            role_info = {"id": role["id"], "name": role["name"]}
            return role_info, menu_ids, menu_details
        finally:
            cursor.close()
            conn.close()

    async def update_role_menus(
        self, role_id: str, menu_ids: List[str]
    ) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Replace the menu grants of a role with ``menu_ids``.

        Duplicate IDs are dropped (first occurrence wins). All IDs must refer
        to active menus, otherwise nothing is changed. The ``super_admin``
        role cannot be modified.

        Returns:
            ``(success, message, result_data)``; ``result_data`` holds
            ``role_id``, ``role_name``, ``menu_ids`` and ``updated_count`` on
            success and is ``None`` otherwise.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = conn.cursor()
        try:
            cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
            role = cursor.fetchone()
            if not role:
                return False, "角色不存在", None

            if role["name"] == "super_admin":
                return False, "超级管理员角色拥有全部权限,无需修改", None

            if menu_ids:
                # De-duplicate while keeping order, so the same
                # (role_id, menu_id) pair is never inserted twice.
                menu_ids = list(dict.fromkeys(menu_ids))

                # Every requested ID must refer to an active menu.
                placeholders = ','.join(['%s'] * len(menu_ids))
                cursor.execute(f"""
                    SELECT id FROM menus
                    WHERE id IN ({placeholders}) AND is_active = 1
                """, menu_ids)
                valid_menu_ids = {row['id'] for row in cursor.fetchall()}
                invalid_menu_ids = set(menu_ids) - valid_menu_ids
                if invalid_menu_ids:
                    # str() + sorted(): works for non-string IDs and gives a
                    # deterministic message.
                    listing = ', '.join(sorted(str(item) for item in invalid_menu_ids))
                    return False, f"无效的菜单ID: {listing}", None

            cursor.execute("START TRANSACTION")

            # Drop the current grants, then insert the new set.
            cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
            if menu_ids:
                values = [(role_id, menu_id) for menu_id in menu_ids]
                cursor.executemany("""
                    INSERT INTO role_menus (role_id, menu_id, created_at)
                    VALUES (%s, %s, NOW())
                """, values)

            conn.commit()

            result_data = {
                "role_id": role_id,
                "role_name": role["name"],
                "menu_ids": menu_ids,
                "updated_count": len(menu_ids),
            }
            return True, "角色菜单权限更新成功", result_data
        except Exception:
            logger.exception("更新角色菜单权限错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            cursor.close()
            conn.close()

    async def get_user_permissions(self, user_id: str) -> List[Dict[str, Any]]:
        """Return the distinct active permissions granted via the user's active roles.

        Each entry has ``name``, ``resource`` and ``action``. Returns ``[]``
        when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return []

        cursor = conn.cursor()
        try:
            cursor.execute("""
                SELECT DISTINCT p.name, p.resource, p.action
                FROM permissions p
                JOIN role_permissions rp ON p.id = rp.permission_id
                JOIN user_roles ur ON rp.role_id = ur.role_id
                WHERE ur.user_id = %s AND ur.is_active = 1 AND p.is_active = 1
            """, (user_id,))
            return [
                {
                    "name": row['name'],
                    "resource": row['resource'],
                    "action": row['action'],
                }
                for row in cursor.fetchall()
            ]
        finally:
            cursor.close()
            conn.close()