# sample_service.py
  1. """
  2. 样本中心服务层
  3. 从 sample_view.py 提取的SQL查询逻辑
  4. """
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from app.base.async_mysql_connection import get_db_connection
from app.base.minio_connection import get_minio_manager
from app.core.config import config_handler
from app.services.milvus_service import milvus_service
from app.services.task_service import task_service
  14. logger = logging.getLogger(__name__)
# Maps source_type -> detail table that stores that type's base info.
TABLE_MAP = {
    'standard': 't_samp_standard_base_info',
    'construction_plan': 't_samp_construction_plan_base_info',
    'regulation': 't_samp_office_regulations'
}

# Document type map (front-end display label -> stored DB code).
DOCUMENT_TYPE_MAP = {
    "国家标准": "GB",
    "行业标准": "HY",
    "部门规章": "BM",
    "地方标准": "DB",
    "企业标准": "QY",
    "管理制度": "GL",
    "技术规范": "GF",
    "团体标准": "TT",
    "国际标准": "GJ",
    "国家法律": "FL",
    "地方法规": "LR",
    "其他": "QT"
}

# Reverse document type map (stored DB code -> display label).
# Extra legacy codes are included so existing rows keep resolving.
DOCUMENT_TYPE_REVERSE_MAP = {v: k for k, v in DOCUMENT_TYPE_MAP.items()}
DOCUMENT_TYPE_REVERSE_MAP.update({
    "HB": "行业标准",
    "QB": "企业标准",
    "GLZD": "管理制度",
    "JSGF": "技术规范",
    "TB": "团体标准",
    "IB": "国际标准",
    "OTHER": "其他"
})

# Professional field map (display label -> stored code).
PROFESSIONAL_FIELD_MAP = {
    "法律法规": "FL",
    "通用标准": "TY",
    "勘察钻探": "KC",
    "地基基础": "DJ",
    "路基路面": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "交通工程": "JT",
    "建筑工程": "JZ",
    "市政工程": "SZ",
    "机电安装": "JD",
    "路桥工程": "LB",
    "装饰装修": "ZS",
    "港口航道": "GK",
    "铁路工程": "TL",
    "房建工程": "FJ",
    "水利电力": "SL",
    "信息化": "XX",
    "试验检测": "SY",
    "安全环保": "AQ",
    "其他": "QT"
}

# Reverse professional field map (stored code -> display label).
PROFESSIONAL_FIELD_REVERSE_MAP = {v: k for k, v in PROFESSIONAL_FIELD_MAP.items()}

# Plan category map (display label -> stored code).
PLAN_CATEGORY_MAP = {
    "超危大方案": "CH",
    "超危大方案较大Ⅱ级": "CH2",
    "超危大方案特大Ⅳ级": "CH4",
    "超危大方案一般Ⅰ级": "CH1",
    "超危大方案重大Ⅲ级": "CH3",
    "危大方案": "WD",
    "一般方案": "YB"
}

# Reverse plan category map (stored code -> display label).
PLAN_CATEGORY_REVERSE_MAP = {v: k for k, v in PLAN_CATEGORY_MAP.items()}

# Level-1 classification map (display label -> stored code).
FIRST_LEVEL_MAP = {
    "施工方案": "SC"
}

# Reverse level-1 classification map (stored code -> display label).
FIRST_LEVEL_REVERSE_MAP = {v: k for k, v in FIRST_LEVEL_MAP.items()}

# Level-2 classification map (display label -> stored code).
SECOND_LEVEL_MAP = {
    "临建工程": "LZ",
    "路基工程": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "其他": "QT"
}

# Reverse level-2 classification map (stored code -> display label).
SECOND_LEVEL_REVERSE_MAP = {v: k for k, v in SECOND_LEVEL_MAP.items()}

# Level-3 classification map (display label -> stored code).
THIRD_LEVEL_MAP = {
    "TBM施工": "TM",
    "拌和站安、拆施工": "BH",
    "不良地质隧道施工": "BL",
    "常规桥梁": "CG",
    "挡土墙工程类": "DT",
    "辅助坑道施工": "FB",
    "复杂洞口工程施工": "FD",
    "钢筋加工场安、拆": "GG",
    "钢栈桥施工": "GZ",
    "拱桥": "GH",
    "涵洞工程类": "HD",
    "滑坡体处理类": "HP",
    "路堤": "LT",
    "路堑": "LQ",
    "其他": "QT",
    "深基坑": "JK",
    "隧道总体施工": "ZT",
    "特殊结构隧道": "TS",
    "斜拉桥": "XL",
    "悬索桥": "XS"
}

# Reverse level-3 classification map (stored code -> display label).
THIRD_LEVEL_REVERSE_MAP = {v: k for k, v in THIRD_LEVEL_MAP.items()}

# Level-4 classification map (display label -> stored code).
FOURTH_LEVEL_MAP = {
    "挡土墙": "DT",
    "顶管": "DG",
    "断层破碎带及软弱围岩": "DL",
    "钢筋砼箱涵": "GX",
    "高填路堤": "GT",
    "抗滑桩": "KH",
    "软岩大变形隧道": "RY",
    "上部结构": "SB",
    "深基坑开挖与支护": "JK",
    "深挖路堑": "LC",
    "隧道TBM": "TM",
    "隧道进洞": "JD",
    "隧道竖井": "SJ",
    "隧道斜井": "XJ",
    "特种设备": "TZ",
    "瓦斯隧道": "WS",
    "下部结构": "XB",
    "小净距隧道": "NJ",
    "岩爆隧道": "YB",
    "岩溶隧道": "YR",
    "涌水突泥隧道": "YN",
    "桩基础": "ZJ",
    "其他": "QT"
}

# Reverse level-4 classification map (stored code -> display label).
FOURTH_LEVEL_REVERSE_MAP = {v: k for k, v in FOURTH_LEVEL_MAP.items()}
  155. def get_table_name(source_type: str) -> Optional[str]:
  156. """根据source_type获取表名"""
  157. return TABLE_MAP.get(source_type)
  158. class SampleService:
  159. """样本中心服务类 - 使用 SQL 查询方式"""
  160. _initialized = False
  161. def __init__(self):
  162. """初始化服务"""
  163. # 使用统一的 MinIO 管理器
  164. self.minio_manager = get_minio_manager()
  165. # 使用全局 Milvus 服务
  166. self.milvus_service = milvus_service
  167. # 确保集合已创建 (仅在首次初始化时执行)
  168. if not SampleService._initialized:
  169. try:
  170. self.milvus_service.ensure_collections()
  171. SampleService._initialized = True
  172. except Exception as e:
  173. logger.error(f"初始化 Milvus 集合失败: {e}")
  174. def _format_document_row(self, item: Dict[str, Any]) -> Dict[str, Any]:
  175. """格式化文档行数据,处理URL、日期映射和文件名显示"""
  176. if not item:
  177. return item
  178. # 1. 处理 URL 转换
  179. for key in ['file_url', 'md_url', 'json_url']:
  180. if item.get(key):
  181. item[key] = self.minio_manager.get_full_url(item[key])
  182. # 2. 映射字段以适配前端通用显示
  183. source_type = item.get('source_type')
  184. # 处理文档类型显示 (简写 -> 中文)
  185. doc_type_code = item.get('document_type')
  186. if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
  187. item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
  188. # 处理专业领域显示 (简写 -> 中文)
  189. prof_field_code = item.get('professional_field')
  190. if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
  191. item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
  192. # 处理一级分类显示 (简写 -> 中文)
  193. level_1_code = item.get('level_1_classification')
  194. if level_1_code in FIRST_LEVEL_REVERSE_MAP:
  195. item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
  196. # 处理二级分类显示 (简写 -> 中文)
  197. level_2_code = item.get('level_2_classification')
  198. if level_2_code in SECOND_LEVEL_REVERSE_MAP:
  199. item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
  200. # 处理三级分类显示 (简写 -> 中文)
  201. level_3_code = item.get('level_3_classification')
  202. if level_3_code in THIRD_LEVEL_REVERSE_MAP:
  203. item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
  204. # 处理四级分类显示 (简写 -> 中文)
  205. level_4_code = item.get('level_4_classification')
  206. if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
  207. item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
  208. # 处理方案类别显示 (简写 -> 中文)
  209. plan_category_code = item.get('plan_category')
  210. if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
  211. item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
  212. if source_type == 'standard':
  213. # 标准规范特有映射
  214. if 'standard_number' in item:
  215. item['standard_no'] = item.get('standard_number')
  216. elif source_type == 'construction_plan':
  217. # 施工方案特有映射
  218. item['issuing_authority'] = item.get('compiling_unit')
  219. item['release_date'] = item.get('compiling_date')
  220. elif source_type == 'regulation':
  221. # 办公制度特有映射
  222. item['issuing_authority'] = item.get('issuing_department')
  223. item['release_date'] = item.get('publish_date')
  224. # 3. 格式化时间
  225. date_keys = [
  226. 'created_time', 'updated_time', 'release_date',
  227. 'publish_date', 'compiling_date', 'implementation_date',
  228. 'effective_start_date', 'effective_end_date'
  229. ]
  230. for key in date_keys:
  231. val = item.get(key)
  232. if val and hasattr(val, 'isoformat'):
  233. item[key] = val.isoformat()
  234. elif val is not None:
  235. item[key] = str(val)
  236. # 4. 增加格式化文件名供前端显示
  237. if item.get('conversion_status') == 2:
  238. title = item.get('title', 'document')
  239. item['md_display_name'] = f"{title}.md"
  240. item['json_display_name'] = f"{title}.json"
  241. return item
  242. async def get_upload_url(self, filename: str, content_type: str, prefix: str = None) -> Tuple[bool, str, Dict[str, Any]]:
  243. """获取 MinIO 预签名上传 URL"""
  244. try:
  245. data = self.minio_manager.get_upload_url(filename, content_type, prefix=prefix)
  246. return True, "成功获取上传链接", data
  247. except Exception as e:
  248. logger.exception("生成上传链接失败")
  249. return False, f"生成上传链接失败: {str(e)}", {}
  250. # ==================== 文档管理 ====================
    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_method: str = "general", chunk_size: int = 500, separator: str = "。") -> Tuple[int, str]:
        """Batch-ingest documents into the knowledge base (Milvus).

        Args:
            doc_ids: document id list
            username: operator (recorded as updated_by / created_by)
            kb_method: chunking method
            chunk_size: chunk length
            separator: chunk separator

        Returns:
            (success_count, human-readable summary message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败,请检查数据库服务状态"
        cursor = conn.cursor()
        success_count = 0
        already_entered_count = 0
        failed_count = 0
        error_details = []
        try:
            # 1. Fetch details for all selected documents.
            placeholders = ','.join(['%s']*len(doc_ids))
            fetch_sql = f"""
                SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time, kb_id, file_url
                FROM t_samp_document_main
                WHERE id IN ({placeholders})
            """
            cursor.execute(fetch_sql, tuple(doc_ids))
            selected_docs = cursor.fetchall()
            if not selected_docs:
                return 0, "选中的文档在数据库中不存在"
            # 2. Process documents one by one.
            for doc in selected_docs:
                doc_id = doc['id']
                title = doc.get('title', '未命名文档')
                status = doc.get('conversion_status')
                whether_to_enter = doc.get('whether_to_enter', 0)
                md_url = doc.get('md_url')
                source_type = doc.get('source_type')
                # A. Skip documents that were already ingested.
                if whether_to_enter == 1:
                    logger.info(f"文档 {title}({doc_id}) 已入库,跳过二次入库")
                    already_entered_count += 1
                    error_details.append(f"· {title}: 已在知识库中,无需重复入库")
                    continue
                # B. Check conversion status (2 == converted successfully).
                if status != 2:
                    reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},入库失败: {reason}")
                    failed_count += 1
                    error_details.append(f"· {title}: {reason}")
                    continue
                if not md_url:
                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,入库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 转换结果地址丢失")
                    continue
                # C. Resolve ingestion target (knowledge base bound in the DB).
                current_kb_id = doc.get('kb_id')
                current_kb_method = kb_method  # chunking method comes straight from the front end
                if not current_kb_id:
                    logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
                    failed_count += 1
                    error_details.append(f"· {title}: 未指定目标知识库")
                    continue
                if not current_kb_method:
                    logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
                    failed_count += 1
                    error_details.append(f"· {title}: 未指定切分策略")
                    continue
                # Fetch the kb's collection names (collection_name_parent, collection_name_children).
                kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
                cursor.execute(kb_sql, (current_kb_id,))
                kb_res = cursor.fetchone()
                if not kb_res:
                    logger.warning(f"找不到指定的知识库: id={current_kb_id}")
                    failed_count += 1
                    error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
                    continue
                collection_name_parent = kb_res['collection_name_parent']
                collection_name_children = kb_res['collection_name_children']
                # D. Pull the Markdown content from MinIO.
                try:
                    md_content = self.minio_manager.get_object_content(md_url)
                    if not md_content:
                        raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
                except Exception as minio_err:
                    logger.error(f"读取文档 {title} 内容失败: {minio_err}")
                    failed_count += 1
                    error_details.append(f"· {title}: 读取云端文件失败")
                    continue
                # E. Chunk and ingest via MilvusService.
                try:
                    # Collect business metadata from the per-type detail table.
                    business_metadata = {}
                    if source_type in ['standard', 'basis']:
                        std_sql = """
                            SELECT id as basic_info_id, chinese_name, standard_number, issuing_authority,
                                   document_type, professional_field, validity
                            FROM t_samp_standard_base_info
                            WHERE id = %s
                        """
                        cursor.execute(std_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    elif source_type == 'construction_plan':
                        plan_sql = """
                            SELECT id as basic_info_id, plan_name, project_name, project_section, compiling_unit,
                                   compiling_date, plan_summary, plan_category,
                                   level_1_classification, level_2_classification,
                                   level_3_classification, level_4_classification
                            FROM t_samp_construction_plan_base_info
                            WHERE id = %s
                        """
                        cursor.execute(plan_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    elif source_type == 'regulation':
                        reg_sql = """
                            SELECT id as basic_info_id, file_name, issuing_department as issuing_authority,
                                   document_type, publish_date as release_date, note
                            FROM t_samp_office_regulations
                            WHERE id = %s
                        """
                        cursor.execute(reg_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    # Coerce document_type to its short code (legacy rows may hold full labels).
                    if 'document_type' in business_metadata and business_metadata['document_type']:
                        doc_type = business_metadata['document_type']
                        business_metadata['document_type'] = DOCUMENT_TYPE_MAP.get(doc_type, doc_type)
                    # Coerce professional_field to its short code.
                    if 'professional_field' in business_metadata and business_metadata['professional_field']:
                        prof_field = business_metadata['professional_field']
                        business_metadata['professional_field'] = PROFESSIONAL_FIELD_MAP.get(prof_field, prof_field)
                    # Coerce plan_category to its short code.
                    if 'plan_category' in business_metadata and business_metadata['plan_category']:
                        plan_cat = business_metadata['plan_category']
                        business_metadata['plan_category'] = PLAN_CATEGORY_MAP.get(plan_cat, plan_cat)
                    # Coerce level_1_classification to its short code.
                    if 'level_1_classification' in business_metadata and business_metadata['level_1_classification']:
                        l1_class = business_metadata['level_1_classification']
                        business_metadata['level_1_classification'] = FIRST_LEVEL_MAP.get(l1_class, l1_class)
                    # Coerce level_2_classification to its short code.
                    if 'level_2_classification' in business_metadata and business_metadata['level_2_classification']:
                        l2_class = business_metadata['level_2_classification']
                        business_metadata['level_2_classification'] = SECOND_LEVEL_MAP.get(l2_class, l2_class)
                    # Coerce level_3_classification to its short code.
                    if 'level_3_classification' in business_metadata and business_metadata['level_3_classification']:
                        l3_class = business_metadata['level_3_classification']
                        business_metadata['level_3_classification'] = THIRD_LEVEL_MAP.get(l3_class, l3_class)
                    # Coerce level_4_classification to its short code.
                    if 'level_4_classification' in business_metadata and business_metadata['level_4_classification']:
                        l4_class = business_metadata['level_4_classification']
                        business_metadata['level_4_classification'] = FOURTH_LEVEL_MAP.get(l4_class, l4_class)
                    # Use the main table's file_url (expanded to a full URL) so the data source stays consistent.
                    raw_file_url = doc.get('file_url', '')
                    business_metadata['file_url'] = self.minio_manager.get_full_url(raw_file_url) if raw_file_url else ''
                    # Assemble ingestion metadata.
                    current_date = int(datetime.now().strftime('%Y%m%d'))
                    doc_info = {
                        "doc_id": doc_id,
                        "file_name": title,
                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
                        "tags": "",
                        "user_id": username,  # operator, recorded as created_by downstream
                        "kb_id": current_kb_id,
                        "kb_method": current_kb_method,
                        "collection_name_parent": collection_name_parent,
                        "collection_name_children": collection_name_children,
                        "chunk_size": chunk_size,
                        "separator": separator,
                        "source_type": source_type,
                        "business_metadata": business_metadata  # injected business metadata
                    }
                    await self.milvus_service.insert_knowledge(md_content, doc_info)
                    # F. Register in the task management center (type 'data'); best-effort.
                    try:
                        await task_service.add_task(doc_id, 'data')
                    except Exception as task_err:
                        logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
                    # G. Persist ingestion state on the main table.
                    update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
                    cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
                    success_count += 1
                except Exception as milvus_err:
                    logger.exception(f"文档 {title} 写入向量库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
                    continue
            conn.commit()
            # Build the detailed summary message.
            if success_count == len(doc_ids) and failed_count == 0 and already_entered_count == 0:
                msg = f"✅ 入库成功!共处理 {success_count} 份文档。"
            else:
                msg = f"📊 入库处理完成:\n· 成功:{success_count} 份\n"
                if already_entered_count > 0:
                    msg += f"· 跳过:{already_entered_count} 份 (已入库)\n"
                if failed_count > 0:
                    msg += f"· 失败:{failed_count} 份\n"
            if error_details:
                detailed_msg = msg + "\n⚠️ 详情说明:\n" + "\n".join(error_details)
                return success_count, detailed_msg
            return success_count, msg
        except Exception as e:
            logger.exception(f"文档批量入库异常: {e}")
            conn.rollback()
            return 0, f"操作异常: {str(e)}"
        finally:
            cursor.close()
            conn.close()
  457. async def batch_add_to_task(self, doc_ids: List[str], username: str, project_name: str, task_tags: Optional[List[str]] = None) -> Tuple[bool, str]:
  458. """批量将文档加入标注任务中心 (单表化)
  459. Args:
  460. doc_ids: 文档ID列表
  461. username: 操作人
  462. project_name: 项目名称 (作为 project_id)
  463. task_tags: 标注任务标签列表 (例如 ["标签1", "标签2"])
  464. """
  465. conn = get_db_connection()
  466. if not conn:
  467. return False, "数据库连接失败"
  468. cursor = conn.cursor()
  469. try:
  470. if not doc_ids:
  471. return False, "未指定要加入任务的文档 ID"
  472. # 0. 每次点击都生成一个新的项目 UUID (不再检查重名复用)
  473. import uuid
  474. project_id = str(uuid.uuid4())
  475. # 处理标签:转换为 JSON 字符串存储
  476. import json
  477. tag_str = json.dumps(task_tags, ensure_ascii=False) if task_tags else None
  478. # 1. 过滤掉未入库的文档
  479. placeholders = ', '.join(['%s'] * len(doc_ids))
  480. check_entered_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
  481. cursor.execute(check_entered_sql, doc_ids)
  482. entered_ids = [row['id'] for row in cursor.fetchall()]
  483. unentered_count = len(doc_ids) - len(entered_ids)
  484. if not entered_ids:
  485. return False, "所选文档均未入库,无法加入标注任务中心"
  486. # 2. 更新 whether_to_task 状态
  487. ids_to_add = entered_ids
  488. add_placeholders = ', '.join(['%s'] * len(ids_to_add))
  489. sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
  490. cursor.execute(sql, (username, *ids_to_add))
  491. # 3. 写入任务管理表 (单表逻辑)
  492. for doc_id in ids_to_add:
  493. try:
  494. # 获取业务元数据
  495. metadata_dict = {}
  496. try:
  497. # 定义需要过滤掉的非业务/内部状态字段
  498. EXCLUDE_FIELDS = {
  499. 'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
  500. 'conversion_status', 'whether_to_enter', 'whether_to_task',
  501. 'kb_method', 'whether_to_delete'
  502. }
  503. # 查询主表和子表信息
  504. cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
  505. doc_main = cursor.fetchone()
  506. if doc_main:
  507. # 基础元数据 (仅保留标题和来源类型等核心信息)
  508. for k, v in doc_main.items():
  509. if v is not None and v != '' and k not in EXCLUDE_FIELDS:
  510. metadata_dict[k] = v
  511. # 子表元数据
  512. source_type = doc_main.get('source_type')
  513. table_name = TABLE_MAP.get(source_type)
  514. if table_name:
  515. cursor.execute(f"SELECT * FROM {table_name} WHERE id = %s", (doc_id,))
  516. sub_data = cursor.fetchone()
  517. if sub_data:
  518. for k, v in sub_data.items():
  519. if v is not None and v != '' and k not in EXCLUDE_FIELDS:
  520. metadata_dict[k] = v
  521. # 递归格式化时间
  522. metadata_dict = task_service._serialize_datetime(metadata_dict)
  523. except Exception as meta_err:
  524. logger.warning(f"获取文档 {doc_id} 元数据失败: {meta_err}")
  525. await task_service.add_task(
  526. business_id=doc_id,
  527. task_type='data',
  528. project_id=project_id,
  529. project_name=project_name,
  530. tag=tag_str,
  531. metadata=json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
  532. )
  533. except Exception as e:
  534. logger.exception(f"添加文档 {doc_id} 到任务中心失败: {e}")
  535. conn.commit()
  536. # 4. 自动推送至外部标注平台
  537. push_success, push_msg = await task_service.send_to_external_platform(project_id)
  538. msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
  539. if push_success:
  540. msg += f" (已推送: {push_msg})"
  541. else:
  542. msg += f" (推送失败: {push_msg})"
  543. if unentered_count > 0:
  544. msg += f",{unentered_count} 份文档因未入库被跳过"
  545. return True, msg
  546. except Exception as e:
  547. logger.exception("批量加入任务失败")
  548. if conn:
  549. conn.rollback()
  550. return False, f"操作失败: {str(e)}"
  551. finally:
  552. if cursor:
  553. cursor.close()
  554. if conn:
  555. conn.close()
  556. async def clear_knowledge_base_data(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
  557. """批量清空文档在知识库(Milvus)中的数据片段"""
  558. conn = get_db_connection()
  559. if not conn:
  560. return 0, "数据库连接失败"
  561. cursor = conn.cursor()
  562. success_count = 0
  563. error_details = []
  564. try:
  565. # 1. 获取文档的知识库信息
  566. placeholders = ', '.join(['%s'] * len(doc_ids))
  567. sql = f"""
  568. SELECT id, title, kb_id, whether_to_enter
  569. FROM t_samp_document_main
  570. WHERE id IN ({placeholders})
  571. """
  572. cursor.execute(sql, doc_ids)
  573. docs = cursor.fetchall()
  574. for doc in docs:
  575. doc_id = doc['id']
  576. title = doc.get('title', '未命名')
  577. kb_id = doc.get('kb_id')
  578. # 只有入库了的才需要清空
  579. if doc.get('whether_to_enter') != 1:
  580. continue
  581. if not kb_id:
  582. error_details.append(f"· {title}: 未关联知识库,无法清空")
  583. continue
  584. # 获取集合名称
  585. cursor.execute("SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s", (kb_id,))
  586. kb_info = cursor.fetchone()
  587. if not kb_info:
  588. error_details.append(f"· {title}: 找不到关联的知识库配置")
  589. continue
  590. # 2. 从 Milvus 删除
  591. try:
  592. collections = [kb_info.get('collection_name_parent'), kb_info.get('collection_name_children')]
  593. for coll_name in collections:
  594. if coll_name and self.milvus_service.client.has_collection(coll_name):
  595. # 使用 document_id 进行过滤删除
  596. self.milvus_service.client.delete(
  597. collection_name=coll_name,
  598. filter=f'document_id == "{doc_id}"'
  599. )
  600. # 3. 更新数据库状态
  601. update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 0, updated_by = %s, updated_time = NOW() WHERE id = %s"
  602. cursor.execute(update_sql, (username, doc_id))
  603. success_count += 1
  604. except Exception as milvus_err:
  605. logger.error(f"清空文档 {title} Milvus 数据失败: {milvus_err}")
  606. error_details.append(f"· {title}: 清空向量库数据失败")
  607. conn.commit()
  608. msg = f"成功清空 {success_count} 份文档的知识库片段"
  609. if error_details:
  610. msg += "\n部分操作受限:\n" + "\n".join(error_details)
  611. return success_count, msg
  612. except Exception as e:
  613. logger.exception("批量清空知识库数据失败")
  614. conn.rollback()
  615. return 0, f"操作失败: {str(e)}"
  616. finally:
  617. cursor.close()
  618. conn.close()
  619. async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
  620. """批量删除文档 (仅限未入库或已清空的文档)"""
  621. conn = get_db_connection()
  622. if not conn:
  623. return 0, "数据库连接失败"
  624. cursor = conn.cursor()
  625. try:
  626. if not doc_ids:
  627. return 0, "未指定要删除的文档 ID"
  628. # 1. 检查文档状态:已入库的文档不允许直接删除
  629. placeholders = ', '.join(['%s'] * len(doc_ids))
  630. check_sql = f"SELECT id, title FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
  631. cursor.execute(check_sql, doc_ids)
  632. entered_docs = cursor.fetchall()
  633. if entered_docs:
  634. titles = [d['title'] for d in entered_docs]
  635. return 0, f"删除失败:以下文档已入库,请先执行‘清空数据’操作后再删除:\n{', '.join(titles[:5])}{'...' if len(titles)>5 else ''}"
  636. # 2. 执行物理删除
  637. # 尝试同步删除子表中的数据
  638. try:
  639. for sub_table in TABLE_MAP.values():
  640. sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
  641. try:
  642. cursor.execute(sub_sql, doc_ids)
  643. except Exception as sub_e:
  644. logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
  645. except Exception as sync_e:
  646. logger.error(f"同步删除子表数据失败: {sync_e}")
  647. # 删除主表 t_samp_document_main 中的数据
  648. sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
  649. cursor.execute(sql_main, doc_ids)
  650. affected_rows = cursor.rowcount
  651. # 同步删除任务管理中心的数据
  652. try:
  653. from app.services.task_service import task_service
  654. for doc_id in doc_ids:
  655. # 注意:task_service 的删除方法可能叫 delete_task_by_business_id 或类似,这里假设存在
  656. # 如果没有,我们需要查出主键 ID 再删
  657. cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
  658. except Exception as task_err:
  659. logger.error(f"同步删除任务中心数据失败: {task_err}")
  660. conn.commit()
  661. return affected_rows, f"成功删除 {affected_rows} 条文档数据"
  662. except Exception as e:
  663. logger.exception("批量删除失败")
  664. conn.rollback()
  665. return 0, f"批量删除失败: {str(e)}"
  666. finally:
  667. cursor.close()
  668. conn.close()
  669. async def get_tag_tree(self) -> List[Dict[str, Any]]:
  670. """获取标签层级树 (从 t_samp_tag_category 查询)"""
  671. conn = get_db_connection()
  672. if not conn:
  673. return []
  674. cursor = conn.cursor()
  675. try:
  676. sql = """
  677. SELECT id, parent_id, name, level, type
  678. FROM t_samp_tag_category
  679. WHERE is_deleted = 0 AND status = 1
  680. ORDER BY level ASC, sort_no ASC
  681. """
  682. cursor.execute(sql)
  683. tags = cursor.fetchall()
  684. # 构建树形结构
  685. tag_dict = {tag['id']: {**tag, 'children': []} for tag in tags}
  686. tree = []
  687. for tag_id, tag_item in tag_dict.items():
  688. parent_id = tag_item['parent_id']
  689. if parent_id == 0:
  690. tree.append(tag_item)
  691. elif parent_id in tag_dict:
  692. tag_dict[parent_id]['children'].append(tag_item)
  693. return tree
  694. except Exception as e:
  695. logger.error(f"获取标签树失败: {e}")
  696. return []
  697. finally:
  698. cursor.close()
  699. conn.close()
    async def get_document_list(
        self,
        whether_to_enter: Optional[int] = None,
        conversion_status: Optional[int] = None,
        keyword: Optional[str] = None,
        table_type: Optional[str] = None,
        plan_category: Optional[str] = None,
        level_2_classification: Optional[str] = None,
        level_3_classification: Optional[str] = None,
        level_4_classification: Optional[str] = None,
        page: int = 1,
        size: int = 50
    ) -> Tuple[List[Dict[str, Any]], int, int, int]:
        """Paged document list, optionally joined with the type-specific sub-table.

        Args:
            whether_to_enter: filter on the KB-entry flag when not None.
            conversion_status: filter on conversion status when not None.
            keyword: LIKE filter against the main-table title.
            table_type: source type; when it maps via TABLE_MAP the matching
                sub-table is LEFT-JOINed so sub-table fields can be filtered.
            plan_category / level_2/3/4_classification: construction-plan-only
                filters (Chinese labels are mapped to stored codes).
            page, size: 1-based page number and page size.
        Returns:
            (rows, filtered total, overall total, total entered into the KB).
            On any error: ([], 0, 0, 0).
        """
        conn = get_db_connection()
        if not conn:
            return [], 0, 0, 0
        cursor = conn.cursor()
        try:
            where_clauses = []
            params = []
            # Base filter on the requested source type.
            if table_type:
                where_clauses.append("m.source_type = %s")
                params.append(table_type)
                if table_type in TABLE_MAP:
                    # Known type: LEFT JOIN its sub-table so sub-table columns
                    # are selectable/filterable alongside the main table.
                    sub_table = TABLE_MAP[table_type]
                    from_sql = f"""
                    t_samp_document_main m
                    LEFT JOIN {sub_table} s ON m.id = s.id
                    LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
                    LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
                    LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                    """
                    # Trailing "m.id as id" keeps the main-table id from being
                    # shadowed by the sub-table's id column in the row dict.
                    fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
                    # Construction-plan-specific filters (labels mapped to codes).
                    if table_type == 'construction_plan':
                        if plan_category:
                            where_clauses.append("s.plan_category = %s")
                            params.append(plan_category)
                        if level_2_classification:
                            where_clauses.append("s.level_2_classification = %s")
                            params.append(SECOND_LEVEL_MAP.get(level_2_classification, level_2_classification))
                        if level_3_classification:
                            where_clauses.append("s.level_3_classification = %s")
                            params.append(THIRD_LEVEL_MAP.get(level_3_classification, level_3_classification))
                        if level_4_classification:
                            where_clauses.append("s.level_4_classification = %s")
                            params.append(FOURTH_LEVEL_MAP.get(level_4_classification, level_4_classification))
                else:
                    # Unknown type: main table only, plus user/KB name joins.
                    from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
                    fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
            else:
                # No type requested: main table only, plus user/KB name joins.
                from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
                fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
            order_sql = "m.created_time DESC"
            title_field = "m.title"
            # whether_to_enter and conversion_status are filtered independently.
            if whether_to_enter is not None:
                where_clauses.append("m.whether_to_enter = %s")
                params.append(whether_to_enter)
            if conversion_status is not None:
                where_clauses.append("m.conversion_status = %s")
                params.append(conversion_status)
            if keyword:
                where_clauses.append(f"{title_field} LIKE %s")
                params.append(f"%{keyword}%")
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
            params.extend([size, offset])
            cursor.execute(sql, tuple(params))
            items = [self._format_document_row(row) for row in cursor.fetchall()]
            # Filtered total: same FROM/WHERE, minus the LIMIT/OFFSET params.
            count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            # Global stats: all documents, and documents already entered into the KB.
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
            res = cursor.fetchone()
            all_total = res['count'] if res else 0
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
            res = cursor.fetchone()
            total_entered = res['count'] if res else 0
            return items, total, all_total, total_entered
        except Exception as e:
            logger.exception("获取文档列表失败")
            return [], 0, 0, 0
        finally:
            cursor.close()
            conn.close()
  793. async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
  794. """获取文档详情 (关联查询子表)"""
  795. conn = get_db_connection()
  796. if not conn:
  797. return None
  798. cursor = conn.cursor()
  799. try:
  800. # 1. 查询主表 (关联用户表获取姓名)
  801. cursor.execute("""
  802. SELECT m.*, u1.username as creator_name, u2.username as updater_name
  803. FROM t_samp_document_main m
  804. LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
  805. LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
  806. WHERE m.id = %s
  807. """, (doc_id,))
  808. doc = cursor.fetchone()
  809. if not doc:
  810. return None
  811. # 2. 根据 source_type 查询对应的子表信息
  812. source_type = doc.get('source_type')
  813. table_name = TABLE_MAP.get(source_type)
  814. if table_name:
  815. # 关联子表的所有字段
  816. sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
  817. cursor.execute(sub_sql, (doc_id,))
  818. sub_data = cursor.fetchone()
  819. if sub_data:
  820. # 将子表字段合并到 doc 中,方便前端使用
  821. # 注意:如果字段名冲突,子表字段会覆盖主表字段(除了 id)
  822. sub_data.pop('id', None)
  823. doc.update(sub_data)
  824. return self._format_document_row(doc)
  825. except Exception as e:
  826. logger.exception("获取文档详情失败")
  827. return None
  828. finally:
  829. cursor.close()
  830. conn.close()
  831. def _to_int(self, value: Any) -> Optional[int]:
  832. """安全转换为整数"""
  833. if value is None or value == '' or str(value).lower() == 'null':
  834. return None
  835. try:
  836. return int(value)
  837. except (ValueError, TypeError):
  838. return None
  839. def _to_date(self, value: Any) -> Optional[str]:
  840. """安全处理日期字符串"""
  841. if value is None or value == '' or str(value).lower() == 'null':
  842. return None
  843. # 如果已经是 datetime.date 或 datetime.datetime 对象
  844. if hasattr(value, 'strftime'):
  845. return value.strftime('%Y-%m-%d')
  846. return str(value)
  847. async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
  848. """添加新文档(先主表后子表,解耦触发器)"""
  849. conn = get_db_connection()
  850. if not conn:
  851. return False, "数据库连接失败", None
  852. cursor = conn.cursor()
  853. try:
  854. doc_id = str(uuid.uuid4())
  855. table_type = doc_data.get('table_type', 'standard')
  856. table_name = TABLE_MAP.get(table_type)
  857. # 安全转换字段
  858. release_date = self._to_date(doc_data.get('release_date'))
  859. # 处理 URL 存储(转为相对路径)
  860. file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
  861. # 处理文档类型 (中文 -> 简写)
  862. doc_type_cn = doc_data.get('document_type')
  863. doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn) # 找不到则保持原样
  864. # 处理专业领域 (中文 -> 简写)
  865. prof_field_cn = doc_data.get('professional_field')
  866. prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
  867. # 处理方案类别 (中文 -> 简写)
  868. plan_category_cn = doc_data.get('plan_category')
  869. plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
  870. # 处理一级分类 (中文 -> 简写)
  871. level_1_cn = doc_data.get('level_1_classification')
  872. level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
  873. # 处理二级分类 (中文 -> 简写)
  874. level_2_cn = doc_data.get('level_2_classification')
  875. level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
  876. # 处理三级分类 (中文 -> 简写)
  877. level_3_cn = doc_data.get('level_3_classification')
  878. level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
  879. # 处理四级分类 (中文 -> 简写)
  880. level_4_cn = doc_data.get('level_4_classification')
  881. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  882. # 处理四级分类 (中文 -> 简写)
  883. level_4_cn = doc_data.get('level_4_classification')
  884. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  885. # 处理四级分类 (中文 -> 简写)
  886. level_4_cn = doc_data.get('level_4_classification')
  887. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  888. # 1. 插入主表 (作为资产中心)
  889. cursor.execute(
  890. """
  891. INSERT INTO t_samp_document_main (
  892. id, title, source_type, file_url,
  893. file_extension, created_by, updated_by, created_time, updated_time,
  894. conversion_status, whether_to_task, kb_id
  895. ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
  896. """,
  897. (
  898. doc_id, doc_data.get('title'), table_type, file_url,
  899. doc_data.get('file_extension'), user_id, user_id,
  900. doc_data.get('kb_id')
  901. )
  902. )
  903. # 2. 插入子表 (仅存储业务字段)
  904. if table_type == 'standard':
  905. cursor.execute(
  906. f"""
  907. INSERT INTO {table_name} (
  908. id, chinese_name, english_name, standard_number, issuing_authority,
  909. release_date, implementation_date, drafting_unit, approving_department,
  910. participating_units, document_type, professional_field, engineering_phase,
  911. validity, reference_basis, source_url, note,
  912. created_by, updated_by, created_time, updated_time
  913. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  914. """,
  915. (
  916. doc_id, doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'),
  917. doc_data.get('issuing_authority'), release_date, self._to_date(doc_data.get('implementation_date')),
  918. doc_data.get('drafting_unit'), doc_data.get('approving_department'), doc_data.get('participating_units'),
  919. doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
  920. doc_data.get('validity', '现行'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
  921. user_id, user_id
  922. )
  923. )
  924. elif table_type == 'construction_plan':
  925. cursor.execute(
  926. f"""
  927. INSERT INTO {table_name} (
  928. id, plan_name, project_name, project_section, compiling_unit,
  929. compiling_date, plan_summary, compilation_basis, plan_category,
  930. level_1_classification, level_2_classification, level_3_classification, level_4_classification,
  931. note, created_by, updated_by, created_time, updated_time
  932. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  933. """,
  934. (
  935. doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'),
  936. doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'),
  937. doc_data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
  938. level_2_code, level_3_code, level_4_code,
  939. doc_data.get('note'), user_id, user_id
  940. )
  941. )
  942. elif table_type == 'regulation':
  943. cursor.execute(
  944. f"""
  945. INSERT INTO {table_name} (
  946. id, file_name, issuing_department, document_type, publish_date,
  947. effective_start_date, effective_end_date, note,
  948. created_by, updated_by, created_time, updated_time
  949. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  950. """,
  951. (
  952. doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code,
  953. release_date, self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')),
  954. doc_data.get('note'), user_id, user_id
  955. )
  956. )
  957. # 3. 添加到任务管理中心 (类型为 data)
  958. try:
  959. await task_service.add_task(doc_id, 'data')
  960. except Exception as task_err:
  961. logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
  962. conn.commit()
  963. return True, "文档添加成功", doc_id
  964. except Exception as e:
  965. logger.exception("添加文档失败")
  966. conn.rollback()
  967. return False, str(e), None
  968. finally:
  969. cursor.close()
  970. conn.close()
    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit an existing document, updating main table and sub-table in sync.

        Trigger logic is decoupled: both tables are updated manually in one
        transaction. Chinese display values are mapped to stored codes first.

        Args:
            doc_data: form payload; must contain 'id' and (optionally) 'table_type'.
            updater_id: user ID stamped as the updater on both rows.
        Returns:
            (success flag, message)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'standard')
            table_name = TABLE_MAP.get(table_type)
            # Normalize the optional date field.
            release_date = self._to_date(doc_data.get('release_date'))
            # Store the file URL as a relative path.
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
            # Map document type (Chinese -> code); unknown values pass through unchanged.
            doc_type_cn = doc_data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
            # Map professional field (Chinese -> code).
            prof_field_cn = doc_data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Map plan category (Chinese -> code).
            plan_category_cn = doc_data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Map level-1 classification (Chinese -> code).
            level_1_cn = doc_data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Map level-2 classification (Chinese -> code).
            level_2_cn = doc_data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Map level-3 classification (Chinese -> code).
            level_3_cn = doc_data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Map level-4 classification (Chinese -> code).
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Update the main table.
            cursor.execute(
                """
                UPDATE t_samp_document_main SET
                    title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW(), kb_id = %s
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_data.get('kb_id'), doc_id
                )
            )
            # 2. Update the type-specific sub-table.
            if table_type == 'standard':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        chinese_name = %s, english_name = %s, standard_number = %s, issuing_authority = %s,
                        release_date = %s, implementation_date = %s, drafting_unit = %s, approving_department = %s,
                        participating_units = %s, document_type = %s, professional_field = %s, engineering_phase = %s,
                        validity = %s, reference_basis = %s, source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                        release_date, self._to_date(doc_data.get('implementation_date')), doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                        doc_data.get('participating_units'), doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
                        doc_data.get('validity'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            elif table_type == 'construction_plan':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
                        compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'),
                        release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), plan_category_code,
                        level_1_code, level_2_code, level_3_code, level_4_code,
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            elif table_type == 'regulation':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                        effective_start_date = %s, effective_end_date = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code, release_date,
                        self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
  1080. async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
  1081. """文档入库(单个)"""
  1082. affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
  1083. return affected_rows > 0, message
    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Paged basic-info list for one document type, joined with the main table.

        The main table contributes file/conversion state (file_url,
        conversion_status, md_url, json_url, kb_id, whether_to_enter).

        Args:
            type: 'standard', 'construction_plan' or 'regulation'.
            page, size: 1-based page number and page size.
            keyword: free-text search (per-type columns).
            **filters: field filters; 'release_date_start'/'release_date_end'
                form a date range; Chinese labels are mapped to stored codes.
        Returns:
            (rows, total matching count); ([], 0) on invalid type or error.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0
        cursor = conn.cursor()
        try:
            # Pick the sub-table, SELECT list and filter-key -> column mapping per type.
            if type == 'standard':
                table_name = "t_samp_standard_base_info"
                # Main-table columns joined in: file_url, conversion_status, md_url, json_url.
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type,
                    s.professional_field, s.validity, s.note,
                    s.participating_units, s.reference_basis,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'construction_plan':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                    s.id, s.plan_name as title, NULL as standard_no,
                    s.project_name, s.project_section,
                    s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                    NULL as document_type, NULL as professional_field, NULL as validity,
                    s.plan_summary, s.compilation_basis,
                    s.plan_category, s.level_1_classification, s.level_2_classification,
                    s.level_3_classification, s.level_4_classification,
                    s.note,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date',
                    'plan_category': 's.plan_category',
                    'level_1_classification': 's.level_1_classification',
                    'level_2_classification': 's.level_2_classification',
                    'level_3_classification': 's.level_3_classification',
                    'level_4_classification': 's.level_4_classification'
                }
            elif type == 'regulation':
                table_name = "t_samp_office_regulations"
                fields = """
                    s.id, s.file_name as title, NULL as standard_no,
                    s.issuing_department as issuing_authority, s.publish_date as release_date,
                    s.document_type, NULL as professional_field, NULL as validity,
                    s.note,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0
            where_clauses = []
            params = []
            # Unified keyword search, per-type columns.
            if keyword:
                if type == 'standard':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'construction_plan':
                    where_clauses.append("s.plan_name LIKE %s")
                    params.append(f"%{keyword}%")
                    # NOTE(review): these re-assignments repeat values field_map already has.
                    field_map['issuing_authority'] = 's.compiling_unit'
                    field_map['release_date'] = 's.compiling_date'
                elif type == 'regulation':
                    where_clauses.append("s.file_name LIKE %s")
                    params.append(f"%{keyword}%")
                    # NOTE(review): same — redundant with the field_map defined above.
                    field_map['issuing_authority'] = 's.issuing_department'
                    field_map['release_date'] = 's.publish_date'
            # Fine-grained filters; falsy values are ignored.
            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue
                # Date-range filters map onto the per-type release-date column.
                date_field = field_map.get('release_date', 's.release_date')
                if filter_key == 'release_date_start':
                    where_clauses.append(f"{date_field} >= %s")
                    params.append(filter_value)
                    continue
                if filter_key == 'release_date_end':
                    where_clauses.append(f"{date_field} <= %s")
                    params.append(filter_value)
                    continue
                db_field = field_map.get(filter_key)
                if db_field:
                    # Map Chinese display values to stored codes (frontend sends
                    # Chinese; the database stores short codes).
                    if filter_key == 'document_type':
                        filter_value = DOCUMENT_TYPE_MAP.get(filter_value, filter_value)
                    if filter_key == 'professional_field':
                        filter_value = PROFESSIONAL_FIELD_MAP.get(filter_value, filter_value)
                    if filter_key == 'plan_category':
                        filter_value = PLAN_CATEGORY_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_1_classification':
                        filter_value = FIRST_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_2_classification':
                        filter_value = SECOND_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_3_classification':
                        filter_value = THIRD_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_4_classification':
                        filter_value = FOURTH_LEVEL_MAP.get(filter_value, filter_value)
                    # title / standard_no / issuing_authority use fuzzy matching.
                    if filter_key in ['title', 'standard_no', 'issuing_authority']:
                        where_clauses.append(f"{db_field} LIKE %s")
                        params.append(f"%{filter_value}%")
                    else:
                        where_clauses.append(f"{db_field} = %s")
                        params.append(filter_value)
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            # LEFT JOIN the main table (file state) and user table (display names).
            sql = f"""
                SELECT {fields}, kb.name as kb_name
                FROM {table_name} s
                LEFT JOIN t_samp_document_main m ON s.id = m.id
                LEFT JOIN t_sys_user u1 ON s.created_by = u1.id
                LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id
                LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                {where_sql}
                ORDER BY s.created_time DESC
                LIMIT %s OFFSET %s
            """
            params = params + [size, offset]
            cursor.execute(sql, tuple(params))
            items = cursor.fetchall()
            # Post-process each row: map stored codes back to Chinese display
            # values, then expand relative URLs to full ones.
            for item in items:
                doc_type_code = item.get('document_type')
                if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
                    item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
                prof_field_code = item.get('professional_field')
                if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
                    item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
                plan_category_code = item.get('plan_category')
                if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
                    item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
                level_1_code = item.get('level_1_classification')
                if level_1_code in FIRST_LEVEL_REVERSE_MAP:
                    item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
                level_2_code = item.get('level_2_classification')
                if level_2_code in SECOND_LEVEL_REVERSE_MAP:
                    item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
                level_3_code = item.get('level_3_classification')
                if level_3_code in THIRD_LEVEL_REVERSE_MAP:
                    item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
                level_4_code = item.get('level_4_classification')
                if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
                    item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
                for key in ['file_url', 'md_url', 'json_url']:
                    if item.get(key):
                        item[key] = self.minio_manager.get_full_url(item[key])
            # Total count: same WHERE, without the LIMIT/OFFSET params.
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            return items, total
        except Exception as e:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()
    # ==================== Document Conversion ====================
  1288. async def get_document_source_type(self, doc_id: str) -> Optional[str]:
  1289. """获取文档的source_type"""
  1290. conn = get_db_connection()
  1291. if not conn:
  1292. return None
  1293. cursor = conn.cursor()
  1294. try:
  1295. cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
  1296. res = cursor.fetchone()
  1297. return res['source_type'] if res else None
  1298. except Exception as e:
  1299. logger.exception(f"获取文档source_type失败: {e}")
  1300. return None
  1301. finally:
  1302. cursor.close()
  1303. conn.close()
  1304. async def get_document_title(self, doc_id: str) -> str:
  1305. """获取文档标题"""
  1306. conn = get_db_connection()
  1307. if not conn:
  1308. return "文档"
  1309. cursor = conn.cursor()
  1310. try:
  1311. cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
  1312. res = cursor.fetchone()
  1313. return res['title'] if res else "文档"
  1314. except Exception as e:
  1315. logger.exception(f"获取文档标题失败: {e}")
  1316. return "文档"
  1317. finally:
  1318. cursor.close()
  1319. conn.close()
  1320. async def update_conversion_status(self, doc_id: str, status: int,
  1321. md_url: Optional[str] = None,
  1322. json_url: Optional[str] = None,
  1323. error_message: Optional[str] = None) -> bool:
  1324. """更新文档转换状态
  1325. Args:
  1326. doc_id: 文档ID
  1327. status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
  1328. md_url: Markdown文件URL
  1329. json_url: JSON文件URL
  1330. error_message: 错误信息
  1331. """
  1332. conn = get_db_connection()
  1333. if not conn:
  1334. return False
  1335. cursor = conn.cursor()
  1336. try:
  1337. update_clauses = ["conversion_status = %s"]
  1338. params = [status]
  1339. if error_message:
  1340. update_clauses.append("conversion_error = %s")
  1341. params.append(error_message)
  1342. if md_url:
  1343. update_clauses.append("md_url = %s")
  1344. params.append(md_url)
  1345. if json_url:
  1346. update_clauses.append("json_url = %s")
  1347. params.append(json_url)
  1348. sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
  1349. params.append(doc_id)
  1350. cursor.execute(sql, tuple(params))
  1351. conn.commit()
  1352. return True
  1353. except Exception as e:
  1354. logger.exception(f"更新转换进度失败: {e}")
  1355. conn.rollback()
  1356. return False
  1357. finally:
  1358. cursor.close()
  1359. conn.close()
    # ==================== Basic Info Management ====================
  1361. async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
  1362. """新增基本信息"""
  1363. logger.info(f"Adding basic info for type {type}: {data}")
  1364. conn = get_db_connection()
  1365. if not conn:
  1366. return False, "数据库连接失败", None
  1367. cursor = conn.cursor()
  1368. try:
  1369. table_name = TABLE_MAP.get(type)
  1370. if not table_name:
  1371. return False, "无效的类型", None
  1372. doc_id = str(uuid.uuid4())
  1373. file_url = data.get('file_url')
  1374. file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
  1375. # 处理文档类型 (中文 -> 简写)
  1376. doc_type_cn = data.get('document_type')
  1377. doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
  1378. # 处理专业领域 (中文 -> 简写)
  1379. prof_field_cn = data.get('professional_field')
  1380. prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
  1381. # 处理方案类别 (中文 -> 简写)
  1382. plan_category_cn = data.get('plan_category')
  1383. plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
  1384. # 处理一级分类 (中文 -> 简写)
  1385. level_1_cn = data.get('level_1_classification')
  1386. level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
  1387. # 处理二级分类 (中文 -> 简写)
  1388. level_2_cn = data.get('level_2_classification')
  1389. level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
  1390. # 处理三级分类 (中文 -> 简写)
  1391. level_3_cn = data.get('level_3_classification')
  1392. level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
  1393. # 处理四级分类 (中文 -> 简写)
  1394. level_4_cn = data.get('level_4_classification')
  1395. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  1396. # 处理四级分类 (中文 -> 简写)
  1397. level_4_cn = data.get('level_4_classification')
  1398. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  1399. # 1. 插入主表 (解耦触发器,手动同步)
  1400. cursor.execute(
  1401. """
  1402. INSERT INTO t_samp_document_main (
  1403. id, title, source_type, file_url,
  1404. file_extension, created_by, updated_by, created_time, updated_time,
  1405. conversion_status, whether_to_task, kb_id
  1406. ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
  1407. """,
  1408. (
  1409. doc_id, data.get('title'), type, file_url,
  1410. file_extension, user_id, user_id, data.get('kb_id')
  1411. )
  1412. )
  1413. # 2. 插入子表 (移除 file_url,因为它现在只存储 in 主表中)
  1414. if type == 'standard':
  1415. sql = f"""
  1416. INSERT INTO {table_name} (
  1417. id, chinese_name, english_name, standard_number, issuing_authority,
  1418. release_date, implementation_date, drafting_unit, approving_department,
  1419. participating_units,
  1420. document_type, professional_field, engineering_phase,
  1421. validity, reference_basis, source_url, note,
  1422. created_by, updated_by, created_time, updated_time
  1423. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  1424. """
  1425. params = (
  1426. doc_id, data.get('title'), data.get('english_name'), data.get('standard_no'),
  1427. data.get('issuing_authority'), self._to_date(data.get('release_date')), self._to_date(data.get('implementation_date')),
  1428. data.get('drafting_unit'), data.get('approving_department'),
  1429. data.get('participating_units'),
  1430. doc_type_code, prof_field_code, data.get('engineering_phase'),
  1431. data.get('validity', '现行'), data.get('reference_basis'), data.get('source_url'), data.get('note'),
  1432. user_id, user_id
  1433. )
  1434. elif type == 'construction_plan':
  1435. sql = f"""
  1436. INSERT INTO {table_name} (
  1437. id, plan_name, project_name, project_section, compiling_unit,
  1438. compiling_date, plan_summary, compilation_basis, plan_category,
  1439. level_1_classification, level_2_classification, level_3_classification, level_4_classification,
  1440. note, created_by, updated_by, created_time, updated_time
  1441. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  1442. """
  1443. params = (
  1444. doc_id, data.get('title'), data.get('project_name'), data.get('project_section'),
  1445. data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'),
  1446. data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
  1447. level_2_code, level_3_code, level_4_code,
  1448. data.get('note'), user_id, user_id
  1449. )
  1450. elif type == 'regulation':
  1451. sql = f"""
  1452. INSERT INTO {table_name} (
  1453. id, file_name, issuing_department, document_type, publish_date,
  1454. effective_start_date, effective_end_date, note,
  1455. created_by, updated_by, created_time, updated_time
  1456. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
  1457. """
  1458. params = (
  1459. doc_id, data.get('title'), data.get('issuing_authority'), doc_type_code,
  1460. self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')),
  1461. data.get('note'), user_id, user_id
  1462. )
  1463. else:
  1464. return False, "不支持的类型", None
  1465. cursor.execute(sql, params)
  1466. # 3. 添加到任务管理中心 (类型为 data)
  1467. try:
  1468. # 尝试调用异步方法,如果报错则记录日志
  1469. import asyncio
  1470. try:
  1471. # 检查是否有正在运行的事件循环
  1472. loop = asyncio.get_event_loop()
  1473. if loop.is_running():
  1474. # 在运行的循环中创建任务
  1475. loop.create_task(task_service.add_task(doc_id, 'data'))
  1476. else:
  1477. # 否则使用 run 运行(不推荐在 web 环境下这样做,但这里是兜底)
  1478. loop.run_until_complete(task_service.add_task(doc_id, 'data'))
  1479. except RuntimeError:
  1480. # 没有事件循环时
  1481. asyncio.run(task_service.add_task(doc_id, 'data'))
  1482. except Exception as task_err:
  1483. logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}")
  1484. conn.commit()
  1485. return True, "新增成功", doc_id
  1486. except Exception as e:
  1487. logger.exception("新增基本信息失败")
  1488. conn.rollback()
  1489. return False, str(e), None
  1490. finally:
  1491. cursor.close()
  1492. conn.close()
  1493. async def edit_basic_info(self, type: str, doc_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
  1494. """编辑基本信息"""
  1495. logger.info(f"Editing basic info for type {type}, id {doc_id}: {data}")
  1496. conn = get_db_connection()
  1497. if not conn:
  1498. return False, "数据库连接失败"
  1499. cursor = conn.cursor()
  1500. try:
  1501. table_name = TABLE_MAP.get(type)
  1502. if not table_name:
  1503. return False, "无效的类型"
  1504. # 处理 URL 存储(转为相对路径)
  1505. file_url = self.minio_manager.get_relative_path(data.get('file_url'))
  1506. file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
  1507. # 处理文档类型 (中文 -> 简写)
  1508. doc_type_cn = data.get('document_type')
  1509. doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
  1510. # 处理专业领域 (中文 -> 简写)
  1511. prof_field_cn = data.get('professional_field')
  1512. prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
  1513. # 处理方案类别 (中文 -> 简写)
  1514. plan_category_cn = data.get('plan_category')
  1515. plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
  1516. # 处理一级分类 (中文 -> 简写)
  1517. level_1_cn = data.get('level_1_classification')
  1518. level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
  1519. # 处理二级分类 (中文 -> 简写)
  1520. level_2_cn = data.get('level_2_classification')
  1521. level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
  1522. # 处理三级分类 (中文 -> 简写)
  1523. level_3_cn = data.get('level_3_classification')
  1524. level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
  1525. # 处理四级分类 (中文 -> 简写)
  1526. level_4_cn = data.get('level_4_classification')
  1527. level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
  1528. # 1. 更新主表 (解耦触发器)
  1529. cursor.execute(
  1530. """
  1531. UPDATE t_samp_document_main
  1532. SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s
  1533. WHERE id = %s
  1534. """,
  1535. (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id)
  1536. )
  1537. # 2. 更新子表 (移除 file_url)
  1538. if type == 'standard':
  1539. sql = f"""
  1540. UPDATE {table_name}
  1541. SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
  1542. document_type = %s, professional_field = %s, validity = %s,
  1543. english_name = %s, implementation_date = %s, drafting_unit = %s,
  1544. approving_department = %s, engineering_phase = %s,
  1545. participating_units = %s,
  1546. reference_basis = %s,
  1547. source_url = %s, note = %s,
  1548. updated_by = %s, updated_time = NOW()
  1549. WHERE id = %s
  1550. """
  1551. params = (
  1552. data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
  1553. doc_type_code, prof_field_code, data.get('validity'),
  1554. data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
  1555. data.get('approving_department'), data.get('engineering_phase'),
  1556. data.get('participating_units'),
  1557. data.get('reference_basis'),
  1558. data.get('source_url'), data.get('note'),
  1559. updater_id, doc_id
  1560. )
  1561. elif type == 'construction_plan':
  1562. sql = f"""
  1563. UPDATE {table_name}
  1564. SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
  1565. compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
  1566. level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
  1567. note = %s, updated_by = %s, updated_time = NOW()
  1568. WHERE id = %s
  1569. """
  1570. params = (
  1571. data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'),
  1572. self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), plan_category_code,
  1573. level_1_code, level_2_code, level_3_code, level_4_code,
  1574. data.get('note'), updater_id, doc_id
  1575. )
  1576. elif type == 'regulation':
  1577. sql = f"""
  1578. UPDATE {table_name}
  1579. SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
  1580. effective_start_date = %s, effective_end_date = %s, note = %s,
  1581. updated_by = %s, updated_time = NOW()
  1582. WHERE id = %s
  1583. """
  1584. params = (
  1585. data.get('title'), data.get('issuing_authority'), doc_type_code, self._to_date(data.get('release_date')),
  1586. self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'),
  1587. updater_id, doc_id
  1588. )
  1589. else:
  1590. return False, "不支持的类型"
  1591. cursor.execute(sql, params)
  1592. conn.commit()
  1593. return True, "编辑成功"
  1594. except Exception as e:
  1595. logger.exception("编辑基本信息失败")
  1596. conn.rollback()
  1597. return False, str(e)
  1598. finally:
  1599. cursor.close()
  1600. conn.close()
  1601. async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]:
  1602. """删除基本信息"""
  1603. if not doc_id:
  1604. return False, "缺少 ID 参数"
  1605. logger.info(f"Deleting basic info: type={type}, id={doc_id}")
  1606. conn = get_db_connection()
  1607. if not conn:
  1608. return False, "数据库连接失败"
  1609. cursor = conn.cursor()
  1610. try:
  1611. table_name = TABLE_MAP.get(type)
  1612. if not table_name:
  1613. return False, "无效的类型"
  1614. # 1. 显式删除子表记录 (防止 CASCADE 未生效)
  1615. try:
  1616. cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,))
  1617. logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}")
  1618. except Exception as sub_e:
  1619. logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}")
  1620. # 2. 同步删除任务管理中心的数据 (优先删除关联数据)
  1621. try:
  1622. # 使用当前事务删除任务记录(如果 task_service 支持的话,目前它自建连接)
  1623. # 这里我们直接在当前 cursor 中也执行一次,确保事务一致性
  1624. cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
  1625. logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}")
  1626. except Exception as task_e:
  1627. logger.warning(f"在主事务中删除任务记录失败: {task_e}")
  1628. # 3. 删除主表记录
  1629. cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,))
  1630. affected_main = cursor.rowcount
  1631. logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}")
  1632. if affected_main == 0:
  1633. logger.warning(f"未找到主表记录: {doc_id}")
  1634. # 即使主表没找到,我们也 commit 之前的操作并返回成功(幂等性)
  1635. conn.commit()
  1636. # 4. 再次确保任务中心数据已删除 (调用原有服务)
  1637. try:
  1638. await task_service.delete_task(doc_id)
  1639. except Exception as task_err:
  1640. logger.error(f"调用 task_service 删除任务失败: {task_err}")
  1641. return True, "删除成功"
  1642. except Exception as e:
  1643. logger.exception(f"删除基本信息异常 (ID: {doc_id})")
  1644. conn.rollback()
  1645. return False, f"删除失败: {str(e)}"
  1646. finally:
  1647. cursor.close()
  1648. conn.close()