| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913 |
- """
- 样本中心服务层
- 从 sample_view.py 提取的SQL查询逻辑
- """
- import logging
- import uuid
- from datetime import datetime
- from typing import Optional, List, Dict, Any, Tuple
- from app.base.async_mysql_connection import get_db_connection
- from app.base.minio_connection import get_minio_manager
- from app.core.config import config_handler
- from app.services.milvus_service import milvus_service
- from app.services.task_service import task_service
- logger = logging.getLogger(__name__)
- # 表名映射
- TABLE_MAP = {
- 'standard': 't_samp_standard_base_info',
- 'construction_plan': 't_samp_construction_plan_base_info',
- 'regulation': 't_samp_office_regulations'
- }
- # 文档类型映射 (前端显示 -> 数据库存储)
- DOCUMENT_TYPE_MAP = {
- "国家标准": "GB",
- "行业标准": "HY",
- "部门规章": "BM",
- "地方标准": "DB",
- "企业标准": "QY",
- "管理制度": "GL",
- "技术规范": "GF",
- "团体标准": "TT",
- "国际标准": "GJ",
- "国家法律": "FL",
- "地方法规": "LR",
- "其他": "QT"
- }
- # 文档类型反向映射 (数据库存储 -> 前端显示)
- # 包含旧版本简写以保证存量数据兼容性
- DOCUMENT_TYPE_REVERSE_MAP = {v: k for k, v in DOCUMENT_TYPE_MAP.items()}
- DOCUMENT_TYPE_REVERSE_MAP.update({
- "HB": "行业标准",
- "QB": "企业标准",
- "GLZD": "管理制度",
- "JSGF": "技术规范",
- "TB": "团体标准",
- "IB": "国际标准",
- "OTHER": "其他"
- })
- # 专业领域映射 (前端显示 -> 数据库存储)
- PROFESSIONAL_FIELD_MAP = {
- "法律法规": "FL",
- "通用标准": "TY",
- "勘察钻探": "KC",
- "地基基础": "DJ",
- "路基路面": "LJ",
- "桥梁工程": "QL",
- "隧道工程": "SD",
- "交通工程": "JT",
- "建筑工程": "JZ",
- "市政工程": "SZ",
- "机电安装": "JD",
- "路桥工程": "LB",
- "装饰装修": "ZS",
- "港口航道": "GK",
- "铁路工程": "TL",
- "房建工程": "FJ",
- "水利电力": "SL",
- "信息化": "XX",
- "试验检测": "SY",
- "安全环保": "AQ",
- "其他": "QT"
- }
- # 专业领域反向映射 (数据库存储 -> 前端显示)
- PROFESSIONAL_FIELD_REVERSE_MAP = {v: k for k, v in PROFESSIONAL_FIELD_MAP.items()}
- # 方案类别映射 (前端显示 -> 数据库存储)
- PLAN_CATEGORY_MAP = {
- "超危大方案": "CH",
- "超危大方案较大Ⅱ级": "CH2",
- "超危大方案特大Ⅳ级": "CH4",
- "超危大方案一般Ⅰ级": "CH1",
- "超危大方案重大Ⅲ级": "CH3",
- "危大方案": "WD",
- "一般方案": "YB"
- }
- # 方案类别反向映射 (数据库存储 -> 前端显示)
- PLAN_CATEGORY_REVERSE_MAP = {v: k for k, v in PLAN_CATEGORY_MAP.items()}
- # 一级分类映射 (前端显示 -> 数据库存储)
- FIRST_LEVEL_MAP = {
- "施工方案": "SC"
- }
- # 一级分类反向映射 (数据库存储 -> 前端显示)
- FIRST_LEVEL_REVERSE_MAP = {v: k for k, v in FIRST_LEVEL_MAP.items()}
- # 二级分类映射 (前端显示 -> 数据库存储)
- SECOND_LEVEL_MAP = {
- "临建工程": "LZ",
- "路基工程": "LJ",
- "桥梁工程": "QL",
- "隧道工程": "SD",
- "其他": "QT"
- }
- # 二级分类反向映射 (数据库存储 -> 前端显示)
- SECOND_LEVEL_REVERSE_MAP = {v: k for k, v in SECOND_LEVEL_MAP.items()}
- # 三级分类映射 (前端显示 -> 数据库存储)
- THIRD_LEVEL_MAP = {
- "TBM施工": "TM",
- "拌和站安、拆施工": "BH",
- "不良地质隧道施工": "BL",
- "常规桥梁": "CG",
- "挡土墙工程类": "DT",
- "辅助坑道施工": "FB",
- "复杂洞口工程施工": "FD",
- "钢筋加工场安、拆": "GG",
- "钢栈桥施工": "GZ",
- "拱桥": "GH",
- "涵洞工程类": "HD",
- "滑坡体处理类": "HP",
- "路堤": "LT",
- "路堑": "LQ",
- "其他": "QT",
- "深基坑": "JK",
- "隧道总体施工": "ZT",
- "特殊结构隧道": "TS",
- "斜拉桥": "XL",
- "悬索桥": "XS"
- }
- # 三级分类反向映射 (数据库存储 -> 前端显示)
- THIRD_LEVEL_REVERSE_MAP = {v: k for k, v in THIRD_LEVEL_MAP.items()}
- # 四级分类映射 (前端显示 -> 数据库存储)
- FOURTH_LEVEL_MAP = {
- "挡土墙": "DT",
- "顶管": "DG",
- "断层破碎带及软弱围岩": "DL",
- "钢筋砼箱涵": "GX",
- "高填路堤": "GT",
- "抗滑桩": "KH",
- "软岩大变形隧道": "RY",
- "上部结构": "SB",
- "深基坑开挖与支护": "JK",
- "深挖路堑": "LC",
- "隧道TBM": "TM",
- "隧道进洞": "JD",
- "隧道竖井": "SJ",
- "隧道斜井": "XJ",
- "特种设备": "TZ",
- "瓦斯隧道": "WS",
- "下部结构": "XB",
- "小净距隧道": "NJ",
- "岩爆隧道": "YB",
- "岩溶隧道": "YR",
- "涌水突泥隧道": "YN",
- "桩基础": "ZJ",
- "其他": "QT"
- }
- # 四级分类反向映射 (数据库存储 -> 前端显示)
- FOURTH_LEVEL_REVERSE_MAP = {v: k for k, v in FOURTH_LEVEL_MAP.items()}
def get_table_name(source_type: str) -> Optional[str]:
    """Resolve the physical detail-table name for *source_type*.

    Returns None when the source type has no mapped table.
    """
    table_name = TABLE_MAP.get(source_type)
    return table_name
- class SampleService:
- """样本中心服务类 - 使用 SQL 查询方式"""
-
- _initialized = False
    def __init__(self):
        """Initialize the service.

        Wires up the shared MinIO manager and the global Milvus service, and
        ensures the Milvus collections exist exactly once per process
        (guarded by the class-level ``_initialized`` flag).
        """
        # Use the unified MinIO manager singleton
        self.minio_manager = get_minio_manager()

        # Use the global Milvus service instance
        self.milvus_service = milvus_service

        # Ensure collections are created (only on first initialization);
        # failures are logged but do not abort construction.
        if not SampleService._initialized:
            try:
                self.milvus_service.ensure_collections()
                SampleService._initialized = True
            except Exception as e:
                logger.error(f"初始化 Milvus 集合失败: {e}")
- def _format_document_row(self, item: Dict[str, Any]) -> Dict[str, Any]:
- """格式化文档行数据,处理URL、日期映射和文件名显示"""
- if not item:
- return item
-
- # 1. 处理 URL 转换
- for key in ['file_url', 'md_url', 'json_url']:
- if item.get(key):
- item[key] = self.minio_manager.get_full_url(item[key])
-
- # 2. 映射字段以适配前端通用显示
- source_type = item.get('source_type')
-
- # 处理文档类型显示 (简写 -> 中文)
- doc_type_code = item.get('document_type')
- if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
- item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
-
- # 处理专业领域显示 (简写 -> 中文)
- prof_field_code = item.get('professional_field')
- if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
- item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
- # 处理一级分类显示 (简写 -> 中文)
- level_1_code = item.get('level_1_classification')
- if level_1_code in FIRST_LEVEL_REVERSE_MAP:
- item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
- # 处理二级分类显示 (简写 -> 中文)
- level_2_code = item.get('level_2_classification')
- if level_2_code in SECOND_LEVEL_REVERSE_MAP:
- item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
- # 处理三级分类显示 (简写 -> 中文)
- level_3_code = item.get('level_3_classification')
- if level_3_code in THIRD_LEVEL_REVERSE_MAP:
- item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
- # 处理四级分类显示 (简写 -> 中文)
- level_4_code = item.get('level_4_classification')
- if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
- item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
- # 处理方案类别显示 (简写 -> 中文)
- plan_category_code = item.get('plan_category')
- if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
- item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
- if source_type == 'standard':
- # 标准规范特有映射
- if 'standard_number' in item:
- item['standard_no'] = item.get('standard_number')
- elif source_type == 'construction_plan':
- # 施工方案特有映射
- item['issuing_authority'] = item.get('compiling_unit')
- item['release_date'] = item.get('compiling_date')
- elif source_type == 'regulation':
- # 办公制度特有映射
- item['issuing_authority'] = item.get('issuing_department')
- item['release_date'] = item.get('publish_date')
-
- # 3. 格式化时间
- date_keys = [
- 'created_time', 'updated_time', 'release_date',
- 'publish_date', 'compiling_date', 'implementation_date',
- 'effective_start_date', 'effective_end_date'
- ]
- for key in date_keys:
- val = item.get(key)
- if val and hasattr(val, 'isoformat'):
- item[key] = val.isoformat()
- elif val is not None:
- item[key] = str(val)
-
- # 4. 增加格式化文件名供前端显示
- if item.get('conversion_status') == 2:
- title = item.get('title', 'document')
- item['md_display_name'] = f"{title}.md"
- item['json_display_name'] = f"{title}.json"
-
- return item
- async def get_upload_url(self, filename: str, content_type: str, prefix: str = None) -> Tuple[bool, str, Dict[str, Any]]:
- """获取 MinIO 预签名上传 URL"""
- try:
- data = self.minio_manager.get_upload_url(filename, content_type, prefix=prefix)
- return True, "成功获取上传链接", data
- except Exception as e:
- logger.exception("生成上传链接失败")
- return False, f"生成上传链接失败: {str(e)}", {}
- # ==================== 文档管理 ====================
-
- async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_method: str = "general", chunk_size: int = 500, separator: str = "。") -> Tuple[int, str]:
- """批量将文档入库到知识库
-
- Args:
- doc_ids: 文档ID列表
- username: 操作人
- kb_method: 切分方法
- chunk_size: 切分长度
- separator: 切分符号
- """
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败,请检查数据库服务状态"
-
- cursor = conn.cursor()
- success_count = 0
- already_entered_count = 0
- failed_count = 0
- error_details = []
-
- try:
- # 1. 获取所有选中选中的文档详情
- placeholders = ','.join(['%s']*len(doc_ids))
- fetch_sql = f"""
- SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time, kb_id, file_url
- FROM t_samp_document_main
- WHERE id IN ({placeholders})
- """
- cursor.execute(fetch_sql, tuple(doc_ids))
- selected_docs = cursor.fetchall()
-
- if not selected_docs:
- return 0, "选中的文档在数据库中不存在"
- # 2. 逐份处理
- for doc in selected_docs:
- doc_id = doc['id']
- title = doc.get('title', '未命名文档')
- status = doc.get('conversion_status')
- whether_to_enter = doc.get('whether_to_enter', 0)
- md_url = doc.get('md_url')
- source_type = doc.get('source_type')
-
- # A. 检查是否已入库
- if whether_to_enter == 1:
- logger.info(f"文档 {title}({doc_id}) 已入库,跳过二次入库")
- already_entered_count += 1
- error_details.append(f"· {title}: 已在知识库中,无需重复入库")
- continue
- # B. 检查转换状态
- if status != 2:
- reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
- logger.warning(f"文档 {title}({doc_id}) 状态为 {status},入库失败: {reason}")
- failed_count += 1
- error_details.append(f"· {title}: {reason}")
- continue
-
- if not md_url:
- logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,入库失败")
- failed_count += 1
- error_details.append(f"· {title}: 转换结果地址丢失")
- continue
-
- # C. 确定入库策略 (从数据库读取已绑定的知识库)
- current_kb_id = doc.get('kb_id')
- current_kb_method = kb_method # 直接使用前端传来的切分方式
- if not current_kb_id:
- logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
- failed_count += 1
- error_details.append(f"· {title}: 未指定目标知识库")
- continue
- if not current_kb_method:
- logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
- failed_count += 1
- error_details.append(f"· {title}: 未指定切分策略")
- continue
- # 获取知识库信息 (collection_name_parent, collection_name_children)
- kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
- cursor.execute(kb_sql, (current_kb_id,))
- kb_res = cursor.fetchone()
-
- if not kb_res:
- logger.warning(f"找不到指定的知识库: id={current_kb_id}")
- failed_count += 1
- error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
- continue
-
- collection_name_parent = kb_res['collection_name_parent']
- collection_name_children = kb_res['collection_name_children']
-
- # D. 从 MinIO 获取 Markdown 内容
- try:
- md_content = self.minio_manager.get_object_content(md_url)
- if not md_content:
- raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
- except Exception as minio_err:
- logger.error(f"读取文档 {title} 内容失败: {minio_err}")
- failed_count += 1
- error_details.append(f"· {title}: 读取云端文件失败")
- continue
-
- # E. 调用 MilvusService 进行切分和入库
- try:
- # 获取业务元数据
- business_metadata = {}
- if source_type in ['standard', 'basis']:
- std_sql = """
- SELECT id as basic_info_id, chinese_name, standard_number, issuing_authority,
- document_type, professional_field, validity
- FROM t_samp_standard_base_info
- WHERE id = %s
- """
- cursor.execute(std_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
- elif source_type == 'construction_plan':
- plan_sql = """
- SELECT id as basic_info_id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, plan_category,
- level_1_classification, level_2_classification,
- level_3_classification, level_4_classification
- FROM t_samp_construction_plan_base_info
- WHERE id = %s
- """
- cursor.execute(plan_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
- elif source_type == 'regulation':
- reg_sql = """
- SELECT id as basic_info_id, file_name, issuing_department as issuing_authority,
- document_type, publish_date as release_date, note
- FROM t_samp_office_regulations
- WHERE id = %s
- """
- cursor.execute(reg_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
-
- # 强制转换 document_type 为简写 (处理存量数据或异常情况)
- if 'document_type' in business_metadata and business_metadata['document_type']:
- doc_type = business_metadata['document_type']
- business_metadata['document_type'] = DOCUMENT_TYPE_MAP.get(doc_type, doc_type)
- # 强制转换 professional_field 为简写
- if 'professional_field' in business_metadata and business_metadata['professional_field']:
- prof_field = business_metadata['professional_field']
- business_metadata['professional_field'] = PROFESSIONAL_FIELD_MAP.get(prof_field, prof_field)
- # 强制转换 plan_category 为简写
- if 'plan_category' in business_metadata and business_metadata['plan_category']:
- plan_cat = business_metadata['plan_category']
- business_metadata['plan_category'] = PLAN_CATEGORY_MAP.get(plan_cat, plan_cat)
- # 强制转换 level_1_classification 为简写
- if 'level_1_classification' in business_metadata and business_metadata['level_1_classification']:
- l1_class = business_metadata['level_1_classification']
- business_metadata['level_1_classification'] = FIRST_LEVEL_MAP.get(l1_class, l1_class)
- # 强制转换 level_2_classification 为简写
- if 'level_2_classification' in business_metadata and business_metadata['level_2_classification']:
- l2_class = business_metadata['level_2_classification']
- business_metadata['level_2_classification'] = SECOND_LEVEL_MAP.get(l2_class, l2_class)
- # 强制转换 level_3_classification 为简写
- if 'level_3_classification' in business_metadata and business_metadata['level_3_classification']:
- l3_class = business_metadata['level_3_classification']
- business_metadata['level_3_classification'] = THIRD_LEVEL_MAP.get(l3_class, l3_class)
- # 强制转换 level_4_classification 为简写
- if 'level_4_classification' in business_metadata and business_metadata['level_4_classification']:
- l4_class = business_metadata['level_4_classification']
- business_metadata['level_4_classification'] = FOURTH_LEVEL_MAP.get(l4_class, l4_class)
- # 统一使用主表的 file_url,确保数据来源一致,入库时使用完整 URL
- raw_file_url = doc.get('file_url', '')
- business_metadata['file_url'] = self.minio_manager.get_full_url(raw_file_url) if raw_file_url else ''
- # 准备元数据
- current_date = int(datetime.now().strftime('%Y%m%d'))
- doc_info = {
- "doc_id": doc_id,
- "file_name": title,
- "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
- "tags": "",
- "user_id": username, # 传递操作人作为 created_by
- "kb_id": current_kb_id,
- "kb_method": current_kb_method,
- "collection_name_parent": collection_name_parent,
- "collection_name_children": collection_name_children,
- "chunk_size": chunk_size,
- "separator": separator,
- "source_type": source_type,
- "business_metadata": business_metadata # 注入业务元数据
- }
- await self.milvus_service.insert_knowledge(md_content, doc_info)
-
- # F. 添加到任务管理中心 (类型为 data)
- try:
- await task_service.add_task(doc_id, 'data')
- except Exception as task_err:
- logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
- # G. 更新数据库状态
- update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
- cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
- success_count += 1
-
- except Exception as milvus_err:
- logger.exception(f"文档 {title} 写入向量库失败")
- failed_count += 1
- error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
- continue
-
- conn.commit()
-
- # 构造详细的消息
- if success_count == len(doc_ids) and failed_count == 0 and already_entered_count == 0:
- msg = f"✅ 入库成功!共处理 {success_count} 份文档。"
- else:
- msg = f"📊 入库处理完成:\n· 成功:{success_count} 份\n"
- if already_entered_count > 0:
- msg += f"· 跳过:{already_entered_count} 份 (已入库)\n"
- if failed_count > 0:
- msg += f"· 失败:{failed_count} 份\n"
-
- if error_details:
- detailed_msg = msg + "\n⚠️ 详情说明:\n" + "\n".join(error_details)
- return success_count, detailed_msg
-
- return success_count, msg
-
- except Exception as e:
- logger.exception(f"文档批量入库异常: {e}")
- conn.rollback()
- return 0, f"操作异常: {str(e)}"
- finally:
- cursor.close()
- conn.close()
-
- async def batch_add_to_task(self, doc_ids: List[str], username: str, project_name: str, task_tags: Optional[List[str]] = None) -> Tuple[bool, str]:
- """批量将文档加入标注任务中心 (单表化)
-
- Args:
- doc_ids: 文档ID列表
- username: 操作人
- project_name: 项目名称 (作为 project_id)
- task_tags: 标注任务标签列表 (例如 ["标签1", "标签2"])
- """
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- if not doc_ids:
- return False, "未指定要加入任务的文档 ID"
-
- # 0. 每次点击都生成一个新的项目 UUID (不再检查重名复用)
- import uuid
- project_id = str(uuid.uuid4())
-
- # 处理标签:转换为 JSON 字符串存储
- import json
- tag_str = json.dumps(task_tags, ensure_ascii=False) if task_tags else None
-
- # 1. 过滤掉未入库的文档
- placeholders = ', '.join(['%s'] * len(doc_ids))
- check_entered_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
- cursor.execute(check_entered_sql, doc_ids)
- entered_ids = [row['id'] for row in cursor.fetchall()]
-
- unentered_count = len(doc_ids) - len(entered_ids)
- if not entered_ids:
- return False, "所选文档均未入库,无法加入标注任务中心"
- # 2. 更新 whether_to_task 状态
- ids_to_add = entered_ids
- add_placeholders = ', '.join(['%s'] * len(ids_to_add))
- sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
- cursor.execute(sql, (username, *ids_to_add))
-
- # 3. 写入任务管理表 (单表逻辑)
- for doc_id in ids_to_add:
- try:
- # 获取业务元数据
- metadata_dict = {}
- try:
- # 定义需要过滤掉的非业务/内部状态字段
- EXCLUDE_FIELDS = {
- 'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
- 'conversion_status', 'whether_to_enter', 'whether_to_task',
- 'kb_method', 'whether_to_delete'
- }
-
- # 查询主表和子表信息
- cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
- doc_main = cursor.fetchone()
- if doc_main:
- # 基础元数据 (仅保留标题和来源类型等核心信息)
- for k, v in doc_main.items():
- if v is not None and v != '' and k not in EXCLUDE_FIELDS:
- metadata_dict[k] = v
-
- # 子表元数据
- source_type = doc_main.get('source_type')
- table_name = TABLE_MAP.get(source_type)
- if table_name:
- cursor.execute(f"SELECT * FROM {table_name} WHERE id = %s", (doc_id,))
- sub_data = cursor.fetchone()
- if sub_data:
- for k, v in sub_data.items():
- if v is not None and v != '' and k not in EXCLUDE_FIELDS:
- metadata_dict[k] = v
-
- # 递归格式化时间
- metadata_dict = task_service._serialize_datetime(metadata_dict)
- except Exception as meta_err:
- logger.warning(f"获取文档 {doc_id} 元数据失败: {meta_err}")
-
- await task_service.add_task(
- business_id=doc_id,
- task_type='data',
- project_id=project_id,
- project_name=project_name,
- tag=tag_str,
- metadata=json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
- )
- except Exception as e:
- logger.exception(f"添加文档 {doc_id} 到任务中心失败: {e}")
-
- conn.commit()
- # 4. 自动推送至外部标注平台
- push_success, push_msg = await task_service.send_to_external_platform(project_id)
-
- msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
- if push_success:
- msg += f" (已推送: {push_msg})"
- else:
- msg += f" (推送失败: {push_msg})"
- if unentered_count > 0:
- msg += f",{unentered_count} 份文档因未入库被跳过"
- return True, msg
- except Exception as e:
- logger.exception("批量加入任务失败")
- if conn:
- conn.rollback()
- return False, f"操作失败: {str(e)}"
- finally:
- if cursor:
- cursor.close()
- if conn:
- conn.close()
- async def clear_knowledge_base_data(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
- """批量清空文档在知识库(Milvus)中的数据片段"""
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败"
-
- cursor = conn.cursor()
- success_count = 0
- error_details = []
-
- try:
- # 1. 获取文档的知识库信息
- placeholders = ', '.join(['%s'] * len(doc_ids))
- sql = f"""
- SELECT id, title, kb_id, whether_to_enter
- FROM t_samp_document_main
- WHERE id IN ({placeholders})
- """
- cursor.execute(sql, doc_ids)
- docs = cursor.fetchall()
-
- for doc in docs:
- doc_id = doc['id']
- title = doc.get('title', '未命名')
- kb_id = doc.get('kb_id')
-
- # 只有入库了的才需要清空
- if doc.get('whether_to_enter') != 1:
- continue
-
- if not kb_id:
- error_details.append(f"· {title}: 未关联知识库,无法清空")
- continue
-
- # 获取集合名称
- cursor.execute("SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s", (kb_id,))
- kb_info = cursor.fetchone()
-
- if not kb_info:
- error_details.append(f"· {title}: 找不到关联的知识库配置")
- continue
-
- # 2. 从 Milvus 删除
- try:
- collections = [kb_info.get('collection_name_parent'), kb_info.get('collection_name_children')]
- for coll_name in collections:
- if coll_name and self.milvus_service.client.has_collection(coll_name):
- # 使用 document_id 进行过滤删除
- self.milvus_service.client.delete(
- collection_name=coll_name,
- filter=f'document_id == "{doc_id}"'
- )
-
- # 3. 更新数据库状态
- update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 0, updated_by = %s, updated_time = NOW() WHERE id = %s"
- cursor.execute(update_sql, (username, doc_id))
- success_count += 1
- except Exception as milvus_err:
- logger.error(f"清空文档 {title} Milvus 数据失败: {milvus_err}")
- error_details.append(f"· {title}: 清空向量库数据失败")
-
- conn.commit()
-
- msg = f"成功清空 {success_count} 份文档的知识库片段"
- if error_details:
- msg += "\n部分操作受限:\n" + "\n".join(error_details)
-
- return success_count, msg
-
- except Exception as e:
- logger.exception("批量清空知识库数据失败")
- conn.rollback()
- return 0, f"操作失败: {str(e)}"
- finally:
- cursor.close()
- conn.close()
- async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
- """批量删除文档 (仅限未入库或已清空的文档)"""
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- if not doc_ids:
- return 0, "未指定要删除的文档 ID"
-
- # 1. 检查文档状态:已入库的文档不允许直接删除
- placeholders = ', '.join(['%s'] * len(doc_ids))
- check_sql = f"SELECT id, title FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
- cursor.execute(check_sql, doc_ids)
- entered_docs = cursor.fetchall()
-
- if entered_docs:
- titles = [d['title'] for d in entered_docs]
- return 0, f"删除失败:以下文档已入库,请先执行‘清空数据’操作后再删除:\n{', '.join(titles[:5])}{'...' if len(titles)>5 else ''}"
-
- # 2. 执行物理删除
- # 尝试同步删除子表中的数据
- try:
- for sub_table in TABLE_MAP.values():
- sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
- try:
- cursor.execute(sub_sql, doc_ids)
- except Exception as sub_e:
- logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
- except Exception as sync_e:
- logger.error(f"同步删除子表数据失败: {sync_e}")
-
- # 删除主表 t_samp_document_main 中的数据
- sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
- cursor.execute(sql_main, doc_ids)
- affected_rows = cursor.rowcount
-
- # 同步删除任务管理中心的数据
- try:
- from app.services.task_service import task_service
- for doc_id in doc_ids:
- # 注意:task_service 的删除方法可能叫 delete_task_by_business_id 或类似,这里假设存在
- # 如果没有,我们需要查出主键 ID 再删
- cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
- except Exception as task_err:
- logger.error(f"同步删除任务中心数据失败: {task_err}")
- conn.commit()
-
- return affected_rows, f"成功删除 {affected_rows} 条文档数据"
- except Exception as e:
- logger.exception("批量删除失败")
- conn.rollback()
- return 0, f"批量删除失败: {str(e)}"
- finally:
- cursor.close()
- conn.close()
- async def get_tag_tree(self) -> List[Dict[str, Any]]:
- """获取标签层级树 (从 t_samp_tag_category 查询)"""
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor()
- try:
- sql = """
- SELECT id, parent_id, name, level, type
- FROM t_samp_tag_category
- WHERE is_deleted = 0 AND status = 1
- ORDER BY level ASC, sort_no ASC
- """
- cursor.execute(sql)
- tags = cursor.fetchall()
-
- # 构建树形结构
- tag_dict = {tag['id']: {**tag, 'children': []} for tag in tags}
- tree = []
- for tag_id, tag_item in tag_dict.items():
- parent_id = tag_item['parent_id']
- if parent_id == 0:
- tree.append(tag_item)
- elif parent_id in tag_dict:
- tag_dict[parent_id]['children'].append(tag_item)
-
- return tree
- except Exception as e:
- logger.error(f"获取标签树失败: {e}")
- return []
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_list(
- self,
- whether_to_enter: Optional[int] = None,
- conversion_status: Optional[int] = None,
- keyword: Optional[str] = None,
- table_type: Optional[str] = None,
- plan_category: Optional[str] = None,
- level_2_classification: Optional[str] = None,
- level_3_classification: Optional[str] = None,
- level_4_classification: Optional[str] = None,
- page: int = 1,
- size: int = 50
- ) -> Tuple[List[Dict[str, Any]], int, int, int]:
- """获取文档列表 (支持关联查询子表)"""
- conn = get_db_connection()
- if not conn:
- return [], 0, 0, 0
-
- cursor = conn.cursor()
-
- try:
- where_clauses = []
- params = []
-
- # 基础查询
- if table_type:
- where_clauses.append("m.source_type = %s")
- params.append(table_type)
-
- if table_type in TABLE_MAP:
- # 如果指定了类型且在 TABLE_MAP 中,使用 LEFT JOIN 关联查询,以便搜索子表字段
- sub_table = TABLE_MAP[table_type]
- from_sql = f"""
- t_samp_document_main m
- LEFT JOIN {sub_table} s ON m.id = s.id
- LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
- LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
- LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
- """
- fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
-
- # 施工方案特有的过滤字段
- if table_type == 'construction_plan':
- if plan_category:
- where_clauses.append("s.plan_category = %s")
- params.append(plan_category)
- if level_2_classification:
- where_clauses.append("s.level_2_classification = %s")
- params.append(SECOND_LEVEL_MAP.get(level_2_classification, level_2_classification))
- if level_3_classification:
- where_clauses.append("s.level_3_classification = %s")
- params.append(THIRD_LEVEL_MAP.get(level_3_classification, level_3_classification))
- if level_4_classification:
- where_clauses.append("s.level_4_classification = %s")
- params.append(FOURTH_LEVEL_MAP.get(level_4_classification, level_4_classification))
- else:
- from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
- fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
- else:
- from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
- fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
-
- order_sql = "m.created_time DESC"
- title_field = "m.title"
-
- # 分离 whether_to_enter 与 conversion_status 的过滤逻辑
- if whether_to_enter is not None:
- where_clauses.append("m.whether_to_enter = %s")
- params.append(whether_to_enter)
-
- if conversion_status is not None:
- where_clauses.append("m.conversion_status = %s")
- params.append(conversion_status)
-
- if keyword:
- where_clauses.append(f"{title_field} LIKE %s")
- params.append(f"%{keyword}%")
-
- where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
- offset = (page - 1) * size
-
- sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
- params.extend([size, offset])
-
- cursor.execute(sql, tuple(params))
- items = [self._format_document_row(row) for row in cursor.fetchall()]
-
- # 总数
- count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
- cursor.execute(count_sql, tuple(params[:-2]))
- res = cursor.fetchone()
- total = res['count'] if res else 0
-
- # 统计数据
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
- res = cursor.fetchone()
- all_total = res['count'] if res else 0
-
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
- res = cursor.fetchone()
- total_entered = res['count'] if res else 0
-
- return items, total, all_total, total_entered
- except Exception as e:
- logger.exception("获取文档列表失败")
- return [], 0, 0, 0
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
- """获取文档详情 (关联查询子表)"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- # 1. 查询主表 (关联用户表获取姓名)
- cursor.execute("""
- SELECT m.*, u1.username as creator_name, u2.username as updater_name
- FROM t_samp_document_main m
- LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
- LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
- WHERE m.id = %s
- """, (doc_id,))
- doc = cursor.fetchone()
- if not doc:
- return None
-
- # 2. 根据 source_type 查询对应的子表信息
- source_type = doc.get('source_type')
- table_name = TABLE_MAP.get(source_type)
-
- if table_name:
- # 关联子表的所有字段
- sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
- cursor.execute(sub_sql, (doc_id,))
- sub_data = cursor.fetchone()
-
- if sub_data:
- # 将子表字段合并到 doc 中,方便前端使用
- # 注意:如果字段名冲突,子表字段会覆盖主表字段(除了 id)
- sub_data.pop('id', None)
- doc.update(sub_data)
-
- return self._format_document_row(doc)
- except Exception as e:
- logger.exception("获取文档详情失败")
- return None
- finally:
- cursor.close()
- conn.close()
-
- def _to_int(self, value: Any) -> Optional[int]:
- """安全转换为整数"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- try:
- return int(value)
- except (ValueError, TypeError):
- return None
- def _to_date(self, value: Any) -> Optional[str]:
- """安全处理日期字符串"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- # 如果已经是 datetime.date 或 datetime.datetime 对象
- if hasattr(value, 'strftime'):
- return value.strftime('%Y-%m-%d')
- return str(value)
- async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
- """添加新文档(先主表后子表,解耦触发器)"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败", None
-
- cursor = conn.cursor()
-
- try:
- doc_id = str(uuid.uuid4())
- table_type = doc_data.get('table_type', 'standard')
- table_name = TABLE_MAP.get(table_type)
-
- # 安全转换字段
- release_date = self._to_date(doc_data.get('release_date'))
-
- # 处理 URL 存储(转为相对路径)
- file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
-
- # 处理文档类型 (中文 -> 简写)
- doc_type_cn = doc_data.get('document_type')
- doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn) # 找不到则保持原样
- # 处理专业领域 (中文 -> 简写)
- prof_field_cn = doc_data.get('professional_field')
- prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
- # 处理方案类别 (中文 -> 简写)
- plan_category_cn = doc_data.get('plan_category')
- plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
- # 处理一级分类 (中文 -> 简写)
- level_1_cn = doc_data.get('level_1_classification')
- level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
- # 处理二级分类 (中文 -> 简写)
- level_2_cn = doc_data.get('level_2_classification')
- level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
- # 处理三级分类 (中文 -> 简写)
- level_3_cn = doc_data.get('level_3_classification')
- level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 1. 插入主表 (作为资产中心)
- cursor.execute(
- """
- INSERT INTO t_samp_document_main (
- id, title, source_type, file_url,
- file_extension, created_by, updated_by, created_time, updated_time,
- conversion_status, whether_to_task, kb_id
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
- """,
- (
- doc_id, doc_data.get('title'), table_type, file_url,
- doc_data.get('file_extension'), user_id, user_id,
- doc_data.get('kb_id')
- )
- )
- # 2. 插入子表 (仅存储业务字段)
- if table_type == 'standard':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, chinese_name, english_name, standard_number, issuing_authority,
- release_date, implementation_date, drafting_unit, approving_department,
- participating_units, document_type, professional_field, engineering_phase,
- validity, reference_basis, source_url, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'),
- doc_data.get('issuing_authority'), release_date, self._to_date(doc_data.get('implementation_date')),
- doc_data.get('drafting_unit'), doc_data.get('approving_department'), doc_data.get('participating_units'),
- doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
- doc_data.get('validity', '现行'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
- user_id, user_id
- )
- )
- elif table_type == 'construction_plan':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, compilation_basis, plan_category,
- level_1_classification, level_2_classification, level_3_classification, level_4_classification,
- note, created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'),
- doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'),
- doc_data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
- level_2_code, level_3_code, level_4_code,
- doc_data.get('note'), user_id, user_id
- )
- )
- elif table_type == 'regulation':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, file_name, issuing_department, document_type, publish_date,
- effective_start_date, effective_end_date, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code,
- release_date, self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')),
- doc_data.get('note'), user_id, user_id
- )
- )
-
- # 3. 添加到任务管理中心 (类型为 data)
- try:
- await task_service.add_task(doc_id, 'data')
- except Exception as task_err:
- logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
- conn.commit()
- return True, "文档添加成功", doc_id
- except Exception as e:
- logger.exception("添加文档失败")
- conn.rollback()
- return False, str(e), None
- finally:
- cursor.close()
- conn.close()
    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a document: update the main table and the matching sub-table in
        one transaction (decoupled from DB triggers).

        Args:
            doc_data: Form fields; must contain 'id' and 'table_type'.
                Chinese display values are mapped to stored short codes.
            updater_id: User id recorded as the updater.

        Returns:
            (success, message).
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'standard')
            table_name = TABLE_MAP.get(table_type)

            # Safe date coercion
            release_date = self._to_date(doc_data.get('release_date'))

            # Store the file URL as a relative path
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))

            # Document type (Chinese -> short code); unmapped values pass through
            doc_type_cn = doc_data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
            # Professional field (Chinese -> short code)
            prof_field_cn = doc_data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Plan category (Chinese -> short code)
            plan_category_cn = doc_data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Level-1 classification (Chinese -> short code)
            level_1_cn = doc_data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Level-2 classification (Chinese -> short code)
            level_2_cn = doc_data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Level-3 classification (Chinese -> short code)
            level_3_cn = doc_data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Level-4 classification (Chinese -> short code)
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Update the main table
            cursor.execute(
                """
                UPDATE t_samp_document_main SET
                    title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW(), kb_id = %s
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_data.get('kb_id'), doc_id
                )
            )
            # 2. Update the type-specific sub-table
            if table_type == 'standard':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        chinese_name = %s, english_name = %s, standard_number = %s, issuing_authority = %s,
                        release_date = %s, implementation_date = %s, drafting_unit = %s, approving_department = %s,
                        participating_units = %s, document_type = %s, professional_field = %s, engineering_phase = %s,
                        validity = %s, reference_basis = %s, source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                        release_date, self._to_date(doc_data.get('implementation_date')), doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                        doc_data.get('participating_units'), doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
                        doc_data.get('validity'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            elif table_type == 'construction_plan':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
                        compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'),
                        release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), plan_category_code,
                        level_1_code, level_2_code, level_3_code, level_4_code,
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            elif table_type == 'regulation':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                        effective_start_date = %s, effective_end_date = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code, release_date,
                        self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
- async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
- """文档入库(单个)"""
- affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
- return affected_rows > 0, message
-
    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Paginated basic-info listing, joined with the main table for file and
        conversion state and with user/KB tables for display names.

        Args:
            type: One of 'standard', 'construction_plan', 'regulation'.
            page: 1-based page number.
            size: Page size.
            keyword: Fuzzy match against the family's title-like columns.
            **filters: Column filters. 'release_date_start'/'release_date_end'
                become a range on the family's date column; Chinese display
                values are mapped to stored short codes before comparison.

        Returns:
            (rows, total count); ([], 0) on error or unknown type.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()

        try:
            # Pick the physical table, the projected column list and the
            # filter-name -> column mapping for the requested family.
            if type == 'standard':
                table_name = "t_samp_standard_base_info"
                # Joined main-table columns: file_url, conversion_status, md_url, json_url
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type,
                    s.professional_field, s.validity, s.note,
                    s.participating_units, s.reference_basis,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'construction_plan':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                    s.id, s.plan_name as title, NULL as standard_no,
                    s.project_name, s.project_section,
                    s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                    NULL as document_type, NULL as professional_field, NULL as validity,
                    s.plan_summary, s.compilation_basis,
                    s.plan_category, s.level_1_classification, s.level_2_classification,
                    s.level_3_classification, s.level_4_classification,
                    s.note,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date',
                    'plan_category': 's.plan_category',
                    'level_1_classification': 's.level_1_classification',
                    'level_2_classification': 's.level_2_classification',
                    'level_3_classification': 's.level_3_classification',
                    'level_4_classification': 's.level_4_classification'
                }
            elif type == 'regulation':
                table_name = "t_samp_office_regulations"
                fields = """
                    s.id, s.file_name as title, NULL as standard_no,
                    s.issuing_department as issuing_authority, s.publish_date as release_date,
                    s.document_type, NULL as professional_field, NULL as validity,
                    s.note,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0

            where_clauses = []
            params = []

            # Unified keyword search on the family's title-like columns.
            if keyword:
                if type == 'standard':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'construction_plan':
                    where_clauses.append("s.plan_name LIKE %s")
                    params.append(f"%{keyword}%")
                    # NOTE(review): these re-assignments repeat the initial
                    # field_map values above — effectively no-ops.
                    field_map['issuing_authority'] = 's.compiling_unit'
                    field_map['release_date'] = 's.compiling_date'
                elif type == 'regulation':
                    where_clauses.append("s.file_name LIKE %s")
                    params.append(f"%{keyword}%")
                    # NOTE(review): same no-op re-assignments as above.
                    field_map['issuing_authority'] = 's.issuing_department'
                    field_map['release_date'] = 's.publish_date'

            # Fine-grained filters (falsy values are skipped entirely).
            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue

                # Date-range filters map onto the family's date column.
                date_field = field_map.get('release_date', 's.release_date')
                if filter_key == 'release_date_start':
                    where_clauses.append(f"{date_field} >= %s")
                    params.append(filter_value)
                    continue
                if filter_key == 'release_date_end':
                    where_clauses.append(f"{date_field} <= %s")
                    params.append(filter_value)
                    continue
                db_field = field_map.get(filter_key)
                if db_field:
                    # Frontend sends Chinese display values; the DB stores
                    # short codes — translate before comparing.
                    if filter_key == 'document_type':
                        filter_value = DOCUMENT_TYPE_MAP.get(filter_value, filter_value)

                    if filter_key == 'professional_field':
                        filter_value = PROFESSIONAL_FIELD_MAP.get(filter_value, filter_value)

                    if filter_key == 'plan_category':
                        filter_value = PLAN_CATEGORY_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_1_classification':
                        filter_value = FIRST_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_2_classification':
                        filter_value = SECOND_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_3_classification':
                        filter_value = THIRD_LEVEL_MAP.get(filter_value, filter_value)
                    if filter_key == 'level_4_classification':
                        filter_value = FOURTH_LEVEL_MAP.get(filter_value, filter_value)
                    # title / standard_no / issuing_authority use fuzzy match;
                    # everything else is exact.
                    if filter_key in ['title', 'standard_no', 'issuing_authority']:
                        where_clauses.append(f"{db_field} LIKE %s")
                        params.append(f"%{filter_value}%")
                    else:
                        where_clauses.append(f"{db_field} = %s")
                        params.append(filter_value)

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size

            # LEFT JOIN the main table and user tables for display names.
            sql = f"""
                SELECT {fields}, kb.name as kb_name
                FROM {table_name} s
                LEFT JOIN t_samp_document_main m ON s.id = m.id
                LEFT JOIN t_sys_user u1 ON s.created_by = u1.id
                LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id
                LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                {where_sql}
                ORDER BY s.created_time DESC
                LIMIT %s OFFSET %s
            """
            params = params + [size, offset]

            cursor.execute(sql, tuple(params))
            items = cursor.fetchall()

            # Post-process rows: map stored codes back to Chinese display
            # values and expand relative URLs to full ones.
            for item in items:
                doc_type_code = item.get('document_type')
                if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
                    item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]

                prof_field_code = item.get('professional_field')
                if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
                    item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
                plan_category_code = item.get('plan_category')
                if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
                    item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
                level_1_code = item.get('level_1_classification')
                if level_1_code in FIRST_LEVEL_REVERSE_MAP:
                    item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
                level_2_code = item.get('level_2_classification')
                if level_2_code in SECOND_LEVEL_REVERSE_MAP:
                    item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
                level_3_code = item.get('level_3_classification')
                if level_3_code in THIRD_LEVEL_REVERSE_MAP:
                    item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
                level_4_code = item.get('level_4_classification')
                if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
                    item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
                for key in ['file_url', 'md_url', 'json_url']:
                    if item.get(key):
                        item[key] = self.minio_manager.get_full_url(item[key])

            # Total count with the same WHERE; params[:-2] drops LIMIT/OFFSET.
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0

            return items, total
        except Exception as e:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()
- # ==================== 文档转换 ====================
-
- async def get_document_source_type(self, doc_id: str) -> Optional[str]:
- """获取文档的source_type"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['source_type'] if res else None
- except Exception as e:
- logger.exception(f"获取文档source_type失败: {e}")
- return None
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_title(self, doc_id: str) -> str:
- """获取文档标题"""
- conn = get_db_connection()
- if not conn:
- return "文档"
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['title'] if res else "文档"
- except Exception as e:
- logger.exception(f"获取文档标题失败: {e}")
- return "文档"
- finally:
- cursor.close()
- conn.close()
-
- async def update_conversion_status(self, doc_id: str, status: int,
- md_url: Optional[str] = None,
- json_url: Optional[str] = None,
- error_message: Optional[str] = None) -> bool:
- """更新文档转换状态
-
- Args:
- doc_id: 文档ID
- status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
- md_url: Markdown文件URL
- json_url: JSON文件URL
- error_message: 错误信息
- """
- conn = get_db_connection()
- if not conn:
- return False
-
- cursor = conn.cursor()
-
- try:
- update_clauses = ["conversion_status = %s"]
- params = [status]
- if error_message:
- update_clauses.append("conversion_error = %s")
- params.append(error_message)
-
- if md_url:
- update_clauses.append("md_url = %s")
- params.append(md_url)
-
- if json_url:
- update_clauses.append("json_url = %s")
- params.append(json_url)
-
- sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
- params.append(doc_id)
-
- cursor.execute(sql, tuple(params))
- conn.commit()
- return True
- except Exception as e:
- logger.exception(f"更新转换进度失败: {e}")
- conn.rollback()
- return False
- finally:
- cursor.close()
- conn.close()
- # ==================== 基础信息管理 ====================
- async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
- """新增基本信息"""
- logger.info(f"Adding basic info for type {type}: {data}")
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败", None
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型", None
-
- doc_id = str(uuid.uuid4())
- file_url = data.get('file_url')
- file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
-
- # 处理文档类型 (中文 -> 简写)
- doc_type_cn = data.get('document_type')
- doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
-
- # 处理专业领域 (中文 -> 简写)
- prof_field_cn = data.get('professional_field')
- prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
- # 处理方案类别 (中文 -> 简写)
- plan_category_cn = data.get('plan_category')
- plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
- # 处理一级分类 (中文 -> 简写)
- level_1_cn = data.get('level_1_classification')
- level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
- # 处理二级分类 (中文 -> 简写)
- level_2_cn = data.get('level_2_classification')
- level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
- # 处理三级分类 (中文 -> 简写)
- level_3_cn = data.get('level_3_classification')
- level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 1. 插入主表 (解耦触发器,手动同步)
- cursor.execute(
- """
- INSERT INTO t_samp_document_main (
- id, title, source_type, file_url,
- file_extension, created_by, updated_by, created_time, updated_time,
- conversion_status, whether_to_task, kb_id
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
- """,
- (
- doc_id, data.get('title'), type, file_url,
- file_extension, user_id, user_id, data.get('kb_id')
- )
- )
-
- # 2. 插入子表 (移除 file_url,因为它现在只存储 in 主表中)
- if type == 'standard':
- sql = f"""
- INSERT INTO {table_name} (
- id, chinese_name, english_name, standard_number, issuing_authority,
- release_date, implementation_date, drafting_unit, approving_department,
- participating_units,
- document_type, professional_field, engineering_phase,
- validity, reference_basis, source_url, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('english_name'), data.get('standard_no'),
- data.get('issuing_authority'), self._to_date(data.get('release_date')), self._to_date(data.get('implementation_date')),
- data.get('drafting_unit'), data.get('approving_department'),
- data.get('participating_units'),
- doc_type_code, prof_field_code, data.get('engineering_phase'),
- data.get('validity', '现行'), data.get('reference_basis'), data.get('source_url'), data.get('note'),
- user_id, user_id
- )
- elif type == 'construction_plan':
- sql = f"""
- INSERT INTO {table_name} (
- id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, compilation_basis, plan_category,
- level_1_classification, level_2_classification, level_3_classification, level_4_classification,
- note, created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('project_name'), data.get('project_section'),
- data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'),
- data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
- level_2_code, level_3_code, level_4_code,
- data.get('note'), user_id, user_id
- )
- elif type == 'regulation':
- sql = f"""
- INSERT INTO {table_name} (
- id, file_name, issuing_department, document_type, publish_date,
- effective_start_date, effective_end_date, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('issuing_authority'), doc_type_code,
- self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')),
- data.get('note'), user_id, user_id
- )
- else:
- return False, "不支持的类型", None
-
- cursor.execute(sql, params)
-
- # 3. 添加到任务管理中心 (类型为 data)
- try:
- # 尝试调用异步方法,如果报错则记录日志
- import asyncio
- try:
- # 检查是否有正在运行的事件循环
- loop = asyncio.get_event_loop()
- if loop.is_running():
- # 在运行的循环中创建任务
- loop.create_task(task_service.add_task(doc_id, 'data'))
- else:
- # 否则使用 run 运行(不推荐在 web 环境下这样做,但这里是兜底)
- loop.run_until_complete(task_service.add_task(doc_id, 'data'))
- except RuntimeError:
- # 没有事件循环时
- asyncio.run(task_service.add_task(doc_id, 'data'))
- except Exception as task_err:
- logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}")
- conn.commit()
- return True, "新增成功", doc_id
- except Exception as e:
- logger.exception("新增基本信息失败")
- conn.rollback()
- return False, str(e), None
- finally:
- cursor.close()
- conn.close()
    async def edit_basic_info(self, type: str, doc_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a basic-info record: update the main-table row and the
        type-specific sub-table row in one transaction.

        Args:
            type: One of 'standard', 'construction_plan', 'regulation'.
            doc_id: Id shared by the main-table and sub-table rows.
            data: Form fields; Chinese display values are mapped to stored codes.
            updater_id: User id recorded as the updater.

        Returns:
            (success, message).
        """
        logger.info(f"Editing basic info for type {type}, id {doc_id}: {data}")
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            table_name = TABLE_MAP.get(type)
            if not table_name:
                return False, "无效的类型"

            # Store the file URL as a relative path
            file_url = self.minio_manager.get_relative_path(data.get('file_url'))
            file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
            # Document type (Chinese -> short code); unmapped values pass through
            doc_type_cn = data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
            # Professional field (Chinese -> short code)
            prof_field_cn = data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Plan category (Chinese -> short code)
            plan_category_cn = data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Level-1 classification (Chinese -> short code)
            level_1_cn = data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Level-2 classification (Chinese -> short code)
            level_2_cn = data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Level-3 classification (Chinese -> short code)
            level_3_cn = data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Level-4 classification (Chinese -> short code)
            level_4_cn = data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Update the main table (triggers decoupled)
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s
                WHERE id = %s
                """,
                (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id)
            )
            # 2. Update the sub-table (file_url removed; the main table owns it)
            if type == 'standard':
                sql = f"""
                    UPDATE {table_name}
                    SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
                        document_type = %s, professional_field = %s, validity = %s,
                        english_name = %s, implementation_date = %s, drafting_unit = %s,
                        approving_department = %s, engineering_phase = %s,
                        participating_units = %s,
                        reference_basis = %s,
                        source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
                    doc_type_code, prof_field_code, data.get('validity'),
                    data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                    data.get('approving_department'), data.get('engineering_phase'),
                    data.get('participating_units'),
                    data.get('reference_basis'),
                    data.get('source_url'), data.get('note'),
                    updater_id, doc_id
                )
            elif type == 'construction_plan':
                sql = f"""
                    UPDATE {table_name}
                    SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
                        compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'),
                    self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), plan_category_code,
                    level_1_code, level_2_code, level_3_code, level_4_code,
                    data.get('note'), updater_id, doc_id
                )

            elif type == 'regulation':
                sql = f"""
                    UPDATE {table_name}
                    SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                        effective_start_date = %s, effective_end_date = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('issuing_authority'), doc_type_code, self._to_date(data.get('release_date')),
                    self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'),
                    updater_id, doc_id
                )
            else:
                return False, "不支持的类型"

            cursor.execute(sql, params)

            conn.commit()
            return True, "编辑成功"
        except Exception as e:
            logger.exception("编辑基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]:
    """Delete a document's basic info: sub-table row, task rows, then the main row.

    The delete is idempotent: a missing main-table row is logged but still
    reported as success.

    Args:
        type: Business document type; must be a key of TABLE_MAP
              (e.g. 'standard', 'construction_plan', 'regulation').
              NOTE: the name shadows the builtin but is kept for
              interface compatibility with existing callers.
        doc_id: Primary key of the document to delete.

    Returns:
        (success, user-facing message) tuple.
    """
    if not doc_id:
        return False, "缺少 ID 参数"

    # Validate the type BEFORE opening a connection: an invalid type should
    # not cost a DB connection, nor be masked by a connection failure.
    # table_name comes from the trusted TABLE_MAP, so interpolating it into
    # SQL below is safe (doc_id is always bound as a parameter).
    table_name = TABLE_MAP.get(type)
    if not table_name:
        return False, "无效的类型"

    logger.info(f"Deleting basic info: type={type}, id={doc_id}")
    conn = get_db_connection()
    if not conn:
        return False, "数据库连接失败"

    try:
        # Create the cursor inside the guarded region so the connection is
        # closed even if cursor creation itself raises (the original leaked
        # the connection in that case).
        cursor = conn.cursor()
        try:
            # 1. Explicitly delete the sub-table row first (in case the FK
            # CASCADE is not in effect). Best-effort: log and continue.
            try:
                cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,))
                logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}")
            except Exception as sub_e:
                logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}")

            # 2. Delete related task-management rows inside the same
            # transaction. task_service manages its own connection, so we
            # also execute the delete on this cursor to keep the current
            # transaction consistent. Best-effort: log and continue.
            try:
                cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
                logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}")
            except Exception as task_e:
                logger.warning(f"在主事务中删除任务记录失败: {task_e}")

            # 3. Delete the main-table row.
            cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,))
            affected_main = cursor.rowcount
            logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}")

            if affected_main == 0:
                # Idempotent delete: still commit the side deletions above
                # and report success.
                logger.warning(f"未找到主表记录: {doc_id}")

            conn.commit()

            # 4. Double-check via the task service after commit (it uses its
            # own connection); a failure here does not fail the delete.
            try:
                await task_service.delete_task(doc_id)
            except Exception as task_err:
                logger.error(f"调用 task_service 删除任务失败: {task_err}")
            return True, "删除成功"
        except Exception as e:
            logger.exception(f"删除基本信息异常 (ID: {doc_id})")
            conn.rollback()
            return False, f"删除失败: {str(e)}"
        finally:
            cursor.close()
    finally:
        conn.close()
|