# sample_service.py
  1. """
  2. 样本中心服务层
  3. 从 sample_view.py 提取的SQL查询逻辑
  4. """
  5. import logging
  6. import uuid
  7. from datetime import datetime
  8. from typing import Optional, List, Dict, Any, Tuple
  9. from app.base.async_mysql_connection import get_db_connection
  10. from app.base.minio_connection import get_minio_manager
  11. from app.core.config import config_handler
  12. from app.services.milvus_service import milvus_service
  13. from app.services.task_service import task_service
logger = logging.getLogger(__name__)

# Maps a document's logical source_type to the detail table that stores
# its type-specific business fields.
TABLE_MAP = {
    'basis': 't_samp_standard_base_info',          # standards / specifications
    'work': 't_samp_construction_plan_base_info',  # construction plans
    'job': 't_samp_office_regulations'             # office regulations
}
  21. def get_table_name(source_type: str) -> Optional[str]:
  22. """根据source_type获取表名"""
  23. return TABLE_MAP.get(source_type)
class SampleService:
    """Sample-center service class — direct SQL query implementation."""

    def __init__(self):
        """Initialize the service and wire up external backends."""
        # Shared MinIO manager (object storage for uploaded/converted files).
        self.minio_manager = get_minio_manager()
        # Global Milvus vector-store service instance.
        self.milvus_service = milvus_service
        # Best effort: ensure the Milvus collections exist. A failure is
        # logged but deliberately does not prevent service construction.
        try:
            self.milvus_service.ensure_collections()
        except Exception as e:
            logger.error(f"初始化 Milvus 集合失败: {e}")
  37. async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]:
  38. """获取 MinIO 预签名上传 URL"""
  39. try:
  40. data = self.minio_manager.get_upload_url(filename, content_type)
  41. return True, "成功获取上传链接", data
  42. except Exception as e:
  43. logger.exception("生成上传链接失败")
  44. return False, f"生成上传链接失败: {str(e)}", {}
    # ==================== Document management ====================
    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
        """Batch-ingest converted documents into the knowledge base.

        Args:
            doc_ids: document IDs to ingest
            username: operator recorded as updated_by

        Returns:
            (success_count, human-readable summary; per-document reasons are
            appended when anything was skipped or failed)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败,请检查数据库服务状态"
        cursor = conn.cursor()
        success_count = 0
        skipped_count = 0
        failed_count = 0
        error_details = []
        try:
            # 1. Fetch details for every selected document.
            placeholders = ','.join(['%s']*len(doc_ids))
            fetch_sql = f"""
                SELECT id, title, source_type, md_url, conversion_status, created_time
                FROM t_samp_document_main
                WHERE id IN ({placeholders})
            """
            cursor.execute(fetch_sql, tuple(doc_ids))
            selected_docs = cursor.fetchall()
            if not selected_docs:
                return 0, "选中的文档在数据库中不存在"
            # 2. Process each document independently; one failure never
            #    aborts the whole batch.
            for doc in selected_docs:
                doc_id = doc['id']
                title = doc.get('title', '未命名文档')
                status = doc.get('conversion_status')
                md_url = doc.get('md_url')
                # A. Only status == 2 (conversion complete) may be ingested.
                if status != 2:
                    reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},跳过入库: {reason}")
                    skipped_count += 1
                    error_details.append(f"· {title}: {reason}")
                    continue
                if not md_url:
                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,跳过入库")
                    skipped_count += 1
                    error_details.append(f"· {title}: 转换结果地址丢失")
                    continue
                # B. Fetch the converted Markdown content from MinIO.
                try:
                    md_content = self.minio_manager.get_object_content(md_url)
                    if not md_content:
                        raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
                except Exception as minio_err:
                    logger.error(f"读取文档 {title} 内容失败: {minio_err}")
                    failed_count += 1
                    error_details.append(f"· {title}: 读取云端文件失败")
                    continue
                # C. Chunk and insert into the Milvus vector store.
                try:
                    # Metadata passed along with the content.
                    doc_info = {
                        "doc_id": doc_id,
                        "doc_name": title,
                        # doc_version is derived from created_time (YYYYMMDD);
                        # falls back to a fixed default when it is missing.
                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else 20260127,
                        "tags": doc.get('source_type') or 'unknown'
                    }
                    await self.milvus_service.insert_knowledge(md_content, doc_info)
                    # D. Register in the task-management center (type 'data');
                    #    a failure here is logged but does not undo ingestion.
                    try:
                        await task_service.add_task(doc_id, 'data')
                    except Exception as task_err:
                        logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
                    # E. Mark the document as entered in the main table.
                    update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, updated_by = %s, updated_time = NOW() WHERE id = %s"
                    cursor.execute(update_sql, (username, doc_id))
                    success_count += 1
                except Exception as milvus_err:
                    logger.exception(f"文档 {title} 写入向量库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
                    continue
            conn.commit()
            # Assemble the summary message.
            msg = f"入库完成:成功 {success_count} 份"
            if skipped_count > 0:
                msg += f",跳过 {skipped_count} 份"
            if failed_count > 0:
                msg += f",失败 {failed_count} 份"
            if error_details:
                detailed_msg = msg + "\n\n详情:\n" + "\n".join(error_details)
                return success_count, detailed_msg
            return success_count, msg
        except Exception as e:
            logger.exception(f"文档批量入库异常: {e}")
            conn.rollback()
            return 0, f"操作异常: {str(e)}"
        finally:
            cursor.close()
            conn.close()
    async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
        """Batch-delete documents from the detail tables, main table and task center.

        Returns (rows deleted from the main table, summary message).
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"
        cursor = conn.cursor()
        try:
            if not doc_ids:
                return 0, "未指定要删除的文档 ID"
            placeholders = ', '.join(['%s'] * len(doc_ids))
            # Best-effort delete from every detail table; per-table errors are
            # logged and do not abort the overall deletion.
            try:
                for sub_table in TABLE_MAP.values():
                    sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
                    try:
                        cursor.execute(sub_sql, doc_ids)
                    except Exception as sub_e:
                        logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
            except Exception as sync_e:
                logger.error(f"同步删除子表数据失败: {sync_e}")
            # Delete from the main table t_samp_document_main; its rowcount is
            # what gets reported back to the caller.
            sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
            cursor.execute(sql_main, doc_ids)
            affected_rows = cursor.rowcount
            # Also remove the corresponding task-center entries (best effort).
            try:
                for doc_id in doc_ids:
                    await task_service.delete_task(doc_id)
            except Exception as task_err:
                logger.error(f"同步删除任务中心数据失败: {task_err}")
            conn.commit()
            return affected_rows, f"成功删除 {affected_rows} 条文档数据"
        except Exception as e:
            logger.exception("批量删除失败")
            conn.rollback()
            return 0, f"批量删除失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()
    async def get_document_list(
        self,
        whether_to_enter: Optional[int] = None,
        keyword: Optional[str] = None,
        table_type: Optional[str] = None,
        plan_category: Optional[str] = None,
        level_2_classification: Optional[str] = None,
        level_3_classification: Optional[str] = None,
        level_4_classification: Optional[str] = None,
        page: int = 1,
        size: int = 50
    ) -> Tuple[List[Dict[str, Any]], int, int, int]:
        """Get the paginated document list (optionally joined with a detail table).

        Args:
            whether_to_enter: value matched against conversion_status (see
                inline note); None disables the filter.
            keyword: fuzzy match on the title field.
            table_type: 'basis' | 'work' | 'job'; enables the detail-table join.
            plan_category / level_*_classification: extra filters, 'work' only.
            page, size: 1-based pagination.

        Returns:
            (items, filtered total, overall document count,
             count of documents with conversion_status == 2)
        """
        conn = get_db_connection()
        if not conn:
            return [], 0, 0, 0
        cursor = conn.cursor()
        try:
            where_clauses = []
            params = []
            # Base query.
            if table_type and table_type in TABLE_MAP:
                # A concrete type was requested: LEFT JOIN its detail table so
                # detail-table fields can be returned and filtered on.
                sub_table = TABLE_MAP[table_type]
                from_sql = f"t_samp_document_main m LEFT JOIN {sub_table} s ON m.id = s.id"
                fields_sql = "m.*, s.*"  # every field, detail table included
                where_clauses.append("m.source_type = %s")
                params.append(table_type)
                order_sql = "m.created_time DESC"
                title_field = "m.title"
                # Filters specific to construction plans ('work').
                if table_type == 'work':
                    if plan_category:
                        where_clauses.append("s.plan_category = %s")
                        params.append(plan_category)
                    if level_2_classification:
                        where_clauses.append("s.level_2_classification = %s")
                        params.append(level_2_classification)
                    if level_3_classification:
                        where_clauses.append("s.level_3_classification = %s")
                        params.append(level_3_classification)
                    if level_4_classification:
                        where_clauses.append("s.level_4_classification = %s")
                        params.append(level_4_classification)
                    # Re-select m.id last so the main-table id wins the
                    # column-name collision with s.id.
                    fields_sql = "m.*, s.*, m.id as id"
            else:
                from_sql = "t_samp_document_main"
                fields_sql = "*"
                order_sql = "created_time DESC"
                title_field = "title"
            if whether_to_enter is not None:
                # NOTE(review): despite the parameter name this filters on
                # conversion_status, not whether_to_enter — kept intentionally
                # per the original logic; confirm against callers.
                where_clauses.append("conversion_status = %s")
                params.append(whether_to_enter)
            if keyword:
                where_clauses.append(f"{title_field} LIKE %s")
                params.append(f"%{keyword}%")
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
            params.extend([size, offset])
            logger.info(f"Executing SQL: {sql} with params: {params}")
            cursor.execute(sql, tuple(params))
            items = []
            for row in cursor.fetchall():
                item = row
                # Expand stored relative paths into full MinIO URLs.
                for key in ['file_url', 'md_url', 'json_url']:
                    if item.get(key):
                        item[key] = self.minio_manager.get_full_url(item[key])
                # Map type-specific fields onto the generic front-end names.
                source_type = item.get('source_type')
                if source_type == 'work':
                    item['issuing_authority'] = item.get('compiling_unit')
                    item['release_date'] = item.get('compiling_date')
                elif source_type == 'job':
                    item['issuing_authority'] = item.get('issuing_department')
                    item['release_date'] = item.get('publish_date')
                # Normalize date/time values to strings.
                for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date']:
                    if item.get(key) and hasattr(item[key], 'isoformat'):
                        item[key] = item[key].isoformat()
                    elif item.get(key) is not None:
                        item[key] = str(item[key])
                # Provide display filenames for converted documents.
                if item.get('conversion_status') == 2:
                    title = item.get('title', 'document')
                    item['md_display_name'] = f"{title}.md"
                    item['json_display_name'] = f"{title}.json"
                items.append(item)
            # Filtered total (strip the trailing LIMIT/OFFSET params).
            count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            # Global statistics.
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
            res = cursor.fetchone()
            all_total = res['count'] if res else 0
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE conversion_status = 2")
            res = cursor.fetchone()
            total_entered = res['count'] if res else 0
            return items, total, all_total, total_entered
        except Exception as e:
            logger.exception("获取文档列表失败")
            return [], 0, 0, 0
        finally:
            cursor.close()
            conn.close()
    async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """Get one document's detail (main table merged with its detail table).

        Returns None when the document does not exist or the query fails.
        """
        conn = get_db_connection()
        if not conn:
            return None
        cursor = conn.cursor()
        try:
            # 1. Main-table record.
            cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
            doc = cursor.fetchone()
            if not doc:
                return None
            # 2. Look up the matching detail table by source_type.
            source_type = doc.get('source_type')
            table_name = TABLE_MAP.get(source_type)
            if table_name:
                # Fetch every detail-table field for this document.
                sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
                cursor.execute(sub_sql, (doc_id,))
                sub_data = cursor.fetchone()
                if sub_data:
                    # Merge detail fields into doc for front-end convenience.
                    # On name collisions the detail field wins (except id,
                    # which is dropped from sub_data first).
                    sub_data.pop('id', None)
                    # Type-specific aliases expected by the front end.
                    if source_type == 'basis':
                        doc['standard_no'] = sub_data.get('standard_number')
                    elif source_type == 'work':
                        doc['issuing_authority'] = sub_data.get('compiling_unit')
                        doc['release_date'] = sub_data.get('compiling_date')
                    elif source_type == 'job':
                        doc['issuing_authority'] = sub_data.get('issuing_department')
                        doc['release_date'] = sub_data.get('publish_date')
                    doc.update(sub_data)
            # Normalize date/time values to strings.
            for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date', 'implementation_date']:
                val = doc.get(key)
                if val and hasattr(val, 'isoformat'):
                    doc[key] = val.isoformat()
                elif val is not None:
                    doc[key] = str(val)
            # Expand stored relative paths into full MinIO URLs.
            for key in ['file_url', 'md_url', 'json_url']:
                if doc.get(key):
                    doc[key] = self.minio_manager.get_full_url(doc[key])
            # Provide display filenames for converted documents.
            if doc.get('conversion_status') == 2:
                title = doc.get('title', 'document')
                doc['md_display_name'] = f"{title}.md"
                doc['json_display_name'] = f"{title}.json"
            return doc
        except Exception as e:
            logger.exception("获取文档详情失败")
            return None
        finally:
            cursor.close()
            conn.close()
  348. def _to_int(self, value: Any) -> Optional[int]:
  349. """安全转换为整数"""
  350. if value is None or value == '' or str(value).lower() == 'null':
  351. return None
  352. try:
  353. return int(value)
  354. except (ValueError, TypeError):
  355. return None
  356. def _to_date(self, value: Any) -> Optional[str]:
  357. """安全处理日期字符串"""
  358. if value is None or value == '' or str(value).lower() == 'null':
  359. return None
  360. # 如果已经是 datetime.date 或 datetime.datetime 对象
  361. if hasattr(value, 'strftime'):
  362. return value.strftime('%Y-%m-%d')
  363. return str(value)
    async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Add a new document (main table first, then detail table; trigger-decoupled).

        Args:
            doc_data: field values; doc_data['table_type'] picks the detail
                table and defaults to 'basis'.
            user_id: recorded as created_by / updated_by.

        Returns:
            (ok, message, new document id or None on failure)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None
        cursor = conn.cursor()
        try:
            doc_id = str(uuid.uuid4())
            table_type = doc_data.get('table_type', 'basis')
            table_name = TABLE_MAP.get(table_type)
            # Normalize field values.
            release_date = self._to_date(doc_data.get('release_date'))
            # URLs are stored as relative paths.
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
            # 1. Insert into the main table (the asset center).
            cursor.execute(
                """
                INSERT INTO t_samp_document_main (
                    id, title, source_type, file_url,
                    file_extension, created_by, updated_by, created_time, updated_time,
                    conversion_status
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
                """,
                (
                    doc_id, doc_data.get('title'), table_type, file_url,
                    doc_data.get('file_extension'), user_id, user_id
                )
            )
            # 2. Insert into the detail table (business fields only).
            if table_type == 'basis':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), user_id)
                )
            elif table_type == 'work':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), user_id)
                )
            elif table_type == 'job':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), user_id)
                )
            # 3. Register with the task-management center (type 'data');
            #    failures here are logged but do not abort the insert.
            try:
                await task_service.add_task(doc_id, 'data')
            except Exception as task_err:
                logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
            conn.commit()
            return True, "文档添加成功", doc_id
        except Exception as e:
            logger.exception("添加文档失败")
            conn.rollback()
            return False, str(e), None
        finally:
            cursor.close()
            conn.close()
    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a document (sync main table and detail table; trigger-decoupled).

        Expects doc_data['id'] to identify the record; doc_data['table_type']
        picks the detail table and defaults to 'basis'.

        Returns:
            (ok, message)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'basis')
            table_name = TABLE_MAP.get(table_type)
            # Normalize field values.
            release_date = self._to_date(doc_data.get('release_date'))
            # URLs are stored as relative paths.
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
            # 1. Update the main table.
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_id
                )
            )
            # 2. Update the matching detail table.
            if table_type == 'basis':
                cursor.execute(
                    f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), updater_id, doc_id)
                )
            elif table_type == 'work':
                cursor.execute(
                    f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s, level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), updater_id, doc_id)
                )
            elif table_type == 'job':
                cursor.execute(
                    f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), updater_id, doc_id)
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
  474. async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
  475. """文档入库(单个)"""
  476. affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
  477. return affected_rows > 0, message
    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Get the basic-info list for one type, joined with the main table
        to pick up file and conversion-status fields.

        Args:
            type: 'basis' | 'work' | 'job'
            page, size: 1-based pagination
            keyword: fuzzy search on the type's natural title/number fields
            **filters: exact-match filters; keys are translated through the
                per-type field_map below, unknown keys are silently ignored.

        Returns:
            (items, total matching rows)
        """
        conn = get_db_connection()
        if not conn:
            return [], 0
        cursor = conn.cursor()
        try:
            # Select the table, select-list and filter-field mapping per type.
            if type == 'basis':
                table_name = "t_samp_standard_base_info"
                # Joined main-table fields: file_url, conversion_status, md_url, json_url
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type,
                    s.professional_field, s.validity, s.note, s.created_by, s.created_time,
                    s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'work':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                    s.id, s.plan_name as title, NULL as standard_no,
                    s.project_name, s.project_section,
                    s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                    NULL as document_type, NULL as professional_field, NULL as validity,
                    s.plan_summary, s.compilation_basis,
                    s.plan_category, s.level_1_classification, s.level_2_classification,
                    s.level_3_classification, s.level_4_classification,
                    s.note, s.created_by, s.created_time, s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date',
                    'plan_category': 's.plan_category',
                    'level_1_classification': 's.level_1_classification',
                    'level_2_classification': 's.level_2_classification',
                    'level_3_classification': 's.level_3_classification',
                    'level_4_classification': 's.level_4_classification'
                }
            elif type == 'job':
                table_name = "t_samp_office_regulations"
                fields = """
                    s.id, s.file_name as title, NULL as standard_no,
                    s.issuing_department as issuing_authority, s.publish_date as release_date,
                    s.document_type, NULL as professional_field, NULL as validity,
                    s.note, s.created_by, s.created_time, s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0
            where_clauses = []
            params = []
            # Keyword search over the type's natural search fields.
            if keyword:
                if type == 'basis':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'work':
                    where_clauses.append("s.plan_name LIKE %s")
                    params.append(f"%{keyword}%")
                elif type == 'job':
                    where_clauses.append("s.file_name LIKE %s")
                    params.append(f"%{keyword}%")
            # Fine-grained exact-match filters.
            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue
                db_field = field_map.get(filter_key)
                if db_field:
                    where_clauses.append(f"{db_field} = %s")
                    params.append(filter_value)
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            # LEFT JOIN the main table for file/conversion fields.
            sql = f"""
                SELECT {fields}
                FROM {table_name} s
                LEFT JOIN t_samp_document_main m ON s.id = m.id
                {where_sql}
                ORDER BY s.created_time DESC
                LIMIT %s OFFSET %s
            """
            params = params + [size, offset]
            cursor.execute(sql, tuple(params))
            items = cursor.fetchall()
            # Expand stored relative paths into full MinIO URLs.
            for item in items:
                for key in ['file_url', 'md_url', 'json_url']:
                    if item.get(key):
                        item[key] = self.minio_manager.get_full_url(item[key])
            # Total count (strip the trailing LIMIT/OFFSET params).
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            return items, total
        except Exception as e:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()
  604. # ==================== 文档转换 ====================
  605. async def get_document_source_type(self, doc_id: str) -> Optional[str]:
  606. """获取文档的source_type"""
  607. conn = get_db_connection()
  608. if not conn:
  609. return None
  610. cursor = conn.cursor()
  611. try:
  612. cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
  613. res = cursor.fetchone()
  614. return res['source_type'] if res else None
  615. except Exception as e:
  616. logger.exception(f"获取文档source_type失败: {e}")
  617. return None
  618. finally:
  619. cursor.close()
  620. conn.close()
  621. async def get_document_title(self, doc_id: str) -> str:
  622. """获取文档标题"""
  623. conn = get_db_connection()
  624. if not conn:
  625. return "文档"
  626. cursor = conn.cursor()
  627. try:
  628. cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
  629. res = cursor.fetchone()
  630. return res['title'] if res else "文档"
  631. except Exception as e:
  632. logger.exception(f"获取文档标题失败: {e}")
  633. return "文档"
  634. finally:
  635. cursor.close()
  636. conn.close()
  637. async def update_conversion_status(self, doc_id: str, status: int,
  638. md_url: Optional[str] = None,
  639. json_url: Optional[str] = None,
  640. error_message: Optional[str] = None) -> bool:
  641. """更新文档转换状态
  642. Args:
  643. doc_id: 文档ID
  644. status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
  645. md_url: Markdown文件URL
  646. json_url: JSON文件URL
  647. error_message: 错误信息
  648. """
  649. conn = get_db_connection()
  650. if not conn:
  651. return False
  652. cursor = conn.cursor()
  653. try:
  654. update_clauses = ["conversion_status = %s"]
  655. params = [status]
  656. if error_message:
  657. update_clauses.append("conversion_error = %s")
  658. params.append(error_message)
  659. if md_url:
  660. update_clauses.append("md_url = %s")
  661. params.append(md_url)
  662. if json_url:
  663. update_clauses.append("json_url = %s")
  664. params.append(json_url)
  665. sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
  666. params.append(doc_id)
  667. cursor.execute(sql, tuple(params))
  668. conn.commit()
  669. return True
  670. except Exception as e:
  671. logger.exception(f"更新转换进度失败: {e}")
  672. conn.rollback()
  673. return False
  674. finally:
  675. cursor.close()
  676. conn.close()
  677. # ==================== 基础信息管理 ====================
  678. async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]:
  679. """新增基本信息"""
  680. logger.info(f"Adding basic info for type {type}: {data}")
  681. conn = get_db_connection()
  682. if not conn:
  683. return False, "数据库连接失败"
  684. cursor = conn.cursor()
  685. try:
  686. table_name = TABLE_MAP.get(type)
  687. if not table_name:
  688. return False, "无效的类型"
  689. doc_id = str(uuid.uuid4())
  690. file_url = data.get('file_url')
  691. file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
  692. # 1. 插入主表 (解耦触发器,手动同步)
  693. cursor.execute(
  694. """
  695. INSERT INTO t_samp_document_main (
  696. id, title, source_type, file_url,
  697. file_extension, created_by, updated_by, created_time, updated_time,
  698. conversion_status
  699. ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
  700. """,
  701. (
  702. doc_id, data.get('title'), type, file_url,
  703. file_extension, user_id, user_id
  704. )
  705. )
  706. # 2. 插入子表 (移除 file_url,因为它现在只存储在主表中)
  707. if type == 'basis':
  708. sql = f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
  709. params = (doc_id, data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('document_type'), data.get('professional_field'), data.get('validity', '现行'), data.get('note'), user_id)
  710. elif type == 'work':
  711. sql = f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
  712. params = (doc_id, data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'), data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'), data.get('note'), user_id)
  713. elif type == 'job':
  714. sql = f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
  715. params = (doc_id, data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')), data.get('note'), user_id)
  716. else:
  717. return False, "不支持的类型"
  718. cursor.execute(sql, params)
  719. # 3. 添加到任务管理中心 (类型为 data)
  720. try:
  721. await task_service.add_task(doc_id, 'data')
  722. except Exception as task_err:
  723. logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}")
  724. conn.commit()
  725. return True, "新增成功", doc_id
  726. except Exception as e:
  727. logger.exception("新增基本信息失败")
  728. conn.rollback()
  729. return False, str(e)
  730. finally:
  731. cursor.close()
  732. conn.close()
  733. async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
  734. """编辑基本信息"""
  735. logger.info(f"Editing basic info for type {type}, id {info_id}: {data}")
  736. conn = get_db_connection()
  737. if not conn:
  738. return False, "数据库连接失败"
  739. cursor = conn.cursor()
  740. try:
  741. table_name = TABLE_MAP.get(type)
  742. if not table_name:
  743. return False, "无效的类型"
  744. # 处理 URL 存储(转为相对路径)
  745. file_url = self.minio_manager.get_relative_path(data.get('file_url'))
  746. file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
  747. # 1. 更新主表 (解耦触发器)
  748. cursor.execute(
  749. """
  750. UPDATE t_samp_document_main
  751. SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
  752. WHERE id = %s
  753. """,
  754. (data.get('title'), file_url, file_extension, updater_id, info_id)
  755. )
  756. # 2. 更新子表 (移除 file_url)
  757. if type == 'basis':
  758. sql = f"""
  759. UPDATE {table_name}
  760. SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
  761. document_type = %s, professional_field = %s, validity = %s,
  762. english_name = %s, implementation_date = %s, drafting_unit = %s,
  763. approving_department = %s, participating_units = %s, engineering_phase = %s,
  764. reference_basis = %s, source_url = %s, note = %s,
  765. updated_by = %s, updated_time = NOW()
  766. WHERE id = %s
  767. """
  768. params = (
  769. data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
  770. data.get('document_type'), data.get('professional_field'), data.get('validity'),
  771. data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
  772. data.get('approving_department'), data.get('participating_units'), data.get('engineering_phase'),
  773. data.get('reference_basis'), data.get('source_url'), data.get('note'),
  774. updater_id, info_id
  775. )
  776. elif type == 'work':
  777. sql = f"""
  778. UPDATE {table_name}
  779. SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s,
  780. plan_summary = %s, compilation_basis = %s, plan_category = %s,
  781. level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
  782. note = %s, updated_by = %s, updated_time = NOW()
  783. WHERE id = %s
  784. """
  785. params = (
  786. data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
  787. data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'),
  788. data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'),
  789. data.get('note'), updater_id, info_id
  790. )
  791. elif type == 'job':
  792. sql = f"""
  793. UPDATE {table_name}
  794. SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
  795. effective_start_date = %s, effective_end_date = %s, note = %s,
  796. updated_by = %s, updated_time = NOW()
  797. WHERE id = %s
  798. """
  799. params = (
  800. data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')),
  801. self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'),
  802. updater_id, info_id
  803. )
  804. else:
  805. return False, "不支持的类型"
  806. cursor.execute(sql, params)
  807. conn.commit()
  808. return True, "编辑成功"
  809. except Exception as e:
  810. logger.exception("编辑基本信息失败")
  811. conn.rollback()
  812. return False, str(e)
  813. finally:
  814. cursor.close()
  815. conn.close()
  816. async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]:
  817. """删除基本信息"""
  818. conn = get_db_connection()
  819. if not conn:
  820. return False, "数据库连接失败"
  821. cursor = conn.cursor()
  822. try:
  823. table_name = TABLE_MAP.get(type)
  824. if not table_name:
  825. return False, "无效的类型"
  826. # 1. 删除主表记录 (由于设置了 ON DELETE CASCADE,子表记录会自动删除)
  827. cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (info_id,))
  828. # 同步删除任务管理中心的数据
  829. try:
  830. await task_service.delete_task(info_id)
  831. except Exception as task_err:
  832. logger.error(f"同步删除任务中心数据失败 (ID: {info_id}): {task_err}")
  833. conn.commit()
  834. return True, "删除成功"
  835. except Exception as e:
  836. logger.exception("删除基本信息失败")
  837. conn.rollback()
  838. return False, str(e)
  839. finally:
  840. cursor.close()
  841. conn.close()