dict_item_service.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. """
  2. 字典项服务层
  3. """
  4. import sys
  5. import os
  6. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
  7. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))
  8. import logging
  9. from typing import Optional, List, Dict, Any, Tuple
  10. from datetime import datetime
  11. from app.base.async_mysql_connection import get_db_connection
  12. logger = logging.getLogger(__name__)
  13. class DictItemService:
  14. """字典项服务类"""
  15. def __init__(self):
  16. """初始化服务"""
  17. pass
  18. async def get_item_list(
  19. self,
  20. page: int,
  21. page_size: int,
  22. category_id: Optional[str] = None,
  23. keyword: Optional[str] = None,
  24. enable_flag: Optional[str] = None
  25. ) -> Tuple[List[Dict[str, Any]], int]:
  26. """获取字典项列表"""
  27. conn = get_db_connection()
  28. if not conn:
  29. return [], 0
  30. cursor = conn.cursor()
  31. try:
  32. where_conditions = ["di.del_flag = '0'"]
  33. params = []
  34. if category_id:
  35. where_conditions.append("di.category_id = %s")
  36. params.append(category_id)
  37. if keyword:
  38. where_conditions.append("(di.dict_name LIKE %s OR di.dict_value LIKE %s OR di.dict_desc LIKE %s)")
  39. params.extend([f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"])
  40. if enable_flag:
  41. where_conditions.append("di.enable_flag = %s")
  42. params.append(enable_flag)
  43. where_clause = " AND ".join(where_conditions)
  44. # 查询总数
  45. cursor.execute(f"SELECT COUNT(*) as count FROM t_sys_dict_category_item di WHERE {where_clause}", params)
  46. total = cursor.fetchone()['count']
  47. # 查询列表
  48. offset = (page - 1) * page_size
  49. cursor.execute(f"""
  50. SELECT di.dict_id, di.dict_name, di.dict_value, di.dict_desc,
  51. di.category_id, di.enable_flag, di.del_flag, di.sort,
  52. di.created_by, di.created_time, di.updated_by, di.updated_time,
  53. dc.category_name,
  54. creator.username as created_by_name,
  55. updater.username as updated_by_name
  56. FROM t_sys_dict_category_item di
  57. LEFT JOIN t_sys_dict_category dc ON di.category_id = dc.category_id
  58. LEFT JOIN t_sys_user creator ON di.created_by = creator.id
  59. LEFT JOIN t_sys_user updater ON di.updated_by = updater.id
  60. WHERE {where_clause}
  61. ORDER BY di.sort ASC, di.dict_id ASC
  62. LIMIT %s OFFSET %s
  63. """, params + [page_size, offset])
  64. items = []
  65. for row in cursor.fetchall():
  66. item = {
  67. "dict_id": row['dict_id'],
  68. "dict_name": row['dict_name'],
  69. "dict_value": row['dict_value'],
  70. "dict_desc": row['dict_desc'],
  71. "category_id": row['category_id'],
  72. "category_name": row['category_name'],
  73. "enable_flag": row['enable_flag'],
  74. "del_flag": row['del_flag'],
  75. "sort": row['sort'],
  76. "created_by": row['created_by'],
  77. "created_time": row['created_time'].isoformat() if row['created_time'] else None,
  78. "updated_by": row['updated_by'],
  79. "updated_time": row['updated_time'].isoformat() if row['updated_time'] else None,
  80. "created_by_name": row['created_by_name'],
  81. "updated_by_name": row['updated_by_name']
  82. }
  83. items.append(item)
  84. return items, total
  85. except Exception as e:
  86. logger.exception("获取字典项列表错误")
  87. return [], 0
  88. finally:
  89. cursor.close()
  90. conn.close()
  91. async def get_item_by_id(self, dict_id: int) -> Optional[Dict[str, Any]]:
  92. """根据ID获取字典项"""
  93. conn = get_db_connection()
  94. if not conn:
  95. return None
  96. cursor = conn.cursor()
  97. try:
  98. cursor.execute("""
  99. SELECT di.dict_id, di.dict_name, di.dict_value, di.dict_desc,
  100. di.category_id, di.enable_flag, di.del_flag, di.sort,
  101. di.created_by, di.created_time, di.updated_by, di.updated_time,
  102. dc.category_name,
  103. creator.username as created_by_name,
  104. updater.username as updated_by_name
  105. FROM t_sys_dict_category_item di
  106. LEFT JOIN t_sys_dict_category dc ON di.category_id = dc.category_id
  107. LEFT JOIN t_sys_user creator ON di.created_by = creator.id
  108. LEFT JOIN t_sys_user updater ON di.updated_by = updater.id
  109. WHERE di.dict_id = %s AND di.del_flag = '0'
  110. """, (dict_id,))
  111. row = cursor.fetchone()
  112. if not row:
  113. return None
  114. item = {
  115. "dict_id": row['dict_id'],
  116. "dict_name": row['dict_name'],
  117. "dict_value": row['dict_value'],
  118. "dict_desc": row['dict_desc'],
  119. "category_id": row['category_id'],
  120. "category_name": row['category_name'],
  121. "enable_flag": row['enable_flag'],
  122. "del_flag": row['del_flag'],
  123. "sort": row['sort'],
  124. "created_by": row['created_by'],
  125. "created_time": row['created_time'].isoformat() if row['created_time'] else None,
  126. "updated_by": row['updated_by'],
  127. "updated_time": row['updated_time'].isoformat() if row['updated_time'] else None,
  128. "created_by_name": row['created_by_name'],
  129. "updated_by_name": row['updated_by_name']
  130. }
  131. return item
  132. except Exception as e:
  133. logger.exception("获取字典项详情错误")
  134. return None
  135. finally:
  136. cursor.close()
  137. conn.close()
  138. async def create_item(self, item_data: Dict[str, Any], creator_id: str) -> Tuple[bool, str, Optional[int]]:
  139. """创建字典项"""
  140. conn = get_db_connection()
  141. if not conn:
  142. return False, "数据库连接失败", None
  143. cursor = conn.cursor()
  144. try:
  145. # 检查字典类型是否存在
  146. cursor.execute("SELECT category_id FROM t_sys_dict_category WHERE category_id = %s AND del_flag = '0'",
  147. (item_data['category_id'],))
  148. if not cursor.fetchone():
  149. return False, "字典类型不存在", None
  150. # 创建字典项
  151. cursor.execute("""
  152. INSERT INTO t_sys_dict_category_item
  153. (dict_name, dict_value, dict_desc, category_id, enable_flag, del_flag, sort,
  154. created_by, created_time, updated_time)
  155. VALUES (%s, %s, %s, %s, %s, '0', %s, %s, NOW(), NOW())
  156. """, (
  157. item_data['dict_name'],
  158. item_data['dict_value'],
  159. item_data.get('dict_desc'),
  160. item_data['category_id'],
  161. item_data.get('enable_flag', '1'),
  162. item_data.get('sort'),
  163. creator_id
  164. ))
  165. dict_id = cursor.lastrowid
  166. conn.commit()
  167. return True, "字典项创建成功", dict_id
  168. except Exception as e:
  169. logger.exception("创建字典项错误")
  170. conn.rollback()
  171. return False, "服务器内部错误", None
  172. finally:
  173. cursor.close()
  174. conn.close()
  175. async def update_item(self, dict_id: int, item_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
  176. """更新字典项"""
  177. conn = get_db_connection()
  178. if not conn:
  179. return False, "数据库连接失败"
  180. cursor = conn.cursor()
  181. try:
  182. # 检查字典项是否存在
  183. cursor.execute("SELECT dict_id FROM t_sys_dict_category_item WHERE dict_id = %s AND del_flag = '0'", (dict_id,))
  184. if not cursor.fetchone():
  185. return False, "字典项不存在"
  186. # 如果更新了category_id,检查字典类型是否存在
  187. if 'category_id' in item_data and item_data['category_id']:
  188. cursor.execute("SELECT category_id FROM t_sys_dict_category WHERE category_id = %s AND del_flag = '0'",
  189. (item_data['category_id'],))
  190. if not cursor.fetchone():
  191. return False, "字典类型不存在"
  192. # 更新字典项
  193. update_fields = []
  194. update_values = []
  195. for field in ['dict_name', 'dict_value', 'dict_desc', 'category_id', 'enable_flag', 'sort']:
  196. if field in item_data and item_data[field] is not None:
  197. update_fields.append(f'{field} = %s')
  198. update_values.append(item_data[field])
  199. if update_fields:
  200. update_values.extend([updater_id, dict_id])
  201. cursor.execute(f"""
  202. UPDATE t_sys_dict_category_item
  203. SET {', '.join(update_fields)}, updated_by = %s, updated_time = NOW()
  204. WHERE dict_id = %s
  205. """, update_values)
  206. conn.commit()
  207. return True, "字典项更新成功"
  208. except Exception as e:
  209. logger.exception("更新字典项错误")
  210. conn.rollback()
  211. return False, "服务器内部错误"
  212. finally:
  213. cursor.close()
  214. conn.close()
  215. async def delete_item(self, dict_id: int) -> Tuple[bool, str]:
  216. """删除字典项(逻辑删除)"""
  217. conn = get_db_connection()
  218. if not conn:
  219. return False, "数据库连接失败"
  220. cursor = conn.cursor()
  221. try:
  222. # 检查字典项是否存在
  223. cursor.execute("SELECT dict_id FROM t_sys_dict_category_item WHERE dict_id = %s AND del_flag = '0'", (dict_id,))
  224. if not cursor.fetchone():
  225. return False, "字典项不存在"
  226. # 逻辑删除
  227. cursor.execute("""
  228. UPDATE t_sys_dict_category_item
  229. SET del_flag = '1', updated_time = NOW()
  230. WHERE dict_id = %s
  231. """, (dict_id,))
  232. conn.commit()
  233. return True, "字典项删除成功"
  234. except Exception as e:
  235. logger.exception("删除字典项错误")
  236. conn.rollback()
  237. return False, "服务器内部错误"
  238. finally:
  239. cursor.close()
  240. conn.close()
  241. async def batch_delete_items(self, dict_ids: List[int]) -> Tuple[bool, str]:
  242. """批量删除字典项(逻辑删除)"""
  243. conn = get_db_connection()
  244. if not conn:
  245. return False, "数据库连接失败"
  246. cursor = conn.cursor()
  247. try:
  248. if not dict_ids:
  249. return False, "字典项ID列表不能为空"
  250. # 批量逻辑删除
  251. placeholders = ','.join(['%s'] * len(dict_ids))
  252. cursor.execute(f"""
  253. UPDATE t_sys_dict_category_item
  254. SET del_flag = '1', updated_time = NOW()
  255. WHERE dict_id IN ({placeholders}) AND del_flag = '0'
  256. """, dict_ids)
  257. affected_rows = cursor.rowcount
  258. conn.commit()
  259. if affected_rows == 0:
  260. return False, "没有找到可删除的字典项"
  261. return True, f"成功删除{affected_rows}个字典项"
  262. except Exception as e:
  263. logger.exception("批量删除字典项错误")
  264. conn.rollback()
  265. return False, "服务器内部错误"
  266. finally:
  267. cursor.close()
  268. conn.close()
  269. async def toggle_item_status(self, dict_id: int, enable_flag: str, updater_id: str) -> Tuple[bool, str]:
  270. """切换字典项启用状态"""
  271. conn = get_db_connection()
  272. if not conn:
  273. return False, "数据库连接失败"
  274. cursor = conn.cursor()
  275. try:
  276. # 检查字典项是否存在
  277. cursor.execute("SELECT dict_id FROM t_sys_dict_category_item WHERE dict_id = %s AND del_flag = '0'", (dict_id,))
  278. if not cursor.fetchone():
  279. return False, "字典项不存在"
  280. # 更新状态
  281. cursor.execute("""
  282. UPDATE t_sys_dict_category_item
  283. SET enable_flag = %s, updated_by = %s, updated_time = NOW()
  284. WHERE dict_id = %s
  285. """, (enable_flag, updater_id, dict_id))
  286. conn.commit()
  287. status_text = "启用" if enable_flag == "1" else "禁用"
  288. return True, f"字典项{status_text}成功"
  289. except Exception as e:
  290. logger.exception("切换字典项状态错误")
  291. conn.rollback()
  292. return False, "服务器内部错误"
  293. finally:
  294. cursor.close()
  295. conn.close()