"""
Dictionary item service layer.

Provides CRUD-style operations on rows of ``t_sys_dict_category_item``.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

import logging
from typing import Optional, List, Dict, Any, Tuple
from datetime import datetime

from app.base.async_mysql_connection import get_db_connection

logger = logging.getLogger(__name__)

# Shared projection used by both the list query and the single-item query, so
# the two can never drift apart.  Callers append their own WHERE/ORDER/LIMIT.
_ITEM_SELECT_SQL = """
    SELECT
        di.dict_id, di.dict_name, di.dict_value, di.dict_desc, di.category_id,
        di.enable_flag, di.del_flag, di.sort, di.created_by, di.created_time,
        di.updated_by, di.updated_time,
        dc.category_name,
        creator.username as created_by_name,
        updater.username as updated_by_name
    FROM t_sys_dict_category_item di
    LEFT JOIN t_sys_dict_category dc ON di.category_id = dc.category_id
    LEFT JOIN t_sys_user creator ON di.created_by = creator.id
    LEFT JOIN t_sys_user updater ON di.updated_by = updater.id
"""

# Existence checks for a non-deleted item / category.
_ITEM_EXISTS_SQL = "SELECT dict_id FROM t_sys_dict_category_item WHERE dict_id = %s AND del_flag = '0'"
_CATEGORY_EXISTS_SQL = "SELECT category_id FROM t_sys_dict_category WHERE category_id = %s AND del_flag = '0'"

# Columns that update_item() is allowed to modify.
_UPDATABLE_FIELDS = ('dict_name', 'dict_value', 'dict_desc', 'category_id', 'enable_flag', 'sort')


def _iso_or_none(value: Any) -> Optional[str]:
    """Return ``value.isoformat()`` for a truthy value, otherwise ``None``."""
    return value.isoformat() if value else None


def _row_to_item(row: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one row selected with ``_ITEM_SELECT_SQL`` into the API dict.

    The rows are accessed by column name, so the cursor must return mappings.
    Timestamps are serialised with ``isoformat()``; NULL timestamps become
    ``None``.
    """
    return {
        "dict_id": row['dict_id'],
        "dict_name": row['dict_name'],
        "dict_value": row['dict_value'],
        "dict_desc": row['dict_desc'],
        "category_id": row['category_id'],
        "category_name": row['category_name'],
        "enable_flag": row['enable_flag'],
        "del_flag": row['del_flag'],
        "sort": row['sort'],
        "created_by": row['created_by'],
        "created_time": _iso_or_none(row['created_time']),
        "updated_by": row['updated_by'],
        "updated_time": _iso_or_none(row['updated_time']),
        "created_by_name": row['created_by_name'],
        "updated_by_name": row['updated_by_name'],
    }


def _release(cursor: Any, conn: Any) -> None:
    """Close ``cursor`` (if one was opened) and always close ``conn``.

    The connection is closed in a ``finally`` so that an error while closing
    the cursor cannot leak the connection.
    """
    try:
        if cursor is not None:
            cursor.close()
    finally:
        conn.close()


class DictItemService:
    """Service class for dictionary items.

    Every method obtains its own connection from ``get_db_connection()`` and
    releases it before returning.  Database errors are logged and converted
    into the method's "failure" return value instead of being raised.

    NOTE(review): the methods are ``async`` but call ``get_db_connection()``
    and the cursor API without ``await``; if the underlying driver is
    blocking, these calls will block the event loop -- confirm.
    """

    def __init__(self):
        """Initialise the service (no state is kept)."""
        pass

    async def get_item_list(
        self,
        page: int,
        page_size: int,
        category_id: Optional[str] = None,
        keyword: Optional[str] = None,
        enable_flag: Optional[str] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of non-deleted dictionary items plus the total count.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.
            category_id: If truthy, restrict to this category.
            keyword: If truthy, substring match against name, value or
                description.
            enable_flag: If truthy, restrict to this enable flag.

        Returns:
            ``(items, total)`` where ``total`` counts all rows matching the
            filters.  ``([], 0)`` is returned when no connection is available
            or when any error occurs.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = None
        try:
            cursor = conn.cursor()

            # Only fixed SQL fragments go into the WHERE clause; every
            # caller-supplied value is bound as a query parameter.
            where_conditions = ["di.del_flag = '0'"]
            params: List[Any] = []

            if category_id:
                where_conditions.append("di.category_id = %s")
                params.append(category_id)

            if keyword:
                where_conditions.append("(di.dict_name LIKE %s OR di.dict_value LIKE %s OR di.dict_desc LIKE %s)")
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern, pattern])

            if enable_flag:
                where_conditions.append("di.enable_flag = %s")
                params.append(enable_flag)

            where_clause = " AND ".join(where_conditions)

            # Total number of matching rows.
            cursor.execute(f"SELECT COUNT(*) as count FROM t_sys_dict_category_item di WHERE {where_clause}", params)
            total = cursor.fetchone()['count']

            # Clamp paging input: a negative LIMIT/OFFSET is rejected by the
            # database and would otherwise surface as an empty result.
            page_size = max(page_size, 0)
            offset = (max(page, 1) - 1) * page_size

            list_sql = (
                f"{_ITEM_SELECT_SQL}"
                f" WHERE {where_clause}"
                " ORDER BY di.sort ASC, di.dict_id ASC"
                " LIMIT %s OFFSET %s"
            )
            cursor.execute(list_sql, params + [page_size, offset])

            items = [_row_to_item(row) for row in cursor.fetchall()]
            return items, total

        except Exception:
            logger.exception("获取字典项列表错误")
            return [], 0
        finally:
            _release(cursor, conn)

    async def get_item_by_id(self, dict_id: int) -> Optional[Dict[str, Any]]:
        """Return the non-deleted dictionary item with ``dict_id``.

        Returns:
            The item dict, or ``None`` when the item does not exist, no
            connection is available, or an error occurs.
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                _ITEM_SELECT_SQL + " WHERE di.dict_id = %s AND di.del_flag = '0'",
                (dict_id,),
            )
            row = cursor.fetchone()
            if not row:
                return None
            return _row_to_item(row)

        except Exception:
            logger.exception("获取字典项详情错误")
            return None
        finally:
            _release(cursor, conn)

    async def create_item(self, item_data: Dict[str, Any], creator_id: str) -> Tuple[bool, str, Optional[int]]:
        """Insert a new dictionary item.

        Args:
            item_data: Must contain ``dict_name``, ``dict_value`` and
                ``category_id``; ``dict_desc``, ``enable_flag`` (default
                ``'1'``) and ``sort`` are optional.
            creator_id: Stored in ``created_by``.

        Returns:
            ``(success, message, new_dict_id)``; ``new_dict_id`` is ``None``
            on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = None
        try:
            cursor = conn.cursor()

            # The referenced category must exist and not be deleted.
            cursor.execute(_CATEGORY_EXISTS_SQL, (item_data['category_id'],))
            if not cursor.fetchone():
                return False, "字典类型不存在", None

            cursor.execute("""
                INSERT INTO t_sys_dict_category_item
                    (dict_name, dict_value, dict_desc, category_id, enable_flag,
                     del_flag, sort, created_by, created_time, updated_time)
                VALUES (%s, %s, %s, %s, %s, '0', %s, %s, NOW(), NOW())
            """, (
                item_data['dict_name'],
                item_data['dict_value'],
                item_data.get('dict_desc'),
                item_data['category_id'],
                item_data.get('enable_flag', '1'),
                item_data.get('sort'),
                creator_id,
            ))

            dict_id = cursor.lastrowid
            conn.commit()
            return True, "字典项创建成功", dict_id

        except Exception:
            logger.exception("创建字典项错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            _release(cursor, conn)

    async def update_item(self, dict_id: int, item_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update the supplied fields of an existing dictionary item.

        Only keys listed in ``_UPDATABLE_FIELDS`` whose value is not ``None``
        are written, so a column cannot be reset to NULL through this method.
        When no such key is present nothing is written and success is still
        reported.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            # The item must exist and not be deleted.
            cursor.execute(_ITEM_EXISTS_SQL, (dict_id,))
            if not cursor.fetchone():
                return False, "字典项不存在"

            # When moving the item to another category, that category must exist.
            new_category_id = item_data.get('category_id')
            if new_category_id:
                cursor.execute(_CATEGORY_EXISTS_SQL, (new_category_id,))
                if not cursor.fetchone():
                    return False, "字典类型不存在"

            # Column names come from the fixed whitelist; values are bound.
            changed = [
                (field, item_data[field])
                for field in _UPDATABLE_FIELDS
                if item_data.get(field) is not None
            ]

            if changed:
                assignments = ', '.join(f'{field} = %s' for field, _ in changed)
                values = [value for _, value in changed]
                values.extend([updater_id, dict_id])
                cursor.execute(f"""
                    UPDATE t_sys_dict_category_item
                    SET {assignments}, updated_by = %s, updated_time = NOW()
                    WHERE dict_id = %s
                """, values)
                conn.commit()

            return True, "字典项更新成功"

        except Exception:
            logger.exception("更新字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            _release(cursor, conn)

    async def delete_item(self, dict_id: int) -> Tuple[bool, str]:
        """Logically delete a dictionary item by setting ``del_flag = '1'``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            # The item must exist and not already be deleted.
            cursor.execute(_ITEM_EXISTS_SQL, (dict_id,))
            if not cursor.fetchone():
                return False, "字典项不存在"

            cursor.execute("""
                UPDATE t_sys_dict_category_item
                SET del_flag = '1', updated_time = NOW()
                WHERE dict_id = %s
            """, (dict_id,))

            conn.commit()
            return True, "字典项删除成功"

        except Exception:
            logger.exception("删除字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            _release(cursor, conn)

    async def batch_delete_items(self, dict_ids: List[int]) -> Tuple[bool, str]:
        """Logically delete several dictionary items in one statement.

        Returns:
            ``(success, message)``; fails when ``dict_ids`` is empty or when
            none of the ids refers to a non-deleted item.
        """
        # Validate before acquiring a connection so that bad input never
        # costs a database round trip.
        if not dict_ids:
            return False, "字典项ID列表不能为空"

        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            # One bound placeholder per id; the ids themselves are never
            # interpolated into the SQL text.
            placeholders = ','.join(['%s'] * len(dict_ids))
            cursor.execute(f"""
                UPDATE t_sys_dict_category_item
                SET del_flag = '1', updated_time = NOW()
                WHERE dict_id IN ({placeholders}) AND del_flag = '0'
            """, list(dict_ids))

            affected_rows = cursor.rowcount
            conn.commit()

            if affected_rows == 0:
                return False, "没有找到可删除的字典项"

            return True, f"成功删除{affected_rows}个字典项"

        except Exception:
            logger.exception("批量删除字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            _release(cursor, conn)

    async def toggle_item_status(self, dict_id: int, enable_flag: str, updater_id: str) -> Tuple[bool, str]:
        """Set the enable flag of a dictionary item.

        Args:
            dict_id: Item to update.
            enable_flag: New flag value; ``"1"`` is reported as "enabled",
                any other value as "disabled" in the returned message.
            updater_id: Stored in ``updated_by``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            # The item must exist and not be deleted.
            cursor.execute(_ITEM_EXISTS_SQL, (dict_id,))
            if not cursor.fetchone():
                return False, "字典项不存在"

            cursor.execute("""
                UPDATE t_sys_dict_category_item
                SET enable_flag = %s, updated_by = %s, updated_time = NOW()
                WHERE dict_id = %s
            """, (enable_flag, updater_id, dict_id))

            conn.commit()
            status_text = "启用" if enable_flag == "1" else "禁用"
            return True, f"字典项{status_text}成功"

        except Exception:
            logger.exception("切换字典项状态错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            _release(cursor, conn)