"""System administration service-layer extension: user and application management."""

import sys
import os

# Add the src directory to the Python path (must happen before the project import below).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

import logging
import uuid
import json
import secrets
from contextlib import contextmanager
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime, timezone

from app.base.async_mysql_connection import get_db_connection

logger = logging.getLogger(__name__)


@contextmanager
def _db_session():
    """Yield ``(conn, cursor)`` and always release both afterwards.

    Yields ``(None, None)`` when ``get_db_connection()`` returns a falsy value,
    so callers can return their own "no connection" result. The connection is
    closed even if creating or closing the cursor raises.
    """
    conn = get_db_connection()
    if not conn:
        yield None, None
        return
    try:
        cursor = conn.cursor()
        try:
            yield conn, cursor
        finally:
            cursor.close()
    finally:
        conn.close()


def _iso(value):
    """Return ``value.isoformat()``, or ``None`` when the value is falsy (e.g. NULL)."""
    return value.isoformat() if value else None


def _json_list(raw):
    """Decode a JSON column value, falling back to ``[]`` when it is empty/NULL."""
    return json.loads(raw) if raw else []


def _app_row_to_dict(row, include_secret=False):
    """Convert a ``t_sys_app`` row into the dict shape returned by the app methods.

    ``app_secret`` is only included (right after ``app_key``) when
    ``include_secret`` is true; the row must then contain that column.
    """
    app = {
        "id": row["id"],
        "name": row["name"],
        "app_key": row["app_key"],
    }
    if include_secret:
        app["app_secret"] = row["app_secret"]
    app.update({
        "description": row["description"],
        "icon_url": row["icon_url"],
        "redirect_uris": _json_list(row["redirect_uris"]),
        "scope": _json_list(row["scope"]),
        "is_active": bool(row["is_active"]),
        "is_trusted": bool(row["is_trusted"]),
        "access_token_expires": row["access_token_expires"],
        "refresh_token_expires": row["refresh_token_expires"],
        "created_time": _iso(row["created_time"]),
        "updated_time": _iso(row["updated_time"]),
    })
    return app


class SystemServiceExt:
    """System administration service extension: user and application management.

    Rows are read by column name, so the cursor is expected to return
    mapping-style rows.

    NOTE(review): the methods are declared ``async`` but every database call
    in this class is made without ``await`` -- confirm that
    ``get_db_connection`` returns a blocking connection.
    """

    def __init__(self):
        """Initialize the service (no state is kept)."""
        pass

    # ==================== User management ====================

    async def get_user_detail(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return one user's details including role information.

        Returns:
            A dict with the user's base fields, profile fields, ``role_ids``
            (list) and ``roles`` (comma-separated names or ``None``); ``None``
            when there is no database connection or no such user.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return None

            # Base user information joined with the optional profile row.
            cursor.execute("""
                SELECT u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                       u.last_login_at, u.created_time,
                       up.real_name, up.company, up.department
                FROM t_sys_user u
                LEFT JOIN t_sys_user_profile up ON u.id = up.user_id
                WHERE u.id = %s
            """, (user_id,))
            user_data = cursor.fetchone()
            if not user_data:
                return None

            # Active roles assigned to the user.
            cursor.execute("""
                SELECT r.id, r.name, r.code
                FROM t_sys_user_role ur
                JOIN t_sys_role r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND ur.is_active = 1
            """, (user_id,))
            roles = cursor.fetchall()
            role_ids = [role['id'] for role in roles]
            role_names = [role['name'] for role in roles]

            return {
                "id": user_data['id'],
                "username": user_data['username'],
                "email": user_data['email'],
                "phone": user_data['phone'],
                "is_active": bool(user_data['is_active']),
                "is_superuser": bool(user_data['is_superuser']),
                "last_login_at": _iso(user_data['last_login_at']),
                "created_time": _iso(user_data['created_time']),
                "real_name": user_data['real_name'],
                "company": user_data['company'],
                "department": user_data['department'],
                "role_ids": role_ids,
                "roles": ', '.join(role_names) if role_names else None,
            }

    async def get_users(self, page: int, page_size: int,
                        keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of users plus the total number of matching users.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            keyword: Optional substring matched against username, email and
                real name.

        Returns:
            ``(users, total)``; ``([], 0)`` when there is no database connection.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return [], 0

            # Build the filter; only fixed SQL fragments are interpolated,
            # user input is always bound as a parameter.
            where_conditions = []
            params = []
            if keyword:
                where_conditions.append(
                    "(u.username LIKE %s OR u.email LIKE %s OR up.real_name LIKE %s)"
                )
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern, pattern])
            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            # Total count.
            cursor.execute(
                f"SELECT COUNT(*) as count FROM t_sys_user u "
                f"LEFT JOIN t_sys_user_profile up ON u.id = up.user_id WHERE {where_clause}",
                params,
            )
            total = cursor.fetchone()['count']

            # Clamp paging so an out-of-range page cannot produce a negative
            # OFFSET/LIMIT, which the database rejects.
            limit = max(page_size, 0)
            offset = (max(page, 1) - 1) * limit
            cursor.execute(f"""
                SELECT u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                       u.last_login_at, u.created_time, u.updated_time,
                       u.created_by, u.updated_by,
                       up.real_name, up.company, up.department,
                       GROUP_CONCAT(r.name) as roles,
                       creator.username as created_by_name,
                       updater.username as updated_by_name
                FROM t_sys_user u
                LEFT JOIN t_sys_user_profile up ON u.id = up.user_id
                LEFT JOIN t_sys_user_role ur ON u.id = ur.user_id AND ur.is_active = 1
                LEFT JOIN t_sys_role r ON ur.role_id = r.id
                LEFT JOIN t_sys_user creator ON u.created_by = creator.id
                LEFT JOIN t_sys_user updater ON u.updated_by = updater.id
                WHERE {where_clause}
                GROUP BY u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                         u.last_login_at, u.created_time, u.updated_time,
                         u.created_by, u.updated_by,
                         up.real_name, up.company, up.department,
                         creator.username, updater.username
                ORDER BY u.created_time DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            users = [
                {
                    "id": row['id'],
                    "username": row['username'],
                    "email": row['email'],
                    "phone": row['phone'],
                    "is_active": bool(row['is_active']),
                    "is_superuser": bool(row['is_superuser']),
                    "last_login_at": _iso(row['last_login_at']),
                    "created_time": _iso(row['created_time']),
                    "updated_time": _iso(row['updated_time']),
                    "created_by": row['created_by'],
                    "updated_by": row['updated_by'],
                    "created_by_name": row['created_by_name'],
                    "updated_by_name": row['updated_by_name'],
                    "real_name": row['real_name'],
                    "company": row['company'],
                    "department": row['department'],
                    "roles": row['roles'],
                }
                for row in cursor.fetchall()
            ]
            return users, total

    async def create_user(self, user_data: Dict[str, Any], password_hash: str,
                          creator_id: str) -> Tuple[bool, str]:
        """Create a user with an optional profile row and role assignments.

        Args:
            user_data: Must contain ``username`` and ``email``; may contain
                ``phone``, ``is_active``, ``is_superuser``, ``real_name``,
                ``company``, ``department`` and ``role_ids``.
            password_hash: Value stored in the ``password_hash`` column.
            creator_id: Id recorded as ``created_by`` / ``assigned_by``.

        Returns:
            ``(success, message)``. Any exception is logged, the transaction
            is rolled back and a generic error message is returned.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败"
            try:
                # Reject duplicates on username or email.
                cursor.execute(
                    "SELECT id FROM t_sys_user WHERE username = %s OR email = %s",
                    (user_data['username'], user_data['email']),
                )
                if cursor.fetchone():
                    return False, "用户名或邮箱已存在"

                user_id = str(uuid.uuid4())

                # Base user row.
                cursor.execute("""
                    INSERT INTO t_sys_user
                        (id, username, email, phone, password_hash, is_active, is_superuser,
                         created_by, created_time, updated_time)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                """, (
                    user_id,
                    user_data['username'],
                    user_data['email'],
                    user_data.get('phone'),
                    password_hash,
                    user_data.get('is_active', True),
                    user_data.get('is_superuser', False),
                    creator_id,
                ))

                # Profile row, only when at least one profile key was supplied.
                if any(key in user_data for key in ('real_name', 'company', 'department')):
                    cursor.execute("""
                        INSERT INTO t_sys_user_profile
                            (id, user_id, real_name, company, department,
                             created_by, created_time, updated_time)
                        VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
                    """, (
                        str(uuid.uuid4()),
                        user_id,
                        user_data.get('real_name'),
                        user_data.get('company'),
                        user_data.get('department'),
                        creator_id,
                    ))

                # Role assignments (missing/empty role_ids assigns nothing).
                for role_id in user_data.get('role_ids') or []:
                    cursor.execute("""
                        INSERT INTO t_sys_user_role (id, user_id, role_id, assigned_by, created_time)
                        VALUES (%s, %s, %s, %s, NOW())
                    """, (str(uuid.uuid4()), user_id, role_id, creator_id))

                conn.commit()
                return True, "用户创建成功"
            except Exception:
                logger.exception("创建用户错误")
                conn.rollback()
                return False, "服务器内部错误"

    async def update_user(self, user_id: str, user_data: Dict[str, Any],
                          updater_id: str) -> Tuple[bool, str]:
        """Update a user's base fields, profile fields and role assignments.

        Only keys present in ``user_data`` are written. ``role_ids`` replaces
        the full role set when it is a list (an empty list removes all roles);
        a missing or ``None`` ``role_ids`` leaves the roles untouched.

        Returns:
            ``(success, message)``. Any exception is logged, the transaction
            is rolled back and a generic error message is returned.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败"
            try:
                # --- base user fields ---
                update_fields = []
                update_values = []

                # NOTE(review): the supplied 'password' value is written to
                # password_hash as-is; the original comment says it needs
                # hashing -- confirm the caller hashes it before calling.
                if user_data.get('password'):
                    update_fields.append('password_hash = %s')
                    update_values.append(user_data['password'])

                # Column names come from this fixed list, never from input.
                for field in ('email', 'phone', 'is_active', 'is_superuser'):
                    if field in user_data:
                        update_fields.append(f'{field} = %s')
                        update_values.append(user_data[field])

                if update_fields:
                    cursor.execute(f"""
                        UPDATE t_sys_user
                        SET {', '.join(update_fields)}, updated_by = %s, updated_time = NOW()
                        WHERE id = %s
                    """, update_values + [updater_id, user_id])

                # --- profile fields ---
                profile_fields = ('real_name', 'company', 'department')
                profile_updates = {k: v for k, v in user_data.items() if k in profile_fields}

                if profile_updates:
                    cursor.execute(
                        "SELECT id FROM t_sys_user_profile WHERE user_id = %s", (user_id,)
                    )
                    if cursor.fetchone():
                        assignments = ', '.join(f'{field} = %s' for field in profile_updates)
                        cursor.execute(f"""
                            UPDATE t_sys_user_profile
                            SET {assignments}, updated_by = %s, updated_time = NOW()
                            WHERE user_id = %s
                        """, list(profile_updates.values()) + [updater_id, user_id])
                    else:
                        # No profile row yet: insert one with the supplied columns.
                        columns = ['id', 'user_id'] + list(profile_updates.keys())
                        values = [str(uuid.uuid4()), user_id] + list(profile_updates.values())
                        placeholders = ', '.join(['%s'] * len(values))
                        cursor.execute(f"""
                            INSERT INTO t_sys_user_profile
                                ({', '.join(columns)}, created_by, created_time, updated_time)
                            VALUES ({placeholders}, %s, NOW(), NOW())
                        """, values + [updater_id])

                # --- roles ---
                # None is treated as "not provided": iterating it used to raise
                # and fail the whole update.
                role_ids = user_data.get('role_ids')
                if role_ids is not None:
                    cursor.execute("DELETE FROM t_sys_user_role WHERE user_id = %s", (user_id,))
                    for role_id in role_ids:
                        cursor.execute("""
                            INSERT INTO t_sys_user_role (id, user_id, role_id, assigned_by, created_time)
                            VALUES (%s, %s, %s, %s, NOW())
                        """, (str(uuid.uuid4()), user_id, role_id, updater_id))

                conn.commit()
                return True, "用户更新成功"
            except Exception:
                logger.exception("更新用户错误")
                conn.rollback()
                return False, "服务器内部错误"

    async def delete_user(self, user_id: str, current_user_id: str) -> Tuple[bool, str]:
        """Delete a user together with its role assignments and profile row.

        Refuses to delete the calling user or a user holding an active
        ``super_admin`` role.

        Returns:
            ``(success, message)``. Any exception is logged, the transaction
            is rolled back and a generic error message is returned.
        """
        # Users may not delete themselves; no database access is needed to decide this.
        if user_id == current_user_id:
            return False, "不能删除自己"

        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败"
            try:
                # Super administrators cannot be deleted.
                cursor.execute("""
                    SELECT COUNT(*) as count
                    FROM t_sys_user_role ur
                    JOIN t_sys_role r ON ur.role_id = r.id
                    WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
                """, (user_id,))
                if cursor.fetchone()['count'] > 0:
                    return False, "不能删除超级管理员"

                # Remove dependent rows first, then the user itself.
                cursor.execute("DELETE FROM t_sys_user_role WHERE user_id = %s", (user_id,))
                cursor.execute("DELETE FROM t_sys_user_profile WHERE user_id = %s", (user_id,))
                cursor.execute("DELETE FROM t_sys_user WHERE id = %s", (user_id,))

                conn.commit()
                return True, "用户删除成功"
            except Exception:
                logger.exception("删除用户错误")
                conn.rollback()
                return False, "服务器内部错误"

    # ==================== Application management ====================

    async def get_apps(self, page: int, page_size: int, user_id: str, is_app_manager: bool,
                       keyword: str = "", status: str = "") -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of applications plus the total number of matches.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            user_id: Caller's id; used as a filter unless ``is_app_manager``.
            is_app_manager: When false, only apps created by ``user_id`` are listed.
            keyword: Optional substring matched against name and description.
            status: ``"active"`` or ``"inactive"`` to filter; anything else
                applies no status filter.

        Returns:
            ``(apps, total)``; ``([], 0)`` when there is no database connection.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return [], 0

            where_conditions = []
            params = []

            # Non-managers only see the applications they created.
            if not is_app_manager:
                where_conditions.append("created_by = %s")
                params.append(user_id)

            if keyword:
                where_conditions.append("(name LIKE %s OR description LIKE %s)")
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern])

            if status == "active":
                where_conditions.append("is_active = 1")
            elif status == "inactive":
                where_conditions.append("is_active = 0")

            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            # Total count.
            cursor.execute(
                f"SELECT COUNT(*) as count FROM t_sys_app WHERE {where_clause}", params
            )
            total = cursor.fetchone()['count']

            # Clamp paging so an out-of-range page cannot produce a negative
            # OFFSET/LIMIT, which the database rejects.
            limit = max(page_size, 0)
            offset = (max(page, 1) - 1) * limit
            cursor.execute(f"""
                SELECT id, name, app_key, description, icon_url, redirect_uris, scope,
                       is_active, is_trusted, access_token_expires, refresh_token_expires,
                       created_time, updated_time
                FROM t_sys_app
                WHERE {where_clause}
                ORDER BY created_time DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            apps = []
            for row in cursor.fetchall():
                app = _app_row_to_dict(row)
                # Simulated statistics (random placeholders, not real metrics).
                app["today_requests"] = secrets.randbelow(1000)
                app["active_users"] = secrets.randbelow(100)
                apps.append(app)

            return apps, total

    async def get_app_detail(self, app_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return an application's details including its secret.

        Only an application created by ``user_id`` is returned.

        Returns:
            The application dict (with ``app_secret``), or ``None`` when there
            is no database connection or no matching application.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return None

            cursor.execute("""
                SELECT id, name, app_key, app_secret, description, icon_url,
                       redirect_uris, scope, is_active, is_trusted,
                       access_token_expires, refresh_token_expires,
                       created_time, updated_time
                FROM t_sys_app
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))
            app_data = cursor.fetchone()
            if not app_data:
                return None

            return _app_row_to_dict(app_data, include_secret=True)

    async def create_app(self, app_data: Dict[str, Any],
                         user_id: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Create an application and generate its key and secret.

        Args:
            app_data: Must contain ``name`` and ``redirect_uris``; may contain
                ``description``, ``icon_url``, ``scope``, ``is_trusted``,
                ``access_token_expires`` and ``refresh_token_expires``.
            user_id: Id recorded as ``created_by``.

        Returns:
            ``(success, message, app_info)``; ``app_info`` carries the newly
            generated ``app_key`` and ``app_secret`` and is ``None`` on
            failure. Any exception is logged and rolled back.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败", None
            try:
                # Generate the application id and credentials.
                app_id = str(uuid.uuid4())
                app_key = secrets.token_urlsafe(32)[:32]
                app_secret = secrets.token_urlsafe(64)[:64]

                cursor.execute("""
                    INSERT INTO t_sys_app (
                        id, name, app_key, app_secret, description, icon_url,
                        redirect_uris, scope, is_active, is_trusted,
                        access_token_expires, refresh_token_expires,
                        created_by, created_time, updated_time
                    ) VALUES (
                        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
                    )
                """, (
                    app_id,
                    app_data['name'],
                    app_key,
                    app_secret,
                    app_data.get('description', ''),
                    app_data.get('icon_url', ''),
                    json.dumps(app_data['redirect_uris']),
                    json.dumps(app_data.get('scope', ['profile'])),
                    True,
                    app_data.get('is_trusted', False),
                    app_data.get('access_token_expires', 7200),
                    app_data.get('refresh_token_expires', 2592000),
                    user_id,
                ))

                conn.commit()

                # Echo back the created application, including its credentials.
                app_info = {
                    "id": app_id,
                    "name": app_data['name'],
                    "app_key": app_key,
                    "app_secret": app_secret,
                    "description": app_data.get('description', ''),
                    "redirect_uris": app_data['redirect_uris'],
                    "scope": app_data.get('scope', ['profile']),
                    "is_active": True,
                    "is_trusted": app_data.get('is_trusted', False),
                }
                return True, "应用创建成功", app_info
            except Exception:
                logger.exception("创建应用错误")
                conn.rollback()
                return False, "服务器内部错误", None

    async def update_app(self, app_id: str, app_data: Dict[str, Any],
                         user_id: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Update an application owned by ``user_id``.

        A missing, ``None`` or blank ``name`` keeps the stored name instead of
        overwriting it with an empty string. The other fields fall back to the
        defaults below when absent from ``app_data``.

        Returns:
            ``(success, message, app)`` where ``app`` is the refreshed row
            (without the secret) or ``None``. Any exception is logged and
            rolled back.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败", None
            try:
                # The application must exist and belong to the caller.
                cursor.execute("""
                    SELECT id, name FROM t_sys_app
                    WHERE id = %s AND created_by = %s
                """, (app_id, user_id))
                existing_app = cursor.fetchone()
                if not existing_app:
                    return False, "应用不存在或无权限", None

                name = (app_data.get('name') or '').strip()
                if name:
                    # The new name must not collide with another app of this user.
                    cursor.execute("""
                        SELECT id FROM t_sys_app
                        WHERE name = %s AND created_by = %s AND id != %s
                    """, (name, user_id, app_id))
                    if cursor.fetchone():
                        return False, "应用名称已存在", None
                else:
                    # No usable name supplied: keep the current one.
                    name = existing_app['name']

                # Prepare the remaining values.
                description = (app_data.get('description') or '').strip()
                icon_url = (app_data.get('icon_url') or '').strip()
                redirect_uris = app_data.get('redirect_uris', [])
                scope = app_data.get('scope', ['profile', 'email'])
                is_trusted = app_data.get('is_trusted', False)
                access_token_expires = app_data.get('access_token_expires', 7200)
                refresh_token_expires = app_data.get('refresh_token_expires', 2592000)

                cursor.execute("""
                    UPDATE t_sys_app
                    SET name = %s, description = %s, icon_url = %s, redirect_uris = %s,
                        scope = %s, is_trusted = %s, access_token_expires = %s,
                        refresh_token_expires = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """, (
                    name,
                    description,
                    icon_url,
                    json.dumps(redirect_uris),
                    json.dumps(scope),
                    is_trusted,
                    access_token_expires,
                    refresh_token_expires,
                    user_id,
                    app_id,
                ))

                conn.commit()

                # Read back the updated row for the response.
                cursor.execute("""
                    SELECT id, name, app_key, description, icon_url, redirect_uris, scope,
                           is_active, is_trusted, access_token_expires, refresh_token_expires,
                           created_time, updated_time
                    FROM t_sys_app
                    WHERE id = %s
                """, (app_id,))
                app_info = cursor.fetchone()

                if app_info:
                    return True, "应用更新成功", _app_row_to_dict(app_info)
                return True, "应用更新成功", None
            except Exception:
                logger.exception("更新应用错误")
                conn.rollback()
                return False, "服务器内部错误", None

    async def toggle_app_status(self, app_id: str, is_active: bool,
                                user_id: str) -> Tuple[bool, str]:
        """Enable or disable an application owned by ``user_id``.

        Returns:
            ``(success, message)``. Any exception is logged and rolled back.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败"
            try:
                # The application must exist and belong to the caller.
                cursor.execute("""
                    SELECT id, name FROM t_sys_app
                    WHERE id = %s AND created_by = %s
                """, (app_id, user_id))
                app_data = cursor.fetchone()
                if not app_data:
                    return False, "应用不存在或无权限"

                cursor.execute("""
                    UPDATE t_sys_app
                    SET is_active = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """, (is_active, user_id, app_id))

                conn.commit()

                action = "启用" if is_active else "禁用"
                logger.info("应用状态已更新: %s -> %s", app_data['name'], action)
                return True, f"应用已{action}"
            except Exception:
                logger.exception("切换应用状态错误")
                conn.rollback()
                return False, "服务器内部错误"

    async def check_user_app_manager_role(self, user_id: str) -> bool:
        """Return whether the user holds an active application-manager-level role.

        The roles counted are ``super_admin``, ``admin`` and ``app_manager``.
        Returns ``False`` when there is no database connection.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False

            cursor.execute("""
                SELECT COUNT(*) as count
                FROM t_sys_user_role ur
                JOIN t_sys_role r ON ur.role_id = r.id
                WHERE ur.user_id = %s
                  AND r.name IN ('super_admin', 'admin', 'app_manager')
                  AND ur.is_active = 1
            """, (user_id,))
            return cursor.fetchone()['count'] > 0

    async def reset_app_secret(self, app_id: str,
                               user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Generate and store a new secret for an application owned by ``user_id``.

        Returns:
            ``(success, message, new_secret)``; ``new_secret`` is ``None`` on
            failure. Any exception is logged and rolled back.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败", None
            try:
                # The application must exist and belong to the caller.
                cursor.execute("""
                    SELECT id, name FROM t_sys_app
                    WHERE id = %s AND created_by = %s
                """, (app_id, user_id))
                app_data = cursor.fetchone()
                if not app_data:
                    return False, "应用不存在或无权限", None

                new_secret = secrets.token_urlsafe(64)[:64]

                cursor.execute("""
                    UPDATE t_sys_app
                    SET app_secret = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """, (new_secret, user_id, app_id))

                conn.commit()

                logger.info("应用密钥已重置: %s", app_data['name'])
                return True, "应用密钥已重置", new_secret
            except Exception:
                logger.exception("重置应用密钥错误")
                conn.rollback()
                return False, "服务器内部错误", None

    async def delete_app_by_id(self, app_id: str, user_id: str) -> Tuple[bool, str]:
        """Delete an application owned by ``user_id``.

        Returns:
            ``(success, message)``. Any exception is logged and rolled back.
        """
        with _db_session() as (conn, cursor):
            if conn is None:
                return False, "数据库连接失败"
            try:
                # The application must exist and belong to the caller.
                cursor.execute("""
                    SELECT id, name FROM t_sys_app
                    WHERE id = %s AND created_by = %s
                """, (app_id, user_id))
                app_data = cursor.fetchone()
                if not app_data:
                    return False, "应用不存在或无权限"

                # Delete the application row.
                # NOTE(review): the original comment says related data is
                # cascade-deleted -- presumably via foreign keys; verify in the schema.
                cursor.execute("DELETE FROM t_sys_app WHERE id = %s", (app_id,))

                conn.commit()

                logger.info("应用已删除: %s", app_data['name'])
                return True, "应用已删除"
            except Exception:
                logger.exception("删除应用错误")
                conn.rollback()
                return False, "服务器内部错误"