| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757 |
- """
- 系统管理服务层扩展 - 用户管理和应用管理
- """
import sys
import os

# Add the src directories to the Python path. This has to run before the
# project-local ``app`` import further down.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

import logging
import uuid
import json
import secrets
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timezone

from app.base.async_mysql_connection import get_db_connection

# Module-level logger shared by the service class below.
logger = logging.getLogger(__name__)
class SystemServiceExt:
    """System management service extension: user and application management.

    Every public coroutine obtains a connection from ``get_db_connection()``,
    runs its statements through a cursor and closes both before returning.
    Rows are read by column name, so the cursor must yield dict-like rows.

    Write operations commit on success; on any exception they log the
    traceback, roll back and return a generic failure message. Read
    operations do not catch database errors.

    NOTE(review): the methods are declared ``async`` but use the connection
    and cursor without ``await`` - confirm that ``get_db_connection`` returns
    a synchronous handle.
    """

    def __init__(self):
        """Initialise the service (no state is kept)."""
        pass

    # ==================== Internal helpers ====================

    @staticmethod
    def _iso(value):
        """Return ``value.isoformat()``, or ``None`` when the value is falsy."""
        return value.isoformat() if value else None

    @staticmethod
    def _json_list(raw):
        """Decode a JSON text column; an empty/NULL column becomes ``[]``."""
        return json.loads(raw) if raw else []

    @staticmethod
    def _page_window(page: int, page_size: int) -> Tuple[int, int]:
        """Translate a 1-based page request into ``(limit, offset)``.

        ``page`` is clamped to at least 1 and ``page_size`` to at least 0 so
        that the SQL never receives a negative LIMIT or OFFSET.
        """
        limit = max(page_size, 0)
        offset = (max(page, 1) - 1) * limit
        return limit, offset

    @classmethod
    def _app_row_to_dict(cls, row, include_secret: bool = False) -> Dict[str, Any]:
        """Convert a ``t_sys_app`` row into the dictionary returned to callers.

        Args:
            row: Mapping holding the selected application columns.
            include_secret: When true, ``row['app_secret']`` is copied into
                the result right after ``app_key``.
        """
        result: Dict[str, Any] = {
            "id": row["id"],
            "name": row["name"],
            "app_key": row["app_key"],
        }
        if include_secret:
            result["app_secret"] = row["app_secret"]
        result.update({
            "description": row["description"],
            "icon_url": row["icon_url"],
            "redirect_uris": cls._json_list(row["redirect_uris"]),
            "scope": cls._json_list(row["scope"]),
            "is_active": bool(row["is_active"]),
            "is_trusted": bool(row["is_trusted"]),
            "access_token_expires": row["access_token_expires"],
            "refresh_token_expires": row["refresh_token_expires"],
            "created_time": cls._iso(row["created_time"]),
            "updated_time": cls._iso(row["updated_time"]),
        })
        return result

    @staticmethod
    def _fetch_owned_app(cursor, app_id: str, user_id: str):
        """Return the ``id``/``name`` row of an app created by ``user_id``, if any."""
        cursor.execute("""
            SELECT id, name FROM t_sys_app
            WHERE id = %s AND created_by = %s
        """, (app_id, user_id))
        return cursor.fetchone()

    @staticmethod
    def _assign_roles(cursor, user_id: str, role_ids, assigned_by: str) -> None:
        """Insert one ``t_sys_user_role`` row per role id for the given user."""
        for role_id in role_ids:
            cursor.execute("""
                INSERT INTO t_sys_user_role (id, user_id, role_id, assigned_by, created_time)
                VALUES (%s, %s, %s, %s, NOW())
            """, (str(uuid.uuid4()), user_id, role_id, assigned_by))

    # ==================== User management ====================

    async def get_user_detail(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return one user's details together with their active roles.

        Returns:
            A dictionary with the user's account and profile fields,
            ``role_ids`` (list) and ``roles`` (comma-separated role names, or
            ``None`` when the user has no active role). ``None`` is returned
            when no connection is available or the user does not exist.
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = conn.cursor()

        try:
            # Account columns plus the optional profile row.
            cursor.execute("""
                SELECT u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                       u.last_login_at, u.created_time, up.real_name, up.company, up.department
                FROM t_sys_user u
                LEFT JOIN t_sys_user_profile up ON u.id = up.user_id
                WHERE u.id = %s
            """, (user_id,))

            user_data = cursor.fetchone()
            if not user_data:
                return None

            # Active role assignments of the user.
            cursor.execute("""
                SELECT r.id, r.name, r.code
                FROM t_sys_user_role ur
                JOIN t_sys_role r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND ur.is_active = 1
            """, (user_id,))

            roles = cursor.fetchall()
            role_ids = [role['id'] for role in roles]
            role_names = [role['name'] for role in roles]

            return {
                "id": user_data['id'],
                "username": user_data['username'],
                "email": user_data['email'],
                "phone": user_data['phone'],
                "is_active": bool(user_data['is_active']),
                "is_superuser": bool(user_data['is_superuser']),
                "last_login_at": self._iso(user_data['last_login_at']),
                "created_time": self._iso(user_data['created_time']),
                "real_name": user_data['real_name'],
                "company": user_data['company'],
                "department": user_data['department'],
                "role_ids": role_ids,
                "roles": ', '.join(role_names) if role_names else None,
            }
        finally:
            cursor.close()
            conn.close()

    async def get_users(self, page: int, page_size: int, keyword: Optional[str] = None) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of users and the total number of matches.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            keyword: Optional substring matched against username, e-mail and
                real name.

        Returns:
            ``(users, total)``; ``([], 0)`` when no connection is available.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()

        try:
            # Only fixed SQL fragments go into the WHERE clause; user input is
            # always passed as a bound parameter.
            where_conditions = []
            params = []

            if keyword:
                where_conditions.append("(u.username LIKE %s OR u.email LIKE %s OR up.real_name LIKE %s)")
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern, pattern])

            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            # Total number of matching rows.
            cursor.execute(f"SELECT COUNT(*) as count FROM t_sys_user u LEFT JOIN t_sys_user_profile up ON u.id = up.user_id WHERE {where_clause}", params)
            total = cursor.fetchone()['count']

            # Requested page, newest users first.
            limit, offset = self._page_window(page, page_size)
            cursor.execute(f"""
                SELECT u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                       u.last_login_at, u.created_time, u.updated_time, u.created_by, u.updated_by,
                       up.real_name, up.company, up.department,
                       GROUP_CONCAT(r.name) as roles,
                       creator.username as created_by_name,
                       updater.username as updated_by_name
                FROM t_sys_user u
                LEFT JOIN t_sys_user_profile up ON u.id = up.user_id
                LEFT JOIN t_sys_user_role ur ON u.id = ur.user_id AND ur.is_active = 1
                LEFT JOIN t_sys_role r ON ur.role_id = r.id
                LEFT JOIN t_sys_user creator ON u.created_by = creator.id
                LEFT JOIN t_sys_user updater ON u.updated_by = updater.id
                WHERE {where_clause}
                GROUP BY u.id, u.username, u.email, u.phone, u.is_active, u.is_superuser,
                         u.last_login_at, u.created_time, u.updated_time, u.created_by, u.updated_by,
                         up.real_name, up.company, up.department, creator.username, updater.username
                ORDER BY u.created_time DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            users = [
                {
                    "id": row['id'],
                    "username": row['username'],
                    "email": row['email'],
                    "phone": row['phone'],
                    "is_active": bool(row['is_active']),
                    "is_superuser": bool(row['is_superuser']),
                    "last_login_at": self._iso(row['last_login_at']),
                    "created_time": self._iso(row['created_time']),
                    "updated_time": self._iso(row['updated_time']),
                    "created_by": row['created_by'],
                    "updated_by": row['updated_by'],
                    "created_by_name": row['created_by_name'],
                    "updated_by_name": row['updated_by_name'],
                    "real_name": row['real_name'],
                    "company": row['company'],
                    "department": row['department'],
                    "roles": row['roles'],
                }
                for row in cursor.fetchall()
            ]

            return users, total
        finally:
            cursor.close()
            conn.close()

    async def create_user(self, user_data: Dict[str, Any], password_hash: str, creator_id: str) -> Tuple[bool, str]:
        """Create a user, plus an optional profile row and role assignments.

        Args:
            user_data: Must contain ``username`` and ``email``. ``phone``,
                ``is_active``, ``is_superuser``, ``real_name``, ``company``,
                ``department`` and ``role_ids`` are optional.
            password_hash: Value stored in the ``password_hash`` column.
            creator_id: Recorded as ``created_by`` / ``assigned_by``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            # Reject a duplicate username or e-mail.
            cursor.execute("SELECT id FROM t_sys_user WHERE username = %s OR email = %s",
                           (user_data['username'], user_data['email']))
            if cursor.fetchone():
                return False, "用户名或邮箱已存在"

            user_id = str(uuid.uuid4())
            cursor.execute("""
                INSERT INTO t_sys_user (id, username, email, phone, password_hash, is_active, is_superuser, created_by, created_time, updated_time)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            """, (user_id, user_data['username'], user_data['email'], user_data.get('phone'),
                  password_hash, user_data.get('is_active', True), user_data.get('is_superuser', False), creator_id))

            # A profile row is only written when a profile key was supplied.
            if any(key in user_data for key in ['real_name', 'company', 'department']):
                profile_id = str(uuid.uuid4())
                cursor.execute("""
                    INSERT INTO t_sys_user_profile (id, user_id, real_name, company, department, created_by, created_time, updated_time)
                    VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())
                """, (profile_id, user_id, user_data.get('real_name'), user_data.get('company'), user_data.get('department'), creator_id))

            role_ids = user_data.get('role_ids')
            if role_ids:
                self._assign_roles(cursor, user_id, role_ids, creator_id)

            conn.commit()
            return True, "用户创建成功"
        except Exception:
            logger.exception("创建用户错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def update_user(self, user_id: str, user_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a user's account fields, profile and role assignments.

        Only keys present in ``user_data`` are written. ``role_ids`` replaces
        the user's whole role set; a missing or ``None`` ``role_ids`` leaves
        the roles untouched.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            update_fields = []
            update_values = []

            # NOTE(review): the supplied ``password`` value is written to
            # ``password_hash`` unchanged - this method does no hashing.
            # Confirm that callers pass an already hashed value.
            if user_data.get('password'):
                update_fields.append('password_hash = %s')
                update_values.append(user_data['password'])

            # Column names come from this fixed list, never from the payload.
            for field in ['email', 'phone', 'is_active', 'is_superuser']:
                if field in user_data:
                    update_fields.append(f'{field} = %s')
                    update_values.append(user_data[field])

            if update_fields:
                update_values.append(updater_id)
                update_values.append(user_id)
                cursor.execute(f"""
                    UPDATE t_sys_user
                    SET {', '.join(update_fields)}, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """, update_values)

            # Profile columns: update the existing row or insert a new one.
            profile_fields = ['real_name', 'company', 'department']
            profile_updates = {k: v for k, v in user_data.items() if k in profile_fields}

            if profile_updates:
                cursor.execute("SELECT id FROM t_sys_user_profile WHERE user_id = %s", (user_id,))
                profile_exists = cursor.fetchone()

                if profile_exists:
                    assignments = [f'{field} = %s' for field in profile_updates]
                    values = list(profile_updates.values()) + [updater_id, user_id]
                    cursor.execute(f"""
                        UPDATE t_sys_user_profile
                        SET {', '.join(assignments)}, updated_by = %s, updated_time = NOW()
                        WHERE user_id = %s
                    """, values)
                else:
                    profile_id = str(uuid.uuid4())
                    fields = ['id', 'user_id'] + list(profile_updates.keys())
                    values = [profile_id, user_id] + list(profile_updates.values())
                    placeholders = ', '.join(['%s'] * len(values))

                    cursor.execute(f"""
                        INSERT INTO t_sys_user_profile ({', '.join(fields)}, created_by, created_time, updated_time)
                        VALUES ({placeholders}, %s, NOW(), NOW())
                    """, values + [updater_id])

            # Replace the role set. ``None`` is treated as "not supplied" so it
            # no longer deletes the roles and then fails while iterating.
            role_ids = user_data.get('role_ids')
            if role_ids is not None:
                cursor.execute("DELETE FROM t_sys_user_role WHERE user_id = %s", (user_id,))
                self._assign_roles(cursor, user_id, role_ids, updater_id)

            conn.commit()
            return True, "用户更新成功"
        except Exception:
            logger.exception("更新用户错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def delete_user(self, user_id: str, current_user_id: str) -> Tuple[bool, str]:
        """Delete a user together with their role and profile rows.

        Deleting oneself, or a user holding an active ``super_admin`` role,
        is refused.

        Returns:
            ``(success, message)``.
        """
        # The self-delete guard needs no database access, so it runs before
        # a connection is requested.
        if user_id == current_user_id:
            return False, "不能删除自己"

        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            # Refuse to delete a super administrator.
            cursor.execute("""
                SELECT COUNT(*) as count FROM t_sys_user_role ur
                JOIN t_sys_role r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND r.name = 'super_admin' AND ur.is_active = 1
            """, (user_id,))

            if cursor.fetchone()['count'] > 0:
                return False, "不能删除超级管理员"

            # Dependent rows first, then the user row itself.
            cursor.execute("DELETE FROM t_sys_user_role WHERE user_id = %s", (user_id,))
            cursor.execute("DELETE FROM t_sys_user_profile WHERE user_id = %s", (user_id,))
            cursor.execute("DELETE FROM t_sys_user WHERE id = %s", (user_id,))

            conn.commit()
            return True, "用户删除成功"
        except Exception:
            logger.exception("删除用户错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    # ==================== Application management ====================

    async def get_apps(self, page: int, page_size: int, user_id: str, is_app_manager: bool,
                       keyword: str = "", status: str = "") -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of applications and the total number of matches.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            user_id: Current user; restricts the result to their own apps
                unless ``is_app_manager`` is true.
            is_app_manager: When true, apps of all creators are listed.
            keyword: Optional substring matched against name and description.
            status: ``"active"`` or ``"inactive"`` to filter; any other value
                applies no status filter.

        Returns:
            ``(apps, total)``; ``([], 0)`` when no connection is available.
            ``today_requests`` and ``active_users`` are random placeholder
            values, not real statistics.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()

        try:
            where_conditions = []
            params = []

            # Non-managers only see the applications they created.
            if not is_app_manager:
                where_conditions.append("created_by = %s")
                params.append(user_id)

            if keyword:
                where_conditions.append("(name LIKE %s OR description LIKE %s)")
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern])

            if status == "active":
                where_conditions.append("is_active = 1")
            elif status == "inactive":
                where_conditions.append("is_active = 0")

            where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"

            # Total number of matching rows.
            cursor.execute(f"SELECT COUNT(*) as count FROM t_sys_app WHERE {where_clause}", params)
            total = cursor.fetchone()['count']

            # Requested page, newest applications first.
            limit, offset = self._page_window(page, page_size)
            cursor.execute(f"""
                SELECT id, name, app_key, description, icon_url, redirect_uris, scope,
                       is_active, is_trusted, access_token_expires, refresh_token_expires,
                       created_time, updated_time
                FROM t_sys_app
                WHERE {where_clause}
                ORDER BY created_time DESC
                LIMIT %s OFFSET %s
            """, params + [limit, offset])

            apps = []
            for row in cursor.fetchall():
                app = self._app_row_to_dict(row)
                # Simulated statistics (placeholder values).
                app["today_requests"] = secrets.randbelow(1000)
                app["active_users"] = secrets.randbelow(100)
                apps.append(app)

            return apps, total
        finally:
            cursor.close()
            conn.close()

    async def get_app_detail(self, app_id: str, user_id: str) -> Optional[Dict[str, Any]]:
        """Return an application's details, including its secret.

        Only applications created by ``user_id`` are returned; ``None`` is
        returned when no connection is available or no such app exists.
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = conn.cursor()

        try:
            cursor.execute("""
                SELECT id, name, app_key, app_secret, description, icon_url,
                       redirect_uris, scope, is_active, is_trusted,
                       access_token_expires, refresh_token_expires,
                       created_time, updated_time
                FROM t_sys_app
                WHERE id = %s AND created_by = %s
            """, (app_id, user_id))

            app_data = cursor.fetchone()
            if not app_data:
                return None

            return self._app_row_to_dict(app_data, include_secret=True)
        finally:
            cursor.close()
            conn.close()

    async def create_app(self, app_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Create an application with a freshly generated key and secret.

        Args:
            app_data: Must contain ``name`` and ``redirect_uris``;
                ``description``, ``icon_url``, ``scope``, ``is_trusted``,
                ``access_token_expires`` and ``refresh_token_expires`` are
                optional and fall back to defaults.
            user_id: Recorded as the application's creator.

        Returns:
            ``(success, message, app_info)``; ``app_info`` contains the
            generated ``app_key`` and ``app_secret`` on success and is
            ``None`` on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = conn.cursor()

        try:
            # Generate the identifier and the credentials.
            app_id = str(uuid.uuid4())
            app_key = secrets.token_urlsafe(32)[:32]
            app_secret = secrets.token_urlsafe(64)[:64]

            description = app_data.get('description', '')
            scope = app_data.get('scope', ['profile'])
            is_trusted = app_data.get('is_trusted', False)

            cursor.execute("""
                INSERT INTO t_sys_app (
                    id, name, app_key, app_secret, description, icon_url,
                    redirect_uris, scope, is_active, is_trusted,
                    access_token_expires, refresh_token_expires, created_by,
                    created_time, updated_time
                ) VALUES (
                    %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()
                )
            """, (
                app_id,
                app_data['name'],
                app_key,
                app_secret,
                description,
                app_data.get('icon_url', ''),
                json.dumps(app_data['redirect_uris']),
                json.dumps(scope),
                True,
                is_trusted,
                app_data.get('access_token_expires', 7200),
                app_data.get('refresh_token_expires', 2592000),
                user_id,
            ))

            conn.commit()

            app_info = {
                "id": app_id,
                "name": app_data['name'],
                "app_key": app_key,
                "app_secret": app_secret,
                "description": description,
                "redirect_uris": app_data['redirect_uris'],
                "scope": scope,
                "is_active": True,
                "is_trusted": is_trusted,
            }

            return True, "应用创建成功", app_info
        except Exception:
            logger.exception("创建应用错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            cursor.close()
            conn.close()

    async def update_app(self, app_id: str, app_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[Dict[str, Any]]]:
        """Update an application owned by ``user_id``.

        ``description``, ``icon_url``, ``redirect_uris``, ``scope``,
        ``is_trusted`` and the token lifetimes are always rewritten, using
        defaults for keys missing from ``app_data``. A missing or blank
        ``name`` keeps the stored name instead of overwriting it with an
        empty string.

        Returns:
            ``(success, message, app)``; ``app`` is the refreshed row
            (without the secret), or ``None`` on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = conn.cursor()

        try:
            # The application must exist and belong to the current user.
            existing_app = self._fetch_owned_app(cursor, app_id, user_id)
            if not existing_app:
                return False, "应用不存在或无权限", None

            # A new name must not collide with another app of the same owner.
            requested_name = (app_data.get('name') or '').strip()
            if requested_name:
                cursor.execute("""
                    SELECT id FROM t_sys_app
                    WHERE name = %s AND created_by = %s AND id != %s
                """, (requested_name, user_id, app_id))

                if cursor.fetchone():
                    return False, "应用名称已存在", None

            # Keep the stored name when the payload carries none.
            name = requested_name or existing_app['name']
            description = (app_data.get('description') or '').strip()
            icon_url = (app_data.get('icon_url') or '').strip()
            redirect_uris = app_data.get('redirect_uris', [])
            scope = app_data.get('scope', ['profile', 'email'])
            is_trusted = app_data.get('is_trusted', False)
            access_token_expires = app_data.get('access_token_expires', 7200)
            refresh_token_expires = app_data.get('refresh_token_expires', 2592000)

            cursor.execute("""
                UPDATE t_sys_app
                SET name = %s, description = %s, icon_url = %s,
                    redirect_uris = %s, scope = %s, is_trusted = %s,
                    access_token_expires = %s, refresh_token_expires = %s,
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
            """, (
                name, description, icon_url,
                json.dumps(redirect_uris), json.dumps(scope), is_trusted,
                access_token_expires, refresh_token_expires, user_id, app_id,
            ))

            conn.commit()

            # Read the row back so the caller sees the stored state.
            cursor.execute("""
                SELECT id, name, app_key, description, icon_url,
                       redirect_uris, scope, is_active, is_trusted,
                       access_token_expires, refresh_token_expires,
                       created_time, updated_time
                FROM t_sys_app
                WHERE id = %s
            """, (app_id,))

            app_info = cursor.fetchone()
            if app_info:
                return True, "应用更新成功", self._app_row_to_dict(app_info)

            return True, "应用更新成功", None
        except Exception:
            logger.exception("更新应用错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            cursor.close()
            conn.close()

    async def toggle_app_status(self, app_id: str, is_active: bool, user_id: str) -> Tuple[bool, str]:
        """Enable or disable an application owned by ``user_id``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            # The application must exist and belong to the current user.
            app_data = self._fetch_owned_app(cursor, app_id, user_id)
            if not app_data:
                return False, "应用不存在或无权限"

            cursor.execute("""
                UPDATE t_sys_app
                SET is_active = %s, updated_by = %s, updated_time = NOW()
                WHERE id = %s
            """, (is_active, user_id, app_id))

            conn.commit()

            action = "启用" if is_active else "禁用"
            logger.info("应用状态已更新: %s -> %s", app_data['name'], action)

            return True, f"应用已{action}"
        except Exception:
            logger.exception("切换应用状态错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()

    async def check_user_app_manager_role(self, user_id: str) -> bool:
        """Return whether the user may manage applications.

        True when the user holds an active role named ``super_admin``,
        ``admin`` or ``app_manager``; False otherwise or when no connection
        is available.
        """
        conn = get_db_connection()
        if not conn:
            return False

        cursor = conn.cursor()

        try:
            cursor.execute("""
                SELECT COUNT(*) as count FROM t_sys_user_role ur
                JOIN t_sys_role r ON ur.role_id = r.id
                WHERE ur.user_id = %s AND r.name IN ('super_admin', 'admin', 'app_manager') AND ur.is_active = 1
            """, (user_id,))

            return cursor.fetchone()['count'] > 0
        finally:
            cursor.close()
            conn.close()

    async def reset_app_secret(self, app_id: str, user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Replace the secret of an application owned by ``user_id``.

        Returns:
            ``(success, message, new_secret)``; ``new_secret`` is ``None``
            on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = conn.cursor()

        try:
            # The application must exist and belong to the current user.
            app_data = self._fetch_owned_app(cursor, app_id, user_id)
            if not app_data:
                return False, "应用不存在或无权限", None

            new_secret = secrets.token_urlsafe(64)[:64]

            cursor.execute("""
                UPDATE t_sys_app
                SET app_secret = %s, updated_by = %s, updated_time = NOW()
                WHERE id = %s
            """, (new_secret, user_id, app_id))

            conn.commit()

            logger.info("应用密钥已重置: %s", app_data['name'])

            return True, "应用密钥已重置", new_secret
        except Exception:
            logger.exception("重置应用密钥错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            cursor.close()
            conn.close()

    async def delete_app_by_id(self, app_id: str, user_id: str) -> Tuple[bool, str]:
        """Delete an application owned by ``user_id``.

        Only the ``t_sys_app`` row is deleted here.
        NOTE(review): the original comment expects related data to be removed
        by cascade - confirm the schema defines that.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            # The application must exist and belong to the current user.
            app_data = self._fetch_owned_app(cursor, app_id, user_id)
            if not app_data:
                return False, "应用不存在或无权限"

            cursor.execute("DELETE FROM t_sys_app WHERE id = %s", (app_id,))

            conn.commit()

            logger.info("应用已删除: %s", app_data['name'])

            return True, "应用已删除"
        except Exception:
            logger.exception("删除应用错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            cursor.close()
            conn.close()
|