| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809 |
- """
- 系统管理服务层
- 从 system_view.py 提取的SQL查询逻辑
- """
- import logging
- import uuid
- import json
- import secrets
- from typing import Optional, List, Dict, Any, Tuple
- from datetime import datetime, timezone
- from app.base.async_mysql_connection import get_db_connection
- logger = logging.getLogger(__name__)
- def generate_random_string(length: int) -> str:
- """生成随机字符串"""
- return secrets.token_urlsafe(length)[:length]
- class SystemService:
- """系统管理服务类 - 使用 SQL 查询方式"""
-
- def __init__(self):
- """初始化服务"""
- pass
-
- # ==================== 用户资料管理 ====================
-
- async def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
- """获取用户资料"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- # 查找用户详细信息
- cursor.execute("""
- SELECT u.id, u.username, u.email, u.phone, u.avatar_url, u.is_active, u.is_superuser,
- u.last_login_at, u.created_at, u.updated_at,
- p.real_name, p.company, p.department, p.position
- FROM users u
- LEFT JOIN user_profiles p ON u.id = p.user_id
- WHERE u.id = %s
- """, (user_id,))
-
- user_data = cursor.fetchone()
-
- if not user_data:
- return None
-
- # 获取用户角色
- cursor.execute("""
- SELECT r.name
- FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND ur.is_active = 1
- """, (user_id,))
-
- roles = [row['name'] for row in cursor.fetchall()]
-
- # 构建用户信息
- user_info = {
- "id": user_data["id"],
- "username": user_data["username"],
- "email": user_data["email"],
- "phone": user_data["phone"],
- "avatar_url": user_data["avatar_url"],
- "is_active": user_data["is_active"],
- "is_superuser": user_data["is_superuser"],
- "last_login_at": user_data["last_login_at"].isoformat() if user_data["last_login_at"] else None,
- "created_at": user_data["created_at"].isoformat() if user_data["created_at"] else None,
- "updated_at": user_data["updated_at"].isoformat() if user_data["updated_at"] else None,
- "real_name": user_data["real_name"],
- "company": user_data["company"],
- "department": user_data["department"],
- "position": user_data["position"],
- "roles": roles
- }
-
- return user_info
- finally:
- cursor.close()
- conn.close()
-
- async def update_user_profile(self, user_id: str, profile_data: Dict[str, Any]) -> bool:
- """更新用户资料"""
- conn = get_db_connection()
- if not conn:
- return False
-
- cursor = conn.cursor()
-
- try:
- # 更新用户基本信息
- update_fields = []
- update_values = []
-
- if 'email' in profile_data:
- update_fields.append('email = %s')
- update_values.append(profile_data['email'])
-
- if 'phone' in profile_data:
- update_fields.append('phone = %s')
- update_values.append(profile_data['phone'])
-
- if update_fields:
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE users
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- # 更新或插入用户详情
- profile_fields = ['real_name', 'company', 'department', 'position']
- profile_updates = {k: v for k, v in profile_data.items() if k in profile_fields}
-
- if profile_updates:
- # 检查是否已有记录
- cursor.execute("SELECT id FROM user_profiles WHERE user_id = %s", (user_id,))
- profile_exists = cursor.fetchone()
-
- if profile_exists:
- # 更新现有记录
- update_fields = []
- update_values = []
- for field, value in profile_updates.items():
- update_fields.append(f'{field} = %s')
- update_values.append(value)
-
- update_values.append(user_id)
- cursor.execute(f"""
- UPDATE user_profiles
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE user_id = %s
- """, update_values)
- else:
- # 插入新记录
- fields = ['user_id'] + list(profile_updates.keys())
- values = [user_id] + list(profile_updates.values())
- placeholders = ', '.join(['%s'] * len(values))
-
- cursor.execute(f"""
- INSERT INTO user_profiles ({', '.join(fields)}, created_at, updated_at)
- VALUES ({placeholders}, NOW(), NOW())
- """, values)
-
- conn.commit()
- return True
- except Exception as e:
- logger.exception("更新用户资料错误")
- conn.rollback()
- return False
- finally:
- cursor.close()
- conn.close()
-
- async def verify_and_update_password(self, user_id: str, old_password_hash: str, new_password_hash: str) -> Tuple[bool, str]:
- """验证旧密码并更新为新密码"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 验证当前密码
- cursor.execute("SELECT password_hash FROM users WHERE id = %s", (user_id,))
- result = cursor.fetchone()
-
- if not result:
- return False, "用户不存在"
-
- if result["password_hash"] != old_password_hash:
- return False, "当前密码错误"
-
- # 更新密码
- cursor.execute("""
- UPDATE users
- SET password_hash = %s, updated_at = NOW()
- WHERE id = %s
- """, (new_password_hash, user_id))
-
- conn.commit()
- return True, "密码修改成功"
- except Exception as e:
- logger.exception("修改密码错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- # ==================== 菜单管理 ====================
-
- async def get_user_menus(self, user_id: str) -> List[Dict[str, Any]]:
- """获取用户菜单"""
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor()
-
- try:
- # 检查用户是否是超级管理员
- cursor.execute("""
- SELECT COUNT(*) as count FROM user_roles ur
- JOIN roles r ON ur.role_id = r.id
- WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
- """, (user_id,))
-
- result = cursor.fetchone()
- is_super_admin = result['count'] > 0 if result else False
-
- if is_super_admin:
- # 超级管理员返回所有活跃菜单
- cursor.execute("""
- SELECT m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- FROM menus m
- WHERE m.is_active = 1
- ORDER BY m.sort_order
- """)
- else:
- # 普通用户根据角色权限获取菜单
- cursor.execute("""
- SELECT m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- FROM menus m
- JOIN role_menus rm ON m.id = rm.menu_id
- JOIN user_roles ur ON rm.role_id = ur.role_id
- WHERE ur.user_id = %s
- AND ur.is_active = 1
- AND m.is_active = 1
- GROUP BY m.id, m.parent_id, m.name, m.title, m.path,
- m.component, m.icon, m.sort_order, m.menu_type,
- m.is_hidden, m.is_active
- ORDER BY m.sort_order
- """, (user_id,))
-
- menus = []
- for row in cursor.fetchall():
- menu = {
- "id": row['id'],
- "parent_id": row['parent_id'],
- "name": row['name'],
- "title": row['title'],
- "path": row['path'],
- "component": row['component'],
- "icon": row['icon'],
- "sort_order": row['sort_order'],
- "menu_type": row['menu_type'],
- "is_hidden": bool(row['is_hidden']),
- "is_active": bool(row['is_active']),
- "children": []
- }
- menus.append(menu)
-
- return menus
- finally:
- cursor.close()
- conn.close()
-
- async def get_all_menus(self, page: int, page_size: int, keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
- """获取所有菜单(管理员)"""
- conn = get_db_connection()
- if not conn:
- return [], 0
-
- cursor = conn.cursor()
-
- try:
- # 构建查询条件
- where_conditions = []
- params = []
-
- if keyword:
- where_conditions.append("(m.title LIKE %s OR m.name LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) as count FROM menus m WHERE {where_clause}", params)
- total = cursor.fetchone()['count']
-
- # 查询菜单列表
- cursor.execute(f"""
- SELECT m.id, m.parent_id, m.name, m.title, m.path, m.component,
- m.icon, m.sort_order, m.menu_type, m.is_hidden, m.is_active,
- m.description, m.created_at, m.updated_at,
- pm.title as parent_title
- FROM menus m
- LEFT JOIN menus pm ON m.parent_id = pm.id
- WHERE {where_clause}
- ORDER BY
- CASE WHEN m.parent_id IS NULL THEN 0 ELSE 1 END,
- m.sort_order,
- CASE WHEN m.menu_type = 'menu' THEN 0 ELSE 1 END,
- m.created_at
- LIMIT %s OFFSET %s
- """, params + [page_size, (page - 1) * page_size])
-
- menus = []
- for row in cursor.fetchall():
- menu = {
- "id": row['id'],
- "parent_id": row['parent_id'],
- "name": row['name'],
- "title": row['title'],
- "path": row['path'],
- "component": row['component'],
- "icon": row['icon'],
- "sort_order": row['sort_order'],
- "menu_type": row['menu_type'],
- "is_hidden": bool(row['is_hidden']),
- "is_active": bool(row['is_active']),
- "description": row['description'],
- "created_at": row['created_at'].isoformat() if row['created_at'] else None,
- "updated_at": row['updated_at'].isoformat() if row['updated_at'] else None,
- "parent_title": row['parent_title']
- }
- menus.append(menu)
-
- return menus, total
- finally:
- cursor.close()
- conn.close()
-
- async def create_menu(self, menu_data: Dict[str, Any]) -> Tuple[bool, str]:
- """创建菜单"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 检查菜单名是否已存在
- cursor.execute("SELECT id FROM menus WHERE name = %s", (menu_data['name'],))
- if cursor.fetchone():
- return False, "菜单标识已存在"
-
- # 创建菜单
- menu_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO menus (id, parent_id, name, title, path, component, icon,
- sort_order, menu_type, is_hidden, is_active, description, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (
- menu_id, menu_data.get('parent_id'), menu_data['name'], menu_data['title'],
- menu_data.get('path'), menu_data.get('component'), menu_data.get('icon'),
- menu_data.get('sort_order', 0), menu_data.get('menu_type', 'menu'),
- menu_data.get('is_hidden', False), menu_data.get('is_active', True),
- menu_data.get('description')
- ))
-
- conn.commit()
- return True, "菜单创建成功"
- except Exception as e:
- logger.exception("创建菜单错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- async def update_menu(self, menu_id: str, menu_data: Dict[str, Any]) -> Tuple[bool, str]:
- """更新菜单"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 更新菜单
- update_fields = []
- update_values = []
-
- for field in ['parent_id', 'title', 'path', 'component', 'icon', 'sort_order',
- 'menu_type', 'is_hidden', 'is_active', 'description']:
- if field in menu_data:
- update_fields.append(f'{field} = %s')
- update_values.append(menu_data[field])
-
- if update_fields:
- update_values.append(menu_id)
- cursor.execute(f"""
- UPDATE menus
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- conn.commit()
- return True, "菜单更新成功"
- except Exception as e:
- logger.exception("更新菜单错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- async def delete_menu(self, menu_id: str) -> Tuple[bool, str]:
- """删除菜单"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 检查是否有子菜单
- cursor.execute("SELECT COUNT(*) as count FROM menus WHERE parent_id = %s", (menu_id,))
- if cursor.fetchone()['count'] > 0:
- return False, "该菜单下有子菜单,无法删除"
-
- # 删除菜单相关数据
- cursor.execute("DELETE FROM role_menus WHERE menu_id = %s", (menu_id,))
- cursor.execute("DELETE FROM menus WHERE id = %s", (menu_id,))
-
- conn.commit()
- return True, "菜单删除成功"
- except Exception as e:
- logger.exception("删除菜单错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- # ==================== 角色管理 ====================
-
- async def get_all_roles(self, page: int, page_size: int, keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
- """获取所有角色"""
- conn = get_db_connection()
- if not conn:
- return [], 0
-
- cursor = conn.cursor()
-
- try:
- # 构建查询条件
- where_conditions = []
- params = []
-
- if keyword:
- where_conditions.append("(r.display_name LIKE %s OR r.name LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
-
- where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
-
- # 查询总数
- cursor.execute(f"SELECT COUNT(*) as count FROM roles r WHERE {where_clause}", params)
- total = cursor.fetchone()['count']
-
- # 查询角色列表
- offset = (page - 1) * page_size
- cursor.execute(f"""
- SELECT r.id, r.name, r.display_name, r.description, r.is_active,
- r.is_system, r.created_at, r.updated_at,
- COUNT(ur.user_id) as user_count
- FROM roles r
- LEFT JOIN user_roles ur ON r.id = ur.role_id AND ur.is_active = 1
- WHERE {where_clause}
- GROUP BY r.id
- ORDER BY r.is_system DESC, r.created_at
- LIMIT %s OFFSET %s
- """, params + [page_size, offset])
-
- roles = []
- for row in cursor.fetchall():
- role = {
- "id": row['id'],
- "name": row['name'],
- "display_name": row['display_name'],
- "description": row['description'],
- "is_active": bool(row['is_active']),
- "is_system": bool(row['is_system']),
- "created_at": row['created_at'].isoformat() if row['created_at'] else None,
- "updated_at": row['updated_at'].isoformat() if row['updated_at'] else None,
- "user_count": row['user_count']
- }
- roles.append(role)
-
- return roles, total
- finally:
- cursor.close()
- conn.close()
-
- async def get_all_roles_simple(self) -> List[Dict[str, Any]]:
- """获取所有角色(简化版,用于下拉选择)"""
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("""
- SELECT id, name, display_name, is_system, is_active
- FROM roles
- WHERE is_active = 1
- ORDER BY is_system DESC, display_name
- """)
-
- roles = []
- for row in cursor.fetchall():
- roles.append({
- "id": row['id'],
- "name": row['name'],
- "display_name": row['display_name'],
- "is_system": bool(row['is_system']),
- "is_active": bool(row['is_active'])
- })
-
- return roles
- finally:
- cursor.close()
- conn.close()
-
- async def create_role(self, role_data: Dict[str, Any]) -> Tuple[bool, str]:
- """创建角色"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 检查角色名是否已存在
- cursor.execute("SELECT id FROM roles WHERE name = %s", (role_data['name'],))
- if cursor.fetchone():
- return False, "角色名已存在"
-
- # 创建角色
- role_id = str(uuid.uuid4())
- cursor.execute("""
- INSERT INTO roles (id, name, display_name, description, is_active, is_system, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
- """, (role_id, role_data['name'], role_data['display_name'], role_data.get('description'),
- role_data.get('is_active', True), False))
-
- conn.commit()
- return True, "角色创建成功"
- except Exception as e:
- logger.exception("创建角色错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- async def update_role(self, role_id: str, role_data: Dict[str, Any]) -> Tuple[bool, str]:
- """更新角色"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 检查是否为系统角色
- cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
- if not role:
- return False, "角色不存在"
-
- if role["is_system"]:
- return False, "不能修改系统角色"
-
- # 更新角色
- update_fields = []
- update_values = []
-
- for field in ['display_name', 'description', 'is_active']:
- if field in role_data:
- update_fields.append(f'{field} = %s')
- update_values.append(role_data[field])
-
- if update_fields:
- update_values.append(role_id)
- cursor.execute(f"""
- UPDATE roles
- SET {', '.join(update_fields)}, updated_at = NOW()
- WHERE id = %s
- """, update_values)
-
- conn.commit()
- return True, "角色更新成功"
- except Exception as e:
- logger.exception("更新角色错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- async def delete_role(self, role_id: str) -> Tuple[bool, str]:
- """删除角色"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- # 检查是否为系统角色
- cursor.execute("SELECT is_system FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
- if not role:
- return False, "角色不存在"
-
- if role["is_system"]:
- return False, "不能删除系统角色"
-
- # 检查是否有用户使用此角色
- cursor.execute("SELECT COUNT(*) as count FROM user_roles WHERE role_id = %s", (role_id,))
- if cursor.fetchone()['count'] > 0:
- return False, "该角色正在被使用,无法删除"
-
- # 删除角色相关数据
- cursor.execute("DELETE FROM role_permissions WHERE role_id = %s", (role_id,))
- cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
- cursor.execute("DELETE FROM roles WHERE id = %s", (role_id,))
-
- conn.commit()
- return True, "角色删除成功"
- except Exception as e:
- logger.exception("删除角色错误")
- conn.rollback()
- return False, "服务器内部错误"
- finally:
- cursor.close()
- conn.close()
-
- async def get_role_menus(self, role_id: str) -> Tuple[Optional[Dict[str, Any]], List[str], List[Dict[str, Any]]]:
- """获取角色的菜单权限"""
- conn = get_db_connection()
- if not conn:
- return None, [], []
-
- cursor = conn.cursor()
-
- try:
- # 检查角色是否存在
- cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
-
- if not role:
- return None, [], []
-
- # 检查是否为超级管理员角色
- role_name = role["name"]
- is_super_admin_role = role_name == "super_admin"
-
- if is_super_admin_role:
- # 超级管理员默认拥有所有菜单权限
- cursor.execute("""
- SELECT id, name, title, parent_id, menu_type
- FROM menus
- WHERE is_active = 1
- ORDER BY sort_order
- """)
- menu_permissions = cursor.fetchall()
- else:
- # 普通角色查询已分配的菜单权限
- cursor.execute("""
- SELECT m.id, m.name, m.title, m.parent_id, m.menu_type
- FROM role_menus rm
- JOIN menus m ON rm.menu_id = m.id
- WHERE rm.role_id = %s AND m.is_active = 1
- ORDER BY m.sort_order
- """, (role_id,))
- menu_permissions = cursor.fetchall()
-
- # 构建返回数据
- menu_ids = [menu["id"] for menu in menu_permissions]
- menu_details = []
-
- for menu in menu_permissions:
- menu_details.append({
- "id": menu["id"],
- "name": menu["name"],
- "title": menu["title"],
- "parent_id": menu["parent_id"],
- "menu_type": menu["menu_type"]
- })
-
- role_info = {
- "id": role["id"],
- "name": role["name"]
- }
-
- return role_info, menu_ids, menu_details
- finally:
- cursor.close()
- conn.close()
-
- async def update_role_menus(self, role_id: str, menu_ids: List[str]) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
- """更新角色的菜单权限"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败", None
-
- cursor = conn.cursor()
-
- try:
- # 检查角色是否存在
- cursor.execute("SELECT id, name FROM roles WHERE id = %s", (role_id,))
- role = cursor.fetchone()
-
- if not role:
- return False, "角色不存在", None
-
- # 检查是否为超级管理员角色
- role_name = role["name"]
- is_super_admin_role = role_name == "super_admin"
-
- if is_super_admin_role:
- return False, "超级管理员角色拥有全部权限,无需修改", None
-
- # 验证菜单ID是否存在
- if menu_ids:
- placeholders = ','.join(['%s'] * len(menu_ids))
- cursor.execute(f"""
- SELECT id FROM menus
- WHERE id IN ({placeholders}) AND is_active = 1
- """, menu_ids)
-
- valid_menu_ids = [row['id'] for row in cursor.fetchall()]
- invalid_menu_ids = set(menu_ids) - set(valid_menu_ids)
-
- if invalid_menu_ids:
- return False, f"无效的菜单ID: {', '.join(invalid_menu_ids)}", None
-
- # 开始事务
- cursor.execute("START TRANSACTION")
-
- # 删除角色现有的菜单权限
- cursor.execute("DELETE FROM role_menus WHERE role_id = %s", (role_id,))
-
- # 添加新的菜单权限
- if menu_ids:
- values = [(role_id, menu_id) for menu_id in menu_ids]
- cursor.executemany("""
- INSERT INTO role_menus (role_id, menu_id, created_at)
- VALUES (%s, %s, NOW())
- """, values)
-
- # 提交事务
- conn.commit()
-
- result_data = {
- "role_id": role_id,
- "role_name": role["name"],
- "menu_ids": menu_ids,
- "updated_count": len(menu_ids)
- }
-
- return True, "角色菜单权限更新成功", result_data
- except Exception as e:
- logger.exception("更新角色菜单权限错误")
- conn.rollback()
- return False, "服务器内部错误", None
- finally:
- cursor.close()
- conn.close()
-
- async def get_user_permissions(self, user_id: str) -> List[Dict[str, Any]]:
- """获取用户权限"""
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor()
-
- try:
- # 获取用户权限
- cursor.execute("""
- SELECT DISTINCT p.name, p.resource, p.action
- FROM permissions p
- JOIN role_permissions rp ON p.id = rp.permission_id
- JOIN user_roles ur ON rp.role_id = ur.role_id
- WHERE ur.user_id = %s
- AND ur.is_active = 1
- AND p.is_active = 1
- """, (user_id,))
-
- permissions = []
- for row in cursor.fetchall():
- permissions.append({
- "name": row['name'],
- "resource": row['resource'],
- "action": row['action']
- })
-
- return permissions
- finally:
- cursor.close()
- conn.close()
|