| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- """
- 字典项服务层
- """
- import sys
- import os
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
- sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
- import logging
- from typing import Optional, List, Dict, Any, Tuple
- from datetime import datetime
- from app.base.async_mysql_connection import get_db_connection
- logger = logging.getLogger(__name__)
class DictItemService:
    """Service layer for dictionary items stored in ``t_sys_dict_category_item``.

    Each public method obtains a connection from ``get_db_connection()``, runs
    its SQL through a cursor, and closes both before returning. Database
    errors are logged with ``logger.exception`` and reported through the
    return value rather than raised. Rows are read by column name
    (``row['dict_id']``), so the cursor must yield mapping-style rows.

    NOTE(review): the methods are declared ``async`` but drive the connection
    and cursor with plain synchronous calls, as the original code did. This
    presumably blocks the event loop while a query runs -- confirm what
    ``get_db_connection`` actually returns before changing it.
    """

    # SELECT/JOIN prefix shared by the list and detail queries; each caller
    # appends its own WHERE / ORDER BY / LIMIT clauses.
    _ITEM_SELECT_SQL = """
        SELECT di.dict_id, di.dict_name, di.dict_value, di.dict_desc,
               di.category_id, di.enable_flag, di.del_flag, di.sort,
               di.created_by, di.created_time, di.updated_by, di.updated_time,
               dc.category_name,
               creator.username as created_by_name,
               updater.username as updated_by_name
        FROM t_sys_dict_category_item di
        LEFT JOIN t_sys_dict_category dc ON di.category_id = dc.category_id
        LEFT JOIN t_sys_user creator ON di.created_by = creator.id
        LEFT JOIN t_sys_user updater ON di.updated_by = updater.id
    """

    # Columns that update_item is allowed to modify. Only these fixed names
    # are ever interpolated into the UPDATE statement.
    _UPDATABLE_FIELDS = ('dict_name', 'dict_value', 'dict_desc', 'category_id', 'enable_flag', 'sort')

    def __init__(self):
        """Create the service; it holds no state of its own."""
        pass

    @staticmethod
    def _row_to_item(row: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one joined result row into the dict returned to callers.

        ``created_time`` / ``updated_time`` are rendered with ``isoformat()``
        when present and left as ``None`` otherwise.
        """
        created_time = row['created_time']
        updated_time = row['updated_time']
        return {
            "dict_id": row['dict_id'],
            "dict_name": row['dict_name'],
            "dict_value": row['dict_value'],
            "dict_desc": row['dict_desc'],
            "category_id": row['category_id'],
            "category_name": row['category_name'],
            "enable_flag": row['enable_flag'],
            "del_flag": row['del_flag'],
            "sort": row['sort'],
            "created_by": row['created_by'],
            "created_time": created_time.isoformat() if created_time else None,
            "updated_by": row['updated_by'],
            "updated_time": updated_time.isoformat() if updated_time else None,
            "created_by_name": row['created_by_name'],
            "updated_by_name": row['updated_by_name']
        }

    @staticmethod
    def _release(cursor, conn) -> None:
        """Close the cursor (if one was created) and always close the connection."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Runs even when cursor.close() raises, so the connection is
            # never left open.
            conn.close()

    @staticmethod
    def _item_exists(cursor, dict_id: int) -> bool:
        """Return True when a non-deleted dictionary item with this id exists."""
        cursor.execute(
            "SELECT dict_id FROM t_sys_dict_category_item WHERE dict_id = %s AND del_flag = '0'",
            (dict_id,)
        )
        return bool(cursor.fetchone())

    @staticmethod
    def _category_exists(cursor, category_id: Any) -> bool:
        """Return True when a non-deleted dictionary category with this id exists."""
        cursor.execute(
            "SELECT category_id FROM t_sys_dict_category WHERE category_id = %s AND del_flag = '0'",
            (category_id,)
        )
        return bool(cursor.fetchone())

    async def get_item_list(
        self,
        page: int,
        page_size: int,
        category_id: Optional[str] = None,
        keyword: Optional[str] = None,
        enable_flag: Optional[str] = None
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of non-deleted dictionary items plus the total count.

        Args:
            page: 1-based page number; values below 1 are treated as page 1.
            page_size: Rows per page; negative values are treated as 0.
            category_id: When truthy, restrict to this category.
            keyword: When truthy, substring match (SQL ``LIKE``) against
                ``dict_name``, ``dict_value`` and ``dict_desc``.
            enable_flag: When truthy, restrict to this ``enable_flag`` value.

        Returns:
            ``(items, total)`` where ``total`` counts every row matching the
            filters. ``([], 0)`` when no connection is available or a
            database error occurs (the error is logged).
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = None
        try:
            cursor = conn.cursor()

            where_conditions = ["di.del_flag = '0'"]
            params: List[Any] = []

            if category_id:
                where_conditions.append("di.category_id = %s")
                params.append(category_id)

            if keyword:
                where_conditions.append("(di.dict_name LIKE %s OR di.dict_value LIKE %s OR di.dict_desc LIKE %s)")
                pattern = f"%{keyword}%"
                params.extend([pattern, pattern, pattern])

            if enable_flag:
                where_conditions.append("di.enable_flag = %s")
                params.append(enable_flag)

            where_clause = " AND ".join(where_conditions)

            # Total number of matching rows, independent of paging.
            cursor.execute(
                f"SELECT COUNT(*) as count FROM t_sys_dict_category_item di WHERE {where_clause}",
                params
            )
            total = cursor.fetchone()['count']

            # Clamp so an out-of-range page or page_size never yields a
            # negative LIMIT/OFFSET, which the database rejects.
            limit = max(page_size, 0)
            offset = max(page - 1, 0) * limit
            cursor.execute(
                f"""{self._ITEM_SELECT_SQL}
                WHERE {where_clause}
                ORDER BY di.sort ASC, di.dict_id ASC
                LIMIT %s OFFSET %s
                """,
                params + [limit, offset]
            )

            items = [self._row_to_item(row) for row in cursor.fetchall()]
            return items, total
        except Exception:
            logger.exception("获取字典项列表错误")
            return [], 0
        finally:
            self._release(cursor, conn)

    async def get_item_by_id(self, dict_id: int) -> Optional[Dict[str, Any]]:
        """Return the non-deleted dictionary item with ``dict_id``.

        Returns:
            The item dict, or ``None`` when the item does not exist, no
            connection is available, or a database error occurs (logged).
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"""{self._ITEM_SELECT_SQL}
                WHERE di.dict_id = %s AND di.del_flag = '0'
                """,
                (dict_id,)
            )

            row = cursor.fetchone()
            return self._row_to_item(row) if row else None
        except Exception:
            logger.exception("获取字典项详情错误")
            return None
        finally:
            self._release(cursor, conn)

    async def create_item(self, item_data: Dict[str, Any], creator_id: str) -> Tuple[bool, str, Optional[int]]:
        """Insert a new dictionary item.

        Args:
            item_data: Must contain ``dict_name``, ``dict_value`` and
                ``category_id``; ``dict_desc`` and ``sort`` are optional and
                ``enable_flag`` defaults to ``'1'``. A missing required key
                is handled like any other error (logged, generic failure).
            creator_id: Stored in ``created_by``.

        Returns:
            ``(success, message, dict_id)``; ``dict_id`` is the cursor's
            ``lastrowid`` on success and ``None`` on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = None
        try:
            cursor = conn.cursor()

            # The referenced category must exist and not be soft-deleted.
            if not self._category_exists(cursor, item_data['category_id']):
                return False, "字典类型不存在", None

            cursor.execute("""
                INSERT INTO t_sys_dict_category_item
                (dict_name, dict_value, dict_desc, category_id, enable_flag, del_flag, sort,
                created_by, created_time, updated_time)
                VALUES (%s, %s, %s, %s, %s, '0', %s, %s, NOW(), NOW())
            """, (
                item_data['dict_name'],
                item_data['dict_value'],
                item_data.get('dict_desc'),
                item_data['category_id'],
                item_data.get('enable_flag', '1'),
                item_data.get('sort'),
                creator_id
            ))

            dict_id = cursor.lastrowid
            conn.commit()
            return True, "字典项创建成功", dict_id
        except Exception:
            logger.exception("创建字典项错误")
            conn.rollback()
            return False, "服务器内部错误", None
        finally:
            self._release(cursor, conn)

    async def update_item(self, dict_id: int, item_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update the provided fields of an existing dictionary item.

        Only keys listed in ``_UPDATABLE_FIELDS`` whose value is not ``None``
        are written, so a field cannot be reset to NULL through this method.
        When no such key is present nothing is written and the call still
        reports success.

        Args:
            dict_id: Id of the item to update.
            item_data: Field values to apply.
            updater_id: Stored in ``updated_by``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            if not self._item_exists(cursor, dict_id):
                return False, "字典项不存在"

            # A new (truthy) category_id must point at an existing category.
            new_category_id = item_data.get('category_id')
            if new_category_id and not self._category_exists(cursor, new_category_id):
                return False, "字典类型不存在"

            changed = [
                field for field in self._UPDATABLE_FIELDS
                if item_data.get(field) is not None
            ]

            if changed:
                assignments = ', '.join(f'{field} = %s' for field in changed)
                update_values = [item_data[field] for field in changed]
                update_values.extend([updater_id, dict_id])
                cursor.execute(f"""
                    UPDATE t_sys_dict_category_item
                    SET {assignments}, updated_by = %s, updated_time = NOW()
                    WHERE dict_id = %s
                """, update_values)

            conn.commit()
            return True, "字典项更新成功"
        except Exception:
            logger.exception("更新字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            self._release(cursor, conn)

    async def delete_item(self, dict_id: int) -> Tuple[bool, str]:
        """Soft-delete a dictionary item by setting ``del_flag = '1'``.

        Returns:
            ``(success, message)``; fails when the item does not exist or is
            already deleted.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            if not self._item_exists(cursor, dict_id):
                return False, "字典项不存在"

            cursor.execute("""
                UPDATE t_sys_dict_category_item
                SET del_flag = '1', updated_time = NOW()
                WHERE dict_id = %s
            """, (dict_id,))

            conn.commit()
            return True, "字典项删除成功"
        except Exception:
            logger.exception("删除字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            self._release(cursor, conn)

    async def batch_delete_items(self, dict_ids: List[int]) -> Tuple[bool, str]:
        """Soft-delete several dictionary items in one statement.

        Args:
            dict_ids: Ids to delete; an empty list is rejected before any
                database connection is opened.

        Returns:
            ``(success, message)``; the success message includes the number
            of rows the database reported as affected. Fails when no
            non-deleted row matched.
        """
        # Validate first so an invalid request never costs a connection.
        if not dict_ids:
            return False, "字典项ID列表不能为空"

        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            # One placeholder per id keeps the IN (...) list parameterized.
            placeholders = ','.join(['%s'] * len(dict_ids))
            cursor.execute(f"""
                UPDATE t_sys_dict_category_item
                SET del_flag = '1', updated_time = NOW()
                WHERE dict_id IN ({placeholders}) AND del_flag = '0'
            """, dict_ids)

            affected_rows = cursor.rowcount
            conn.commit()

            if affected_rows == 0:
                return False, "没有找到可删除的字典项"

            return True, f"成功删除{affected_rows}个字典项"
        except Exception:
            logger.exception("批量删除字典项错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            self._release(cursor, conn)

    async def toggle_item_status(self, dict_id: int, enable_flag: str, updater_id: str) -> Tuple[bool, str]:
        """Set the ``enable_flag`` of an existing dictionary item.

        Args:
            dict_id: Id of the item to change.
            enable_flag: Value written to the column as-is. The success
                message says "enabled" only when it equals ``"1"``; any
                other value is reported as "disabled".
            updater_id: Stored in ``updated_by``.

        Returns:
            ``(success, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = None
        try:
            cursor = conn.cursor()

            if not self._item_exists(cursor, dict_id):
                return False, "字典项不存在"

            cursor.execute("""
                UPDATE t_sys_dict_category_item
                SET enable_flag = %s, updated_by = %s, updated_time = NOW()
                WHERE dict_id = %s
            """, (enable_flag, updater_id, dict_id))

            conn.commit()
            status_text = "启用" if enable_flag == "1" else "禁用"
            return True, f"字典项{status_text}成功"
        except Exception:
            logger.exception("切换字典项状态错误")
            conn.rollback()
            return False, "服务器内部错误"
        finally:
            self._release(cursor, conn)
|