sample_service.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. """
  2. 样本中心服务层
  3. 从 sample_view.py 提取的SQL查询逻辑
  4. """
  5. import logging
  6. import uuid
  7. from typing import Optional, List, Dict, Any, Tuple
  8. from app.base.async_mysql_connection import get_db_connection
  9. from app.base.minio_connection import get_minio_manager
  10. from app.core.config import config_handler
  11. logger = logging.getLogger(__name__)
  12. # 表名映射
  13. TABLE_MAP = {
  14. 'basis': 't_samp_standard_base_info',
  15. 'work': 't_samp_construction_plan_base_info',
  16. 'job': 't_samp_office_regulations'
  17. }
  18. def get_table_name(source_type: str) -> Optional[str]:
  19. """根据source_type获取表名"""
  20. return TABLE_MAP.get(source_type)
  21. class SampleService:
  22. """样本中心服务类 - 使用 SQL 查询方式"""
  23. def __init__(self):
  24. """初始化服务"""
  25. # 使用统一的 MinIO 管理器
  26. self.minio_manager = get_minio_manager()
  27. async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]:
  28. """获取 MinIO 预签名上传 URL"""
  29. try:
  30. data = self.minio_manager.get_upload_url(filename, content_type)
  31. return True, "成功获取上传链接", data
  32. except Exception as e:
  33. logger.exception("生成上传链接失败")
  34. return False, f"生成上传链接失败: {str(e)}", {}
  35. # ==================== 文档管理 ====================
  36. async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
  37. """批量将文档入库到知识库
  38. Args:
  39. doc_ids: 文档ID列表
  40. username: 操作人
  41. """
  42. conn = get_db_connection()
  43. if not conn:
  44. return 0, "数据库连接失败"
  45. cursor = conn.cursor()
  46. try:
  47. # 1. 严格检查转换状态:只有 conversion_status = 2 (转换成功) 且 whether_to_enter = 0 (未入库) 的才能入库
  48. check_sql = f"SELECT id, title FROM t_samp_document_main WHERE id IN ({','.join(['%s']*len(doc_ids))}) AND conversion_status = 2 AND whether_to_enter = 0"
  49. cursor.execute(check_sql, tuple(doc_ids))
  50. valid_docs = cursor.fetchall()
  51. valid_ids = [doc['id'] for doc in valid_docs]
  52. if not valid_ids:
  53. return 0, "选中的文档中没有满足入库条件(已转换成功且未入库)的记录"
  54. # 2. 更新状态为已入库
  55. update_sql = f"UPDATE t_samp_document_main SET whether_to_enter = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({','.join(['%s']*len(valid_ids))})"
  56. cursor.execute(update_sql, (username, *valid_ids))
  57. affected_rows = cursor.rowcount
  58. conn.commit()
  59. return affected_rows, f"成功入库 {affected_rows} 份文档"
  60. except Exception as e:
  61. logger.exception(f"文档批量入库失败: {e}")
  62. conn.rollback()
  63. return 0, f"入库失败: {str(e)}"
  64. finally:
  65. cursor.close()
  66. conn.close()
  67. async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
  68. """批量删除文档"""
  69. conn = get_db_connection()
  70. if not conn:
  71. return 0, "数据库连接失败"
  72. cursor = conn.cursor()
  73. try:
  74. if not doc_ids:
  75. return 0, "未指定要删除的文档 ID"
  76. placeholders = ', '.join(['%s'] * len(doc_ids))
  77. # 尝试同步删除子表中的数据
  78. try:
  79. cursor.execute(f"SELECT source_type, source_id FROM t_samp_document_main WHERE id IN ({placeholders})", doc_ids)
  80. docs = cursor.fetchall()
  81. for doc_row in docs:
  82. s_type = doc_row['source_type']
  83. s_id = doc_row['source_id']
  84. if s_type and s_id:
  85. sub_table = get_table_name(s_type)
  86. if sub_table:
  87. sub_sql = f"DELETE FROM {sub_table} WHERE id = %s"
  88. try:
  89. cursor.execute(sub_sql, (s_id,))
  90. except Exception as sub_e:
  91. logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
  92. except Exception as sync_e:
  93. logger.error(f"同步删除子表数据失败: {sync_e}")
  94. # 删除主表 t_samp_document_main 中的数据
  95. sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
  96. cursor.execute(sql_main, doc_ids)
  97. affected_rows = cursor.rowcount
  98. conn.commit()
  99. return affected_rows, f"成功删除 {affected_rows} 条文档数据"
  100. except Exception as e:
  101. logger.exception("批量删除失败")
  102. conn.rollback()
  103. return 0, f"批量删除失败: {str(e)}"
  104. finally:
  105. cursor.close()
  106. conn.close()
  107. async def get_document_list(
  108. self,
  109. whether_to_enter: Optional[int] = None,
  110. keyword: Optional[str] = None,
  111. table_type: Optional[str] = None,
  112. page: int = 1,
  113. size: int = 50
  114. ) -> Tuple[List[Dict[str, Any]], int, int, int]:
  115. """获取文档列表(从主表查询)"""
  116. conn = get_db_connection()
  117. if not conn:
  118. return [], 0, 0, 0
  119. cursor = conn.cursor()
  120. try:
  121. where_clauses = []
  122. params = []
  123. if table_type:
  124. where_clauses.append("source_type = %s")
  125. params.append(table_type)
  126. if whether_to_enter is not None:
  127. where_clauses.append("whether_to_enter = %s")
  128. params.append(whether_to_enter)
  129. if keyword:
  130. where_clauses.append("title LIKE %s")
  131. params.append(f"%{keyword}%")
  132. where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
  133. offset = (page - 1) * size
  134. sql = f"SELECT * FROM t_samp_document_main {where_sql} ORDER BY created_time DESC LIMIT %s OFFSET %s"
  135. params.extend([size, offset])
  136. logger.info(f"Executing SQL: {sql} with params: {params}")
  137. cursor.execute(sql, tuple(params))
  138. items = []
  139. for row in cursor.fetchall():
  140. item = row # DictCursor already returns dict
  141. # 格式化时间
  142. for key in ['created_time', 'updated_time']:
  143. if item.get(key) and hasattr(item[key], 'isoformat'):
  144. item[key] = item[key].isoformat()
  145. # 增加格式化文件名供前端显示
  146. if item.get('conversion_status') == 2:
  147. title = item.get('title', 'document')
  148. item['md_display_name'] = f"{title}.md"
  149. item['json_display_name'] = f"{title}.json"
  150. items.append(item)
  151. # 总数
  152. count_sql = f"SELECT COUNT(*) as count FROM t_samp_document_main {where_sql}"
  153. cursor.execute(count_sql, tuple(params[:-2]))
  154. res = cursor.fetchone()
  155. total = res['count'] if res else 0
  156. # 统计数据
  157. cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
  158. res = cursor.fetchone()
  159. all_total = res['count'] if res else 0
  160. cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
  161. res = cursor.fetchone()
  162. total_entered = res['count'] if res else 0
  163. return items, total, all_total, total_entered
  164. except Exception as e:
  165. logger.exception("获取文档列表失败")
  166. return [], 0, 0, 0
  167. finally:
  168. cursor.close()
  169. conn.close()
  170. async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
  171. """获取文档详情"""
  172. conn = get_db_connection()
  173. if not conn:
  174. return None
  175. cursor = conn.cursor()
  176. try:
  177. # 查询主表
  178. cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
  179. doc = cursor.fetchone()
  180. if not doc:
  181. return None
  182. # 格式化时间
  183. for key in ['created_time', 'updated_time']:
  184. if doc.get(key) and hasattr(doc[key], 'isoformat'):
  185. doc[key] = doc[key].isoformat()
  186. # 增加格式化文件名供前端显示
  187. if doc.get('conversion_status') == 2:
  188. title = doc.get('title', 'document')
  189. doc['md_display_name'] = f"{title}.md"
  190. doc['json_display_name'] = f"{title}.json"
  191. return doc
  192. except Exception as e:
  193. logger.exception("获取文档详情失败")
  194. return None
  195. finally:
  196. cursor.close()
  197. conn.close()
  198. def _to_int(self, value: Any) -> Optional[int]:
  199. """安全转换为整数"""
  200. if value is None or value == '' or str(value).lower() == 'null':
  201. return None
  202. try:
  203. return int(value)
  204. except (ValueError, TypeError):
  205. return None
  206. def _to_date(self, value: Any) -> Optional[str]:
  207. """安全处理日期字符串"""
  208. if value is None or value == '' or str(value).lower() == 'null':
  209. return None
  210. # 如果已经是 datetime.date 或 datetime.datetime 对象
  211. if hasattr(value, 'strftime'):
  212. return value.strftime('%Y-%m-%d')
  213. return str(value)
  214. async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
  215. """添加新文档(先主表后子表,解耦触发器)"""
  216. conn = get_db_connection()
  217. if not conn:
  218. return False, "数据库连接失败", None
  219. cursor = conn.cursor()
  220. try:
  221. doc_id = str(uuid.uuid4())
  222. source_id = str(uuid.uuid4())
  223. table_type = doc_data.get('table_type', 'basis')
  224. table_name = TABLE_MAP.get(table_type)
  225. # 安全转换字段
  226. release_date = self._to_date(doc_data.get('release_date'))
  227. # 1. 插入主表 (作为资产中心)
  228. cursor.execute(
  229. """
  230. INSERT INTO t_samp_document_main (
  231. id, title, source_type, source_id, file_url,
  232. file_extension, created_by, updated_by, created_time, updated_time,
  233. conversion_status
  234. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
  235. """,
  236. (
  237. doc_id, doc_data.get('title'), table_type, source_id, doc_data.get('file_url'),
  238. doc_data.get('file_extension'), user_id, user_id
  239. )
  240. )
  241. # 2. 插入子表 (仅存储业务字段)
  242. if table_type == 'basis':
  243. cursor.execute(
  244. f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
  245. (source_id, doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), user_id)
  246. )
  247. elif table_type == 'work':
  248. cursor.execute(
  249. f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
  250. (source_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, user_id)
  251. )
  252. elif table_type == 'job':
  253. cursor.execute(
  254. f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())",
  255. (source_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, user_id)
  256. )
  257. conn.commit()
  258. return True, "文档添加成功", doc_id
  259. except Exception as e:
  260. logger.exception("添加文档失败")
  261. conn.rollback()
  262. return False, str(e), None
  263. finally:
  264. cursor.close()
  265. conn.close()
  266. async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
  267. """编辑文档(同步主表和子表,解耦触发器)"""
  268. conn = get_db_connection()
  269. if not conn:
  270. return False, "数据库连接失败"
  271. cursor = conn.cursor()
  272. try:
  273. doc_id = doc_data.get('id')
  274. source_id = doc_data.get('source_id')
  275. table_type = doc_data.get('table_type', 'basis')
  276. table_name = TABLE_MAP.get(table_type)
  277. # 安全转换字段
  278. release_date = self._to_date(doc_data.get('release_date'))
  279. # 1. 更新主表
  280. cursor.execute(
  281. """
  282. UPDATE t_samp_document_main
  283. SET title = %s, file_url = %s, file_extension = %s,
  284. updated_by = %s, updated_time = NOW()
  285. WHERE id = %s
  286. """,
  287. (
  288. doc_data.get('title'), doc_data.get('file_url'), doc_data.get('file_extension'),
  289. updater_id, doc_id
  290. )
  291. )
  292. # 2. 更新子表
  293. if table_type == 'basis':
  294. cursor.execute(
  295. f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
  296. (doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), updater_id, source_id)
  297. )
  298. elif table_type == 'work':
  299. cursor.execute(
  300. f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
  301. (doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, updater_id, source_id)
  302. )
  303. elif table_type == 'job':
  304. cursor.execute(
  305. f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
  306. (doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, updater_id, source_id)
  307. )
  308. conn.commit()
  309. return True, "文档更新成功"
  310. except Exception as e:
  311. logger.exception("编辑文档失败")
  312. conn.rollback()
  313. return False, str(e)
  314. finally:
  315. cursor.close()
  316. conn.close()
  317. async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
  318. """文档入库(单个)"""
  319. affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
  320. return affected_rows > 0, message
    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Fetch a page of basic-info rows, joined to the main table for file/conversion state.

        Args:
            type: sub-table selector — 'basis', 'work' or 'job'.
            page: 1-based page number.
            size: rows per page.
            keyword: fuzzy search over the type's name field(s).
            **filters: exact-match filters keyed by the unified field names
                (title, issuing_authority, release_date, ...); unknown keys are ignored.

        Returns:
            (rows, total_count); ([], 0) for an unknown type or on error.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0
        cursor = conn.cursor()
        try:
            # Choose the sub-table, the unified SELECT list, and the filter-field map by type.
            if type == 'basis':
                table_name = "t_samp_standard_base_info"
                # Joined main-table columns: file_url, conversion_status, md_url, json_url.
                fields = """
                s.id, s.chinese_name as title, s.standard_number as standard_no,
                s.issuing_authority, s.release_date, s.document_type,
                s.professional_field, s.validity, s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'work':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                s.id, s.plan_name as title, NULL as standard_no,
                s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                NULL as document_type, NULL as professional_field, NULL as validity,
                s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date'
                }
            elif type == 'job':
                table_name = "t_samp_office_regulations"
                fields = """
                s.id, s.file_name as title, NULL as standard_no,
                s.issuing_department as issuing_authority, s.publish_date as release_date,
                s.document_type, NULL as professional_field, NULL as validity,
                s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0
            where_clauses = []
            params = []
            # Unified keyword search over the type's name field(s).
            if keyword:
                if type == 'basis':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'work':
                    where_clauses.append("s.plan_name LIKE %s")
                    params.append(f"%{keyword}%")
                elif type == 'job':
                    where_clauses.append("s.file_name LIKE %s")
                    params.append(f"%{keyword}%")
            # Fine-grained exact-match filters (only mapped keys are applied).
            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue
                db_field = field_map.get(filter_key)
                if db_field:
                    where_clauses.append(f"{db_field} = %s")
                    params.append(filter_value)
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            # LEFT JOIN the main table so rows without a document record still appear.
            sql = f"""
            SELECT {fields}
            FROM {table_name} s
            LEFT JOIN t_samp_document_main m ON s.id = m.source_id AND m.source_type = %s
            {where_sql}
            ORDER BY s.created_time DESC
            LIMIT %s OFFSET %s
            """
            params = [type] + params + [size, offset]
            cursor.execute(sql, tuple(params))
            items = cursor.fetchall()
            # Total count: drop the leading JOIN param and trailing LIMIT/OFFSET params.
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(params[1:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            return items, total
        except Exception as e:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()
  432. # ==================== 文档转换 ====================
  433. async def get_document_source_type(self, doc_id: str) -> Optional[str]:
  434. """获取文档的source_type"""
  435. conn = get_db_connection()
  436. if not conn:
  437. return None
  438. cursor = conn.cursor()
  439. try:
  440. cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
  441. res = cursor.fetchone()
  442. return res['source_type'] if res else None
  443. except Exception as e:
  444. logger.exception(f"获取文档source_type失败: {e}")
  445. return None
  446. finally:
  447. cursor.close()
  448. conn.close()
  449. async def get_document_title(self, doc_id: str) -> str:
  450. """获取文档标题"""
  451. conn = get_db_connection()
  452. if not conn:
  453. return "文档"
  454. cursor = conn.cursor()
  455. try:
  456. cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
  457. res = cursor.fetchone()
  458. return res['title'] if res else "文档"
  459. except Exception as e:
  460. logger.exception(f"获取文档标题失败: {e}")
  461. return "文档"
  462. finally:
  463. cursor.close()
  464. conn.close()
  465. async def update_conversion_status(self, doc_id: str, status: int,
  466. md_url: Optional[str] = None,
  467. json_url: Optional[str] = None,
  468. error_message: Optional[str] = None) -> bool:
  469. """更新文档转换状态
  470. Args:
  471. doc_id: 文档ID
  472. status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
  473. md_url: Markdown文件URL
  474. json_url: JSON文件URL
  475. error_message: 错误信息
  476. """
  477. conn = get_db_connection()
  478. if not conn:
  479. return False
  480. cursor = conn.cursor()
  481. try:
  482. update_clauses = ["conversion_status = %s"]
  483. params = [status]
  484. if error_message:
  485. update_clauses.append("conversion_error = %s")
  486. params.append(error_message)
  487. if md_url:
  488. update_clauses.append("md_url = %s")
  489. params.append(md_url)
  490. if json_url:
  491. update_clauses.append("json_url = %s")
  492. params.append(json_url)
  493. sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
  494. params.append(doc_id)
  495. cursor.execute(sql, tuple(params))
  496. conn.commit()
  497. return True
  498. except Exception as e:
  499. logger.exception(f"更新转换进度失败: {e}")
  500. conn.rollback()
  501. return False
  502. finally:
  503. cursor.close()
  504. conn.close()
  505. # ==================== 基础信息管理 ====================
  506. async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]:
  507. """新增基本信息"""
  508. conn = get_db_connection()
  509. if not conn:
  510. return False, "数据库连接失败"
  511. cursor = conn.cursor()
  512. try:
  513. table_name = TABLE_MAP.get(type)
  514. if not table_name:
  515. return False, "无效的类型"
  516. source_id = str(uuid.uuid4())
  517. doc_id = str(uuid.uuid4())
  518. file_url = data.get('file_url')
  519. file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
  520. # 1. 插入主表 (解耦触发器,手动同步)
  521. cursor.execute(
  522. """
  523. INSERT INTO t_samp_document_main (
  524. id, title, source_type, source_id, file_url,
  525. file_extension, created_by, updated_by, created_time, updated_time,
  526. conversion_status
  527. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
  528. """,
  529. (
  530. doc_id, data.get('title'), type, source_id, file_url,
  531. file_extension, user_id, user_id
  532. )
  533. )
  534. # 2. 插入子表 (移除 file_url,因为它现在只存储在主表中)
  535. if type == 'basis':
  536. sql = f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
  537. params = (source_id, data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('document_type'), data.get('professional_field'), data.get('validity', '现行'), user_id)
  538. elif type == 'work':
  539. sql = f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
  540. params = (source_id, data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), user_id)
  541. elif type == 'job':
  542. sql = f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())"
  543. params = (source_id, data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')), user_id)
  544. else:
  545. return False, "不支持的类型"
  546. cursor.execute(sql, params)
  547. conn.commit()
  548. return True, "新增成功"
  549. except Exception as e:
  550. logger.exception("新增基本信息失败")
  551. conn.rollback()
  552. return False, str(e)
  553. finally:
  554. cursor.close()
  555. conn.close()
    async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a basic-info record: the main table first, then the type sub-table.

        Args:
            type: sub-table selector — 'basis', 'work' or 'job'.
            info_id: sub-table primary key; the main table is matched via source_id/source_type.
            data: new field values keyed by the unified field names.
            updater_id: operator recorded in updated_by.

        Returns:
            (ok, message).
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            table_name = TABLE_MAP.get(type)
            if not table_name:
                return False, "无效的类型"
            file_url = data.get('file_url')
            # Derive the file extension from the URL when one is present.
            file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
            # 1. Update the main table (triggers decoupled; synced manually).
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
                WHERE source_id = %s AND source_type = %s
                """,
                (data.get('title'), file_url, file_extension, updater_id, info_id, type)
            )
            # 2. Update the sub-table (file_url is no longer stored there).
            if type == 'basis':
                sql = f"""
                UPDATE {table_name}
                SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
                    document_type = %s, professional_field = %s, validity = %s,
                    english_name = %s, implementation_date = %s, drafting_unit = %s,
                    approving_department = %s, participating_units = %s, engineering_phase = %s,
                    reference_basis = %s, source_url = %s,
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
                    data.get('document_type'), data.get('professional_field'), data.get('validity'),
                    data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                    data.get('approving_department'), data.get('participating_units'), data.get('engineering_phase'),
                    data.get('reference_basis'), data.get('source_url'),
                    updater_id, info_id
                )
            elif type == 'work':
                # Build the compilation_basis_1..9 assignment fragment of the UPDATE.
                basis_updates = ", ".join([f"compilation_basis_{i} = %s" for i in range(1, 10)])
                sql = f"""
                UPDATE {table_name}
                SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s,
                    plan_summary = %s, {basis_updates},
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """
                # Collect the compilation_basis_1..9 values in the same order.
                basis_params = [data.get(f'compilation_basis_{i}') for i in range(1, 10)]
                params = [
                    data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
                    data.get('plan_summary')
                ] + basis_params + [updater_id, info_id]
                # cursor.execute expects a tuple of params.
                params = tuple(params)
            elif type == 'job':
                sql = f"""
                UPDATE {table_name}
                SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                    effective_start_date = %s, effective_end_date = %s,
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')),
                    self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')),
                    updater_id, info_id
                )
            else:
                return False, "不支持的类型"
            cursor.execute(sql, params)
            conn.commit()
            return True, "编辑成功"
        except Exception as e:
            logger.exception("编辑基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
  640. async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]:
  641. """删除基本信息"""
  642. conn = get_db_connection()
  643. if not conn:
  644. return False, "数据库连接失败"
  645. cursor = conn.cursor()
  646. try:
  647. table_name = TABLE_MAP.get(type)
  648. if not table_name:
  649. return False, "无效的类型"
  650. # 1. 删除子表记录 (触发器会自动删除主表记录)
  651. cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (info_id,))
  652. conn.commit()
  653. return True, "删除成功"
  654. except Exception as e:
  655. logger.exception("删除基本信息失败")
  656. conn.rollback()
  657. return False, str(e)
  658. finally:
  659. cursor.close()
  660. conn.close()