| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962 |
- """
- 样本中心服务层
- 从 sample_view.py 提取的SQL查询逻辑
- """
- import logging
- import uuid
- from datetime import datetime
- from typing import Optional, List, Dict, Any, Tuple
- from app.base.async_mysql_connection import get_db_connection
- from app.base.minio_connection import get_minio_manager
- from app.core.config import config_handler
- from app.services.milvus_service import milvus_service
- from app.services.task_service import task_service
logger = logging.getLogger(__name__)
# Mapping: source_type -> backing sub-table name
TABLE_MAP = {
    'standard': 't_samp_standard_base_info',
    'construction_plan': 't_samp_construction_plan_base_info',
    'regulation': 't_samp_office_regulations'
}
# Document-type map (front-end display label -> database code)
DOCUMENT_TYPE_MAP = {
    "国家标准": "GB",
    "行业标准": "HY",
    "部门规章": "BM",
    "地方标准": "DB",
    "企业标准": "QY",
    "管理制度": "GL",
    "技术规范": "GF",
    "团体标准": "TT",
    "国际标准": "GJ",
    "国家法律": "FL",
    "地方法规": "LR",
    "其他": "QT"
}
# Reverse document-type map (database code -> front-end display label).
# Also carries legacy abbreviations for compatibility with stored data.
DOCUMENT_TYPE_REVERSE_MAP = {v: k for k, v in DOCUMENT_TYPE_MAP.items()}
DOCUMENT_TYPE_REVERSE_MAP.update({
    "HB": "行业标准",
    "QB": "企业标准",
    "GLZD": "管理制度",
    "JSGF": "技术规范",
    "TB": "团体标准",
    "IB": "国际标准",
    "OTHER": "其他"
})
# Professional-field map (front-end display label -> database code)
PROFESSIONAL_FIELD_MAP = {
    "法律法规": "FL",
    "通用标准": "TY",
    "勘察钻探": "KC",
    "地基基础": "DJ",
    "路基路面": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "交通工程": "JT",
    "建筑工程": "JZ",
    "市政工程": "SZ",
    "机电安装": "JD",
    "路桥工程": "LB",
    "装饰装修": "ZS",
    "港口航道": "GK",
    "铁路工程": "TL",
    "房建工程": "FJ",
    "水利电力": "SL",
    "信息化": "XX",
    "试验检测": "SY",
    "安全环保": "AQ",
    "其他": "QT"
}
# Reverse professional-field map (database code -> front-end display label)
# NOTE(review): the original comment here mislabeled this as the level-4 reverse map.
PROFESSIONAL_FIELD_REVERSE_MAP = {v: k for k, v in PROFESSIONAL_FIELD_MAP.items()}
# Validity map (front-end display label -> database code)
VALIDITY_MAP = {
    "现行": "XH",
    "废止": "FZ",
    "试行": "SX"
}
# Reverse validity map (database code -> front-end display label).
# Extra entries normalize legacy full-text values stored in the database.
VALIDITY_REVERSE_MAP = {v: k for k, v in VALIDITY_MAP.items()}
VALIDITY_REVERSE_MAP.update({
    "现行": "现行",
    "已废止": "废止",
    "被替代": "废止"
})
# Plan-category map (front-end display label -> database code)
PLAN_CATEGORY_MAP = {
    "超危大方案": "CH",
    "超危大方案较大Ⅱ级": "CH2",
    "超危大方案特大Ⅳ级": "CH4",
    "超危大方案一般Ⅰ级": "CH1",
    "超危大方案重大Ⅲ级": "CH3",
    "危大方案": "WD",
    "一般方案": "YB"
}
# Reverse plan-category map (database code -> front-end display label)
PLAN_CATEGORY_REVERSE_MAP = {v: k for k, v in PLAN_CATEGORY_MAP.items()}
# Level-1 classification map (front-end display label -> database code)
FIRST_LEVEL_MAP = {
    "施工方案": "SC"
}
# Reverse level-1 classification map (database code -> front-end display label)
FIRST_LEVEL_REVERSE_MAP = {v: k for k, v in FIRST_LEVEL_MAP.items()}
# Level-2 classification map (front-end display label -> database code)
SECOND_LEVEL_MAP = {
    "临建工程": "LZ",
    "路基工程": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "其他": "QT"
}
# Reverse level-2 classification map (database code -> front-end display label)
SECOND_LEVEL_REVERSE_MAP = {v: k for k, v in SECOND_LEVEL_MAP.items()}
# Level-3 classification map (front-end display label -> database code)
THIRD_LEVEL_MAP = {
    "TBM施工": "TM",
    "拌和站安、拆施工": "BH",
    "不良地质隧道施工": "BL",
    "常规桥梁": "CG",
    "挡土墙工程类": "DT",
    "辅助坑道施工": "FB",
    "复杂洞口工程施工": "FD",
    "钢筋加工场安、拆": "GG",
    "钢栈桥施工": "GZ",
    "拱桥": "GH",
    "涵洞工程类": "HD",
    "滑坡体处理类": "HP",
    "路堤": "LT",
    "路堑": "LQ",
    "其他": "QT",
    "深基坑": "JK",
    "隧道总体施工": "ZT",
    "特殊结构隧道": "TS",
    "斜拉桥": "XL",
    "悬索桥": "XS"
}
# Reverse level-3 classification map (database code -> front-end display label)
THIRD_LEVEL_REVERSE_MAP = {v: k for k, v in THIRD_LEVEL_MAP.items()}
# Level-4 classification map (front-end display label -> database code)
FOURTH_LEVEL_MAP = {
    "挡土墙": "DT",
    "顶管": "DG",
    "断层破碎带及软弱围岩": "DL",
    "钢筋砼箱涵": "GX",
    "高填路堤": "GT",
    "抗滑桩": "KH",
    "软岩大变形隧道": "RY",
    "上部结构": "SB",
    "深基坑开挖与支护": "JK",
    "深挖路堑": "LC",
    "隧道TBM": "TM",
    "隧道进洞": "JD",
    "隧道竖井": "SJ",
    "隧道斜井": "XJ",
    "特种设备": "TZ",
    "瓦斯隧道": "WS",
    "下部结构": "XB",
    "小净距隧道": "NJ",
    "岩爆隧道": "YB",
    "岩溶隧道": "YR",
    "涌水突泥隧道": "YN",
    "桩基础": "ZJ",
    "其他": "QT"
}
# Reverse level-4 classification map (database code -> front-end display label)
FOURTH_LEVEL_REVERSE_MAP = {v: k for k, v in FOURTH_LEVEL_MAP.items()}
def get_table_name(source_type: str) -> Optional[str]:
    """Resolve the sub-table name for a source_type; None when unknown."""
    if source_type in TABLE_MAP:
        return TABLE_MAP[source_type]
    return None
class SampleService:
    """Sample-center service class backed by direct SQL queries."""

    # Process-wide flag: Milvus collections are ensured only once per process.
    _initialized = False

    def __init__(self):
        """Initialize service dependencies (MinIO manager and Milvus service)."""
        # Shared MinIO manager for object-storage access
        self.minio_manager = get_minio_manager()

        # Module-level global Milvus service instance
        self.milvus_service = milvus_service

        # Ensure collections exist, but only on first instantiation; a failure
        # is logged and swallowed so construction never raises.
        if not SampleService._initialized:
            try:
                self.milvus_service.ensure_collections()
                SampleService._initialized = True
            except Exception as e:
                logger.error(f"初始化 Milvus 集合失败: {e}")
- def _format_document_row(self, item: Dict[str, Any]) -> Dict[str, Any]:
- """格式化文档行数据,处理URL、日期映射和文件名显示"""
- if not item:
- return item
-
- # 1. 处理 URL 转换
- for key in ['file_url', 'md_url', 'json_url']:
- if item.get(key):
- item[key] = self.minio_manager.get_full_url(item[key])
-
- # 2. 映射字段以适配前端通用显示
- source_type = item.get('source_type')
-
- # 处理文档类型显示 (简写 -> 中文)
- doc_type_code = item.get('document_type')
- if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
- item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
-
- # 处理专业领域显示 (简写 -> 中文)
- prof_field_code = item.get('professional_field')
- if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
- item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
- # 处理时效性显示 (简写 -> 中文)
- validity_code = item.get('validity')
- if validity_code in VALIDITY_REVERSE_MAP:
- item['validity'] = VALIDITY_REVERSE_MAP[validity_code]
- # 处理一级分类显示 (简写 -> 中文)
- level_1_code = item.get('level_1_classification')
- if level_1_code in FIRST_LEVEL_REVERSE_MAP:
- item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
- # 处理二级分类显示 (简写 -> 中文)
- level_2_code = item.get('level_2_classification')
- if level_2_code in SECOND_LEVEL_REVERSE_MAP:
- item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
- # 处理三级分类显示 (简写 -> 中文)
- level_3_code = item.get('level_3_classification')
- if level_3_code in THIRD_LEVEL_REVERSE_MAP:
- item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
- # 处理四级分类显示 (简写 -> 中文)
- level_4_code = item.get('level_4_classification')
- if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
- item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
- # 处理方案类别显示 (简写 -> 中文)
- plan_category_code = item.get('plan_category')
- if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
- item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
- if source_type == 'standard':
- # 标准规范特有映射
- if 'standard_number' in item:
- item['standard_no'] = item.get('standard_number')
- elif source_type == 'construction_plan':
- # 施工方案特有映射
- item['issuing_authority'] = item.get('compiling_unit')
- item['release_date'] = item.get('compiling_date')
- elif source_type == 'regulation':
- # 办公制度特有映射
- item['issuing_authority'] = item.get('issuing_department')
- item['release_date'] = item.get('publish_date')
-
- # 3. 格式化时间
- date_keys = [
- 'created_time', 'updated_time', 'release_date',
- 'publish_date', 'compiling_date', 'implementation_date',
- 'effective_start_date', 'effective_end_date'
- ]
- for key in date_keys:
- val = item.get(key)
- if val and hasattr(val, 'isoformat'):
- item[key] = val.isoformat()
- elif val is not None:
- item[key] = str(val)
-
- # 4. 增加格式化文件名供前端显示
- if item.get('conversion_status') == 2:
- title = item.get('title', 'document')
- item['md_display_name'] = f"{title}.md"
- item['json_display_name'] = f"{title}.json"
-
- return item
- async def get_upload_url(self, filename: str, content_type: str, prefix: str = None) -> Tuple[bool, str, Dict[str, Any]]:
- """获取 MinIO 预签名上传 URL"""
- try:
- data = self.minio_manager.get_upload_url(filename, content_type, prefix=prefix)
- return True, "成功获取上传链接", data
- except Exception as e:
- logger.exception("生成上传链接失败")
- return False, f"生成上传链接失败: {str(e)}", {}
- # ==================== 文档管理 ====================
-
- async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_method: str = "general", chunk_size: int = 500, separator: str = "。") -> Tuple[int, str]:
- """批量将文档入库到知识库
-
- Args:
- doc_ids: 文档ID列表
- username: 操作人
- kb_method: 切分方法
- chunk_size: 切分长度
- separator: 切分符号
- """
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败,请检查数据库服务状态"
-
- cursor = conn.cursor()
- success_count = 0
- already_entered_count = 0
- failed_count = 0
- error_details = []
-
- try:
- # 1. 获取所有选中选中的文档详情
- placeholders = ','.join(['%s']*len(doc_ids))
- fetch_sql = f"""
- SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time, kb_id, file_url
- FROM t_samp_document_main
- WHERE id IN ({placeholders})
- """
- cursor.execute(fetch_sql, tuple(doc_ids))
- selected_docs = cursor.fetchall()
-
- if not selected_docs:
- return 0, "选中的文档在数据库中不存在"
- # 2. 逐份处理
- for doc in selected_docs:
- doc_id = doc['id']
- title = doc.get('title', '未命名文档')
- status = doc.get('conversion_status')
- whether_to_enter = doc.get('whether_to_enter', 0)
- md_url = doc.get('md_url')
- source_type = doc.get('source_type')
-
- # A. 检查是否已入库
- if whether_to_enter == 1:
- logger.info(f"文档 {title}({doc_id}) 已入库,跳过二次入库")
- already_entered_count += 1
- error_details.append(f"· {title}: 已在知识库中,无需重复入库")
- continue
- # B. 检查转换状态
- if status != 2:
- reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
- logger.warning(f"文档 {title}({doc_id}) 状态为 {status},入库失败: {reason}")
- failed_count += 1
- error_details.append(f"· {title}: {reason}")
- continue
-
- if not md_url:
- logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,入库失败")
- failed_count += 1
- error_details.append(f"· {title}: 转换结果地址丢失")
- continue
-
- # C. 确定入库策略 (从数据库读取已绑定的知识库)
- current_kb_id = doc.get('kb_id')
- current_kb_method = kb_method # 直接使用前端传来的切分方式
- if not current_kb_id:
- logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
- failed_count += 1
- error_details.append(f"· {title}: 未指定目标知识库")
- continue
- if not current_kb_method:
- logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
- failed_count += 1
- error_details.append(f"· {title}: 未指定切分策略")
- continue
- # 获取知识库信息 (collection_name_parent, collection_name_children)
- kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
- cursor.execute(kb_sql, (current_kb_id,))
- kb_res = cursor.fetchone()
-
- if not kb_res:
- logger.warning(f"找不到指定的知识库: id={current_kb_id}")
- failed_count += 1
- error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
- continue
-
- collection_name_parent = kb_res['collection_name_parent']
- collection_name_children = kb_res['collection_name_children']
-
- # D. 从 MinIO 获取 Markdown 内容
- try:
- md_content = self.minio_manager.get_object_content(md_url)
- if not md_content:
- raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
- except Exception as minio_err:
- logger.error(f"读取文档 {title} 内容失败: {minio_err}")
- failed_count += 1
- error_details.append(f"· {title}: 读取云端文件失败")
- continue
-
- # E. 调用 MilvusService 进行切分和入库
- try:
- # 获取业务元数据
- business_metadata = {}
- if source_type in ['standard', 'basis']:
- std_sql = """
- SELECT id as basic_info_id, chinese_name, standard_number, issuing_authority,
- document_type, professional_field, validity
- FROM t_samp_standard_base_info
- WHERE id = %s
- """
- cursor.execute(std_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
- elif source_type == 'construction_plan':
- plan_sql = """
- SELECT id as basic_info_id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, plan_category,
- level_1_classification, level_2_classification,
- level_3_classification, level_4_classification
- FROM t_samp_construction_plan_base_info
- WHERE id = %s
- """
- cursor.execute(plan_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
- elif source_type == 'regulation':
- reg_sql = """
- SELECT id as basic_info_id, file_name, issuing_department as issuing_authority,
- document_type, publish_date as release_date, note
- FROM t_samp_office_regulations
- WHERE id = %s
- """
- cursor.execute(reg_sql, (doc_id,))
- business_metadata = cursor.fetchone() or {}
-
- # 强制转换 document_type 为简写 (处理存量数据或异常情况)
- if 'document_type' in business_metadata and business_metadata['document_type']:
- doc_type = business_metadata['document_type']
- business_metadata['document_type'] = DOCUMENT_TYPE_MAP.get(doc_type, doc_type)
- # 强制转换 professional_field 为简写
- if 'professional_field' in business_metadata and business_metadata['professional_field']:
- prof_field = business_metadata['professional_field']
- business_metadata['professional_field'] = PROFESSIONAL_FIELD_MAP.get(prof_field, prof_field)
- # 强制转换 validity 为简写
- if 'validity' in business_metadata and business_metadata['validity']:
- validity = business_metadata['validity']
- business_metadata['validity'] = VALIDITY_MAP.get(validity, validity)
- # 强制转换 plan_category 为简写
- if 'plan_category' in business_metadata and business_metadata['plan_category']:
- plan_cat = business_metadata['plan_category']
- business_metadata['plan_category'] = PLAN_CATEGORY_MAP.get(plan_cat, plan_cat)
- # 强制转换 level_1_classification 为简写
- if 'level_1_classification' in business_metadata and business_metadata['level_1_classification']:
- l1_class = business_metadata['level_1_classification']
- business_metadata['level_1_classification'] = FIRST_LEVEL_MAP.get(l1_class, l1_class)
- # 强制转换 level_2_classification 为简写
- if 'level_2_classification' in business_metadata and business_metadata['level_2_classification']:
- l2_class = business_metadata['level_2_classification']
- business_metadata['level_2_classification'] = SECOND_LEVEL_MAP.get(l2_class, l2_class)
- # 强制转换 level_3_classification 为简写
- if 'level_3_classification' in business_metadata and business_metadata['level_3_classification']:
- l3_class = business_metadata['level_3_classification']
- business_metadata['level_3_classification'] = THIRD_LEVEL_MAP.get(l3_class, l3_class)
- # 强制转换 level_4_classification 为简写
- if 'level_4_classification' in business_metadata and business_metadata['level_4_classification']:
- l4_class = business_metadata['level_4_classification']
- business_metadata['level_4_classification'] = FOURTH_LEVEL_MAP.get(l4_class, l4_class)
- # 统一使用主表的 file_url,确保数据来源一致,入库时使用完整 URL
- raw_file_url = doc.get('file_url', '')
- business_metadata['file_url'] = self.minio_manager.get_full_url(raw_file_url) if raw_file_url else ''
- # 准备元数据
- current_date = int(datetime.now().strftime('%Y%m%d'))
- doc_info = {
- "doc_id": doc_id,
- "file_name": title,
- "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
- "tags": "",
- "user_id": username, # 传递操作人作为 created_by
- "kb_id": current_kb_id,
- "kb_method": current_kb_method,
- "collection_name_parent": collection_name_parent,
- "collection_name_children": collection_name_children,
- "chunk_size": chunk_size,
- "separator": separator,
- "source_type": source_type,
- "business_metadata": business_metadata # 注入业务元数据
- }
- await self.milvus_service.insert_knowledge(md_content, doc_info)
-
- # F. 更新数据库状态
- update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
- cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
- success_count += 1
-
- except Exception as milvus_err:
- logger.exception(f"文档 {title} 写入向量库失败")
- failed_count += 1
- error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
- continue
-
- conn.commit()
-
- # 构造详细的消息
- if success_count == len(doc_ids) and failed_count == 0 and already_entered_count == 0:
- msg = f"✅ 入库成功!共处理 {success_count} 份文档。"
- else:
- msg = f"📊 入库处理完成:\n· 成功:{success_count} 份\n"
- if already_entered_count > 0:
- msg += f"· 跳过:{already_entered_count} 份 (已入库)\n"
- if failed_count > 0:
- msg += f"· 失败:{failed_count} 份\n"
-
- if error_details:
- detailed_msg = msg + "\n⚠️ 详情说明:\n" + "\n".join(error_details)
- return success_count, detailed_msg
-
- return success_count, msg
-
- except Exception as e:
- logger.exception(f"文档批量入库异常: {e}")
- conn.rollback()
- return 0, f"操作异常: {str(e)}"
- finally:
- cursor.close()
- conn.close()
-
- async def batch_add_to_task(self, doc_ids: List[str], username: str, project_name: str, task_tags: Optional[List[str]] = None) -> Tuple[bool, str]:
- """批量将文档加入标注任务中心 (单表化)
-
- Args:
- doc_ids: 文档ID列表
- username: 操作人
- project_name: 项目名称 (作为 project_id)
- task_tags: 标注任务标签列表 (例如 ["标签1", "标签2"])
- """
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- if not doc_ids:
- return False, "未指定要加入任务的文档 ID"
-
- # 0. 每次点击都生成一个新的项目 UUID (不再检查重名复用)
- import uuid
- project_id = str(uuid.uuid4())
-
- # 处理标签:转换为 JSON 字符串存储
- import json
- tag_str = json.dumps(task_tags, ensure_ascii=False) if task_tags else None
-
- # 1. 过滤掉未入库的文档
- placeholders = ', '.join(['%s'] * len(doc_ids))
- check_entered_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
- cursor.execute(check_entered_sql, doc_ids)
- entered_ids = [row['id'] for row in cursor.fetchall()]
-
- unentered_count = len(doc_ids) - len(entered_ids)
- if not entered_ids:
- return False, "所选文档均未入库,无法加入标注任务中心"
- # 2. 更新 whether_to_task 状态
- ids_to_add = entered_ids
- add_placeholders = ', '.join(['%s'] * len(ids_to_add))
- sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
- cursor.execute(sql, (username, *ids_to_add))
- # 2.5 清理选定文档之前的空任务记录(处理存量僵尸数据)
- cursor.execute(f"DELETE FROM t_task_management WHERE business_id IN ({add_placeholders}) AND project_id IS NULL", ids_to_add)
-
- # 3. 批量获取元数据并写入任务管理表
- EXCLUDE_FIELDS = {
- 'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
- 'conversion_status', 'whether_to_enter', 'whether_to_task',
- 'kb_method', 'whether_to_delete'
- }
-
- # 3.1 批量查询主表信息
- cursor.execute(f"SELECT * FROM t_samp_document_main WHERE id IN ({add_placeholders})", ids_to_add)
- all_doc_main = cursor.fetchall()
- doc_main_map = {doc['id']: doc for doc in all_doc_main}
-
- # 3.2 按来源类型分组查询子表信息
- source_type_groups = {}
- for doc in all_doc_main:
- st = doc.get('source_type')
- if st:
- if st not in source_type_groups: source_type_groups[st] = []
- source_type_groups[st].append(doc['id'])
-
- all_sub_data_map = {}
- for st, ids in source_type_groups.items():
- table_name = TABLE_MAP.get(st)
- if table_name:
- placeholders_sub = ', '.join(['%s'] * len(ids))
- cursor.execute(f"SELECT * FROM {table_name} WHERE id IN ({placeholders_sub})", ids)
- sub_rows = cursor.fetchall()
- for row in sub_rows:
- all_sub_data_map[row['id']] = row
-
- # 3.3 批量准备数据
- task_records = []
- for doc_id in ids_to_add:
- metadata_dict = {}
- doc_main = doc_main_map.get(doc_id)
- if doc_main:
- for k, v in doc_main.items():
- if v is not None and v != '' and k not in EXCLUDE_FIELDS:
- metadata_dict[k] = v
-
- sub_data = all_sub_data_map.get(doc_id)
- if sub_data:
- for k, v in sub_data.items():
- if v is not None and v != '' and k not in EXCLUDE_FIELDS:
- metadata_dict[k] = v
-
- metadata_dict = task_service._serialize_datetime(metadata_dict)
- task_records.append((
- doc_id, None, project_id, project_name, 'data', 'pending',
- tag_str, json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
- ))
- # 3.4 批量写入任务管理表
- if task_records:
- logger.info(f"正在批量写入 {len(task_records)} 条任务记录到数据库...")
- sql_insert = """
- INSERT INTO t_task_management (business_id, task_id, project_id, project_name, type, annotation_status, tag, metadata)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- task_id = IFNULL(VALUES(task_id), task_id),
- project_id = IFNULL(VALUES(project_id), project_id),
- project_name = IFNULL(VALUES(project_name), project_name),
- annotation_status = IFNULL(VALUES(annotation_status), annotation_status),
- tag = IFNULL(VALUES(tag), tag),
- metadata = IFNULL(VALUES(metadata), metadata)
- """
- cursor.executemany(sql_insert, task_records)
-
- conn.commit()
- logger.info(f"数据库写入完成,已提交。正在释放连接...")
- cursor.close()
- conn.close()
- conn = None # 防止 finally 中重复关闭
- cursor = None
- # 4. 自动推送至外部标注平台 (在连接关闭后执行,防止死锁 and 长事务)
- logger.info(f"开始推送项目 {project_name} (ID: {project_id}) 至外部平台...")
- push_success, push_msg = await task_service.send_to_external_platform(project_id)
- logger.info(f"外部平台推送结果: success={push_success}, msg={push_msg}")
-
- msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
- if push_success:
- msg += f" (已推送: {push_msg})"
- else:
- msg += f" (推送失败: {push_msg})"
- if unentered_count > 0:
- msg += f",{unentered_count} 份文档因未入库被跳过"
- return True, msg
- except Exception as e:
- logger.exception("批量加入任务失败")
- if conn:
- conn.rollback()
- return False, f"操作失败: {str(e)}"
- finally:
- if cursor:
- cursor.close()
- if conn:
- conn.close()
- async def clear_knowledge_base_data(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
- """批量清空文档在知识库(Milvus)中的数据片段"""
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败"
-
- cursor = conn.cursor()
- success_count = 0
- error_details = []
-
- try:
- # 1. 获取文档的知识库信息
- placeholders = ', '.join(['%s'] * len(doc_ids))
- sql = f"""
- SELECT id, title, kb_id, whether_to_enter
- FROM t_samp_document_main
- WHERE id IN ({placeholders})
- """
- cursor.execute(sql, doc_ids)
- docs = cursor.fetchall()
-
- for doc in docs:
- doc_id = doc['id']
- title = doc.get('title', '未命名')
- kb_id = doc.get('kb_id')
-
- # 只有入库了的才需要清空
- if doc.get('whether_to_enter') != 1:
- continue
-
- if not kb_id:
- error_details.append(f"· {title}: 未关联知识库,无法清空")
- continue
-
- # 获取集合名称
- cursor.execute("SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s", (kb_id,))
- kb_info = cursor.fetchone()
-
- if not kb_info:
- error_details.append(f"· {title}: 找不到关联的知识库配置")
- continue
-
- # 2. 从 Milvus 删除
- try:
- collections = [kb_info.get('collection_name_parent'), kb_info.get('collection_name_children')]
- for coll_name in collections:
- if coll_name and self.milvus_service.client.has_collection(coll_name):
- # 使用 document_id 进行过滤删除
- self.milvus_service.client.delete(
- collection_name=coll_name,
- filter=f'document_id == "{doc_id}"'
- )
-
- # 3. 更新数据库状态
- update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 0, updated_by = %s, updated_time = NOW() WHERE id = %s"
- cursor.execute(update_sql, (username, doc_id))
- success_count += 1
- except Exception as milvus_err:
- logger.error(f"清空文档 {title} Milvus 数据失败: {milvus_err}")
- error_details.append(f"· {title}: 清空向量库数据失败")
-
- conn.commit()
-
- msg = f"成功清空 {success_count} 份文档的知识库片段"
- if error_details:
- msg += "\n部分操作受限:\n" + "\n".join(error_details)
-
- return success_count, msg
-
- except Exception as e:
- logger.exception("批量清空知识库数据失败")
- conn.rollback()
- return 0, f"操作失败: {str(e)}"
- finally:
- cursor.close()
- conn.close()
- async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
- """批量删除文档 (仅限未入库或已清空的文档)"""
- conn = get_db_connection()
- if not conn:
- return 0, "数据库连接失败"
-
- cursor = conn.cursor()
-
- try:
- if not doc_ids:
- return 0, "未指定要删除的文档 ID"
-
- # 1. 检查文档状态:已入库的文档不允许直接删除
- placeholders = ', '.join(['%s'] * len(doc_ids))
- check_sql = f"SELECT id, title FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
- cursor.execute(check_sql, doc_ids)
- entered_docs = cursor.fetchall()
-
- if entered_docs:
- titles = [d['title'] for d in entered_docs]
- return 0, f"删除失败:以下文档已入库,请先执行‘清空数据’操作后再删除:\n{', '.join(titles[:5])}{'...' if len(titles)>5 else ''}"
-
- # 2. 执行物理删除
- # 尝试同步删除子表中的数据
- try:
- for sub_table in TABLE_MAP.values():
- sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
- try:
- cursor.execute(sub_sql, doc_ids)
- except Exception as sub_e:
- logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
- except Exception as sync_e:
- logger.error(f"同步删除子表数据失败: {sync_e}")
-
- # 删除主表 t_samp_document_main 中的数据
- sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
- cursor.execute(sql_main, doc_ids)
- affected_rows = cursor.rowcount
-
- # 同步删除任务管理中心的数据
- try:
- from app.services.task_service import task_service
- for doc_id in doc_ids:
- # 注意:task_service 的删除方法可能叫 delete_task_by_business_id 或类似,这里假设存在
- # 如果没有,我们需要查出主键 ID 再删
- cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
- except Exception as task_err:
- logger.error(f"同步删除任务中心数据失败: {task_err}")
- conn.commit()
-
- return affected_rows, f"成功删除 {affected_rows} 条文档数据"
- except Exception as e:
- logger.exception("批量删除失败")
- conn.rollback()
- return 0, f"批量删除失败: {str(e)}"
- finally:
- cursor.close()
- conn.close()
- async def get_tag_tree(self) -> List[Dict[str, Any]]:
- """获取标签层级树 (从 t_samp_tag_category 查询)"""
- conn = get_db_connection()
- if not conn:
- return []
-
- cursor = conn.cursor()
- try:
- sql = """
- SELECT id, parent_id, name, level, type
- FROM t_samp_tag_category
- WHERE is_deleted = 0 AND status = 1
- ORDER BY level ASC, sort_no ASC
- """
- cursor.execute(sql)
- tags = cursor.fetchall()
-
- # 构建树形结构
- tag_dict = {tag['id']: {**tag, 'children': []} for tag in tags}
- tree = []
- for tag_id, tag_item in tag_dict.items():
- parent_id = tag_item['parent_id']
- if parent_id == 0:
- tree.append(tag_item)
- elif parent_id in tag_dict:
- tag_dict[parent_id]['children'].append(tag_item)
-
- return tree
- except Exception as e:
- logger.error(f"获取标签树失败: {e}")
- return []
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_list(
- self,
- whether_to_enter: Optional[int] = None,
- conversion_status: Optional[int] = None,
- keyword: Optional[str] = None,
- table_type: Optional[str] = None,
- plan_category: Optional[str] = None,
- level_2_classification: Optional[str] = None,
- level_3_classification: Optional[str] = None,
- level_4_classification: Optional[str] = None,
- page: int = 1,
- size: int = 50
- ) -> Tuple[List[Dict[str, Any]], int, int, int]:
- """获取文档列表 (支持关联查询子表)"""
- conn = get_db_connection()
- if not conn:
- return [], 0, 0, 0
-
- cursor = conn.cursor()
-
- try:
- where_clauses = []
- params = []
-
- # 基础查询
- if table_type:
- where_clauses.append("m.source_type = %s")
- params.append(table_type)
-
- if table_type in TABLE_MAP:
- # 如果指定了类型且在 TABLE_MAP 中,使用 LEFT JOIN 关联查询,以便搜索子表字段
- sub_table = TABLE_MAP[table_type]
- from_sql = f"""
- t_samp_document_main m
- LEFT JOIN {sub_table} s ON m.id = s.id
- LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
- LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
- LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
- """
- fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
-
- # 施工方案特有的过滤字段
- if table_type == 'construction_plan':
- if plan_category:
- where_clauses.append("s.plan_category = %s")
- params.append(plan_category)
- if level_2_classification:
- where_clauses.append("s.level_2_classification = %s")
- params.append(SECOND_LEVEL_MAP.get(level_2_classification, level_2_classification))
- if level_3_classification:
- where_clauses.append("s.level_3_classification = %s")
- params.append(THIRD_LEVEL_MAP.get(level_3_classification, level_3_classification))
- if level_4_classification:
- where_clauses.append("s.level_4_classification = %s")
- params.append(FOURTH_LEVEL_MAP.get(level_4_classification, level_4_classification))
- else:
- from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
- fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
- else:
- from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
- fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
-
- order_sql = "m.created_time DESC"
- title_field = "m.title"
-
- # 分离 whether_to_enter 与 conversion_status 的过滤逻辑
- if whether_to_enter is not None:
- where_clauses.append("m.whether_to_enter = %s")
- params.append(whether_to_enter)
-
- if conversion_status is not None:
- where_clauses.append("m.conversion_status = %s")
- params.append(conversion_status)
-
- if keyword:
- where_clauses.append(f"{title_field} LIKE %s")
- params.append(f"%{keyword}%")
-
- where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
- offset = (page - 1) * size
-
- sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
- params.extend([size, offset])
-
- cursor.execute(sql, tuple(params))
- items = [self._format_document_row(row) for row in cursor.fetchall()]
-
- # 总数
- count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
- cursor.execute(count_sql, tuple(params[:-2]))
- res = cursor.fetchone()
- total = res['count'] if res else 0
-
- # 统计数据
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
- res = cursor.fetchone()
- all_total = res['count'] if res else 0
-
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
- res = cursor.fetchone()
- total_entered = res['count'] if res else 0
-
- return items, total, all_total, total_entered
- except Exception as e:
- logger.exception("获取文档列表失败")
- return [], 0, 0, 0
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
- """获取文档详情 (关联查询子表)"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- # 1. 查询主表 (关联用户表获取姓名)
- cursor.execute("""
- SELECT m.*, u1.username as creator_name, u2.username as updater_name
- FROM t_samp_document_main m
- LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
- LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
- WHERE m.id = %s
- """, (doc_id,))
- doc = cursor.fetchone()
- if not doc:
- return None
-
- # 2. 根据 source_type 查询对应的子表信息
- source_type = doc.get('source_type')
- table_name = TABLE_MAP.get(source_type)
-
- if table_name:
- # 关联子表的所有字段
- sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
- cursor.execute(sub_sql, (doc_id,))
- sub_data = cursor.fetchone()
-
- if sub_data:
- # 将子表字段合并到 doc 中,方便前端使用
- # 注意:如果字段名冲突,子表字段会覆盖主表字段(除了 id)
- sub_data.pop('id', None)
- doc.update(sub_data)
-
- return self._format_document_row(doc)
- except Exception as e:
- logger.exception("获取文档详情失败")
- return None
- finally:
- cursor.close()
- conn.close()
-
- def _to_int(self, value: Any) -> Optional[int]:
- """安全转换为整数"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- try:
- return int(value)
- except (ValueError, TypeError):
- return None
- def _to_date(self, value: Any) -> Optional[str]:
- """安全处理日期字符串"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- # 如果已经是 datetime.date 或 datetime.datetime 对象
- if hasattr(value, 'strftime'):
- return value.strftime('%Y-%m-%d')
- return str(value)
- async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
- """添加新文档(先主表后子表,解耦触发器)"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败", None
-
- cursor = conn.cursor()
-
- try:
- doc_id = str(uuid.uuid4())
- table_type = doc_data.get('table_type', 'standard')
- table_name = TABLE_MAP.get(table_type)
-
- # 安全转换字段
- release_date = self._to_date(doc_data.get('release_date'))
-
- # 处理 URL 存储(转为相对路径)
- file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
-
- # 处理文档类型 (中文 -> 简写)
- doc_type_cn = doc_data.get('document_type')
- doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn) # 找不到则保持原样
- # 处理专业领域 (中文 -> 简写)
- prof_field_cn = doc_data.get('professional_field')
- prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
- # 处理时效性 (中文 -> 简写)
- validity_cn = doc_data.get('validity')
- validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
- # 处理方案类别 (中文 -> 简写)
- plan_category_cn = doc_data.get('plan_category')
- plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
- # 处理一级分类 (中文 -> 简写)
- level_1_cn = doc_data.get('level_1_classification')
- level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
- # 处理二级分类 (中文 -> 简写)
- level_2_cn = doc_data.get('level_2_classification')
- level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
- # 处理三级分类 (中文 -> 简写)
- level_3_cn = doc_data.get('level_3_classification')
- level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = doc_data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 1. 插入主表 (作为资产中心)
- cursor.execute(
- """
- INSERT INTO t_samp_document_main (
- id, title, source_type, file_url,
- file_extension, created_by, updated_by, created_time, updated_time,
- conversion_status, whether_to_task, kb_id
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
- """,
- (
- doc_id, doc_data.get('title'), table_type, file_url,
- doc_data.get('file_extension'), user_id, user_id,
- doc_data.get('kb_id')
- )
- )
- # 2. 插入子表 (仅存储业务字段)
- if table_type == 'standard':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, chinese_name, english_name, standard_number, issuing_authority,
- release_date, implementation_date, drafting_unit, approving_department,
- participating_units, document_type, professional_field, engineering_phase,
- validity, reference_basis, source_url, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'),
- doc_data.get('issuing_authority'), release_date, self._to_date(doc_data.get('implementation_date')),
- doc_data.get('drafting_unit'), doc_data.get('approving_department'), doc_data.get('participating_units'),
- doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
- validity_code, doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
- user_id, user_id
- )
- )
- elif table_type == 'construction_plan':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, compilation_basis, plan_category,
- level_1_classification, level_2_classification, level_3_classification, level_4_classification,
- note, created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'),
- doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'),
- doc_data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
- level_2_code, level_3_code, level_4_code,
- doc_data.get('note'), user_id, user_id
- )
- )
- elif table_type == 'regulation':
- cursor.execute(
- f"""
- INSERT INTO {table_name} (
- id, file_name, issuing_department, document_type, publish_date,
- effective_start_date, effective_end_date, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """,
- (
- doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code,
- release_date, self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')),
- doc_data.get('note'), user_id, user_id
- )
- )
-
- conn.commit()
- return True, "文档添加成功", doc_id
- except Exception as e:
- logger.exception("添加文档失败")
- conn.rollback()
- return False, str(e), None
- finally:
- cursor.close()
- conn.close()
    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a document: update the main table, then the type sub-table.

        Triggers are decoupled — both updates are issued explicitly and
        committed as one transaction.

        Args:
            doc_data: fields to write; must contain 'id'; 'table_type'
                ('standard' / 'construction_plan' / 'regulation', default
                'standard') selects the sub-table.
            updater_id: id recorded as the updater.

        Returns:
            (success, message); on failure the transaction is rolled back and
            the exception text is returned as the message.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'standard')
            table_name = TABLE_MAP.get(table_type)

            # Normalize the release date (accepts str / date / None).
            release_date = self._to_date(doc_data.get('release_date'))

            # Store the file URL as a MinIO-relative path.
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))

            # Map Chinese enum labels to their stored codes; unknown labels
            # pass through unchanged.
            # Document type (label -> code).
            doc_type_cn = doc_data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)  # unknown labels kept as-is
            # Professional field (label -> code).
            prof_field_cn = doc_data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Validity (label -> code).
            validity_cn = doc_data.get('validity')
            validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
            # Plan category (label -> code).
            plan_category_cn = doc_data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Level-1 classification (label -> code).
            level_1_cn = doc_data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Level-2 classification (label -> code).
            level_2_cn = doc_data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Level-3 classification (label -> code).
            level_3_cn = doc_data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Level-4 classification (label -> code).
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Update the main table.
            cursor.execute(
                """
                UPDATE t_samp_document_main SET
                    title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW(), kb_id = %s
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_data.get('kb_id'), doc_id
                )
            )
            # 2. Update the type-specific sub-table.
            if table_type == 'standard':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        chinese_name = %s, english_name = %s, standard_number = %s, issuing_authority = %s,
                        release_date = %s, implementation_date = %s, drafting_unit = %s, approving_department = %s,
                        participating_units = %s, document_type = %s, professional_field = %s, engineering_phase = %s,
                        validity = %s, reference_basis = %s, source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                        release_date, self._to_date(doc_data.get('implementation_date')), doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                        doc_data.get('participating_units'), doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
                        validity_code, doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            elif table_type == 'construction_plan':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
                        compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'),
                        release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), plan_category_code,
                        level_1_code, level_2_code, level_3_code, level_4_code,
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            elif table_type == 'regulation':
                cursor.execute(
                    f"""
                    UPDATE {table_name} SET
                        file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                        effective_start_date = %s, effective_end_date = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('issuing_authority'), doc_type_code, release_date,
                        self._to_date(doc_data.get('effective_start_date')), self._to_date(doc_data.get('effective_end_date')), doc_data.get('note'),
                        updater_id, doc_id
                    )
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
- async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
- """文档入库(单个)"""
- affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
- return affected_rows > 0, message
-
- async def get_basic_info_list(
- self,
- type: str,
- page: int,
- size: int,
- keyword: Optional[str] = None,
- **filters
- ) -> Tuple[List[Dict[str, Any]], int]:
- """获取基本信息列表(关联主表获取文件和转换状态)"""
- conn = get_db_connection()
- if not conn:
- return [], 0
-
- cursor = conn.cursor()
-
- try:
- # 根据类型选择表名和字段映射
- if type == 'standard':
- table_name = "t_samp_standard_base_info"
- # 关联主表字段:file_url, conversion_status, md_url, json_url
- fields = """
- s.id, s.chinese_name as title, s.standard_number as standard_no,
- s.issuing_authority, s.release_date, s.implementation_date,
- s.drafting_unit, s.approving_department,
- s.document_type, s.professional_field, s.validity, s.note,
- s.participating_units, s.reference_basis,
- s.created_by, u1.username as creator_name, s.created_time,
- s.updated_by, u2.username as updater_name, s.updated_time,
- m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter,
- m.source_type
- """
- field_map = {
- 'title': 's.chinese_name',
- 'standard_no': 's.standard_number',
- 'issuing_authority': 's.issuing_authority',
- 'release_date': 's.release_date',
- 'document_type': 's.document_type',
- 'professional_field': 's.professional_field',
- 'validity': 's.validity'
- }
- elif type == 'construction_plan':
- table_name = "t_samp_construction_plan_base_info"
- fields = """
- s.id, s.plan_name as title, NULL as standard_no,
- s.project_name, s.project_section,
- s.compiling_unit as issuing_authority, s.compiling_date as release_date,
- NULL as implementation_date, NULL as drafting_unit, NULL as approving_department,
- NULL as document_type, NULL as professional_field, NULL as validity,
- s.plan_summary, s.compilation_basis,
- s.plan_category, s.level_1_classification, s.level_2_classification,
- s.level_3_classification, s.level_4_classification,
- s.note,
- s.created_by, u1.username as creator_name, s.created_time,
- s.updated_by, u2.username as updater_name, s.updated_time,
- m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter,
- m.source_type
- """
- field_map = {
- 'title': 's.plan_name',
- 'issuing_authority': 's.compiling_unit',
- 'release_date': 's.compiling_date',
- 'plan_category': 's.plan_category',
- 'level_1_classification': 's.level_1_classification',
- 'level_2_classification': 's.level_2_classification',
- 'level_3_classification': 's.level_3_classification',
- 'level_4_classification': 's.level_4_classification'
- }
- elif type == 'regulation':
- table_name = "t_samp_office_regulations"
- fields = """
- s.id, s.file_name as title, NULL as standard_no,
- s.issuing_department as issuing_authority, s.publish_date as release_date,
- s.document_type, NULL as professional_field, NULL as validity,
- s.note,
- s.created_by, u1.username as creator_name, s.created_time,
- s.updated_by, u2.username as updater_name, s.updated_time,
- m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter,
- m.source_type
- """
- field_map = {
- 'title': 's.file_name',
- 'issuing_authority': 's.issuing_department',
- 'release_date': 's.publish_date',
- 'document_type': 's.document_type'
- }
- else:
- return [], 0
-
- where_clauses = []
- params = []
-
- # 统一关键字搜索
- if keyword:
- if type == 'standard':
- where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
- params.extend([f"%{keyword}%", f"%{keyword}%"])
- elif type == 'construction_plan':
- where_clauses.append("s.plan_name LIKE %s")
- params.append(f"%{keyword}%")
- field_map['issuing_authority'] = 's.compiling_unit'
- field_map['release_date'] = 's.compiling_date'
- elif type == 'regulation':
- where_clauses.append("s.file_name LIKE %s")
- params.append(f"%{keyword}%")
- field_map['issuing_authority'] = 's.issuing_department'
- field_map['release_date'] = 's.publish_date'
-
- # 精细化检索
- for filter_key, filter_value in filters.items():
- if not filter_value:
- continue
-
- # 特殊处理日期范围
- date_field = field_map.get('release_date', 's.release_date')
- if filter_key == 'release_date_start':
- where_clauses.append(f"{date_field} >= %s")
- params.append(filter_value)
- continue
- if filter_key == 'release_date_end':
- where_clauses.append(f"{date_field} <= %s")
- params.append(filter_value)
- continue
- db_field = field_map.get(filter_key)
- if db_field:
- # 处理文档类型查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'document_type':
- filter_value = DOCUMENT_TYPE_MAP.get(filter_value, filter_value)
-
- # 处理专业领域查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'professional_field':
- filter_value = PROFESSIONAL_FIELD_MAP.get(filter_value, filter_value)
-
- # 处理方案类别查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'plan_category':
- filter_value = PLAN_CATEGORY_MAP.get(filter_value, filter_value)
- # 处理一级分类查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'level_1_classification':
- filter_value = FIRST_LEVEL_MAP.get(filter_value, filter_value)
- # 处理二级分类查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'level_2_classification':
- filter_value = SECOND_LEVEL_MAP.get(filter_value, filter_value)
- # 处理三级分类查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'level_3_classification':
- filter_value = THIRD_LEVEL_MAP.get(filter_value, filter_value)
- # 处理四级分类查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'level_4_classification':
- filter_value = FOURTH_LEVEL_MAP.get(filter_value, filter_value)
- # 处理时效性查询 (前端传中文 -> 数据库查简写)
- if filter_key == 'validity':
- filter_value = VALIDITY_MAP.get(filter_value, filter_value)
- # 如果是 title, standard_no, issuing_authority,支持模糊查询
- if filter_key in ['title', 'standard_no', 'issuing_authority']:
- where_clauses.append(f"{db_field} LIKE %s")
- params.append(f"%{filter_value}%")
- else:
- where_clauses.append(f"{db_field} = %s")
- params.append(filter_value)
-
- where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
- offset = (page - 1) * size
-
- # 使用 LEFT JOIN 关联主表和用户表获取姓名
- sql = f"""
- SELECT {fields}, kb.name as kb_name
- FROM {table_name} s
- LEFT JOIN t_samp_document_main m ON s.id = m.id
- LEFT JOIN t_sys_user u1 ON s.created_by = u1.id
- LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id
- LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
- {where_sql}
- ORDER BY s.created_time DESC
- LIMIT %s OFFSET %s
- """
- params = params + [size, offset]
-
- cursor.execute(sql, tuple(params))
- items = cursor.fetchall()
-
- # 处理 URL 转换
- for item in items:
- # 处理文档类型显示 (简写 -> 中文)
- doc_type_code = item.get('document_type')
- if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
- item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
-
- # 处理专业领域显示 (简写 -> 中文)
- prof_field_code = item.get('professional_field')
- if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
- item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
- # 处理方案类别显示 (简写 -> 中文)
- plan_category_code = item.get('plan_category')
- if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
- item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
- # 处理一级分类显示 (简写 -> 中文)
- level_1_code = item.get('level_1_classification')
- if level_1_code in FIRST_LEVEL_REVERSE_MAP:
- item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
- # 处理二级分类显示 (简写 -> 中文)
- level_2_code = item.get('level_2_classification')
- if level_2_code in SECOND_LEVEL_REVERSE_MAP:
- item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
- # 处理三级分类显示 (简写 -> 中文)
- level_3_code = item.get('level_3_classification')
- if level_3_code in THIRD_LEVEL_REVERSE_MAP:
- item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
- # 处理四级分类显示 (简写 -> 中文)
- level_4_code = item.get('level_4_classification')
- if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
- item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
- # 处理时效性显示 (简写 -> 中文)
- validity_code = item.get('validity')
- if validity_code in VALIDITY_REVERSE_MAP:
- item['validity'] = VALIDITY_REVERSE_MAP[validity_code]
- for key in ['file_url', 'md_url', 'json_url']:
- if item.get(key):
- item[key] = self.minio_manager.get_full_url(item[key])
-
- # 总数
- count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
- cursor.execute(count_sql, tuple(params[:-2]))
- res = cursor.fetchone()
- total = res['count'] if res else 0
-
- return items, total
- except Exception as e:
- logger.exception(f"获取 {type} 列表失败")
- return [], 0
- finally:
- cursor.close()
- conn.close()
- # ==================== 文档转换 ====================
-
- async def get_document_source_type(self, doc_id: str) -> Optional[str]:
- """获取文档的source_type"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['source_type'] if res else None
- except Exception as e:
- logger.exception(f"获取文档source_type失败: {e}")
- return None
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_title(self, doc_id: str) -> str:
- """获取文档标题"""
- conn = get_db_connection()
- if not conn:
- return "文档"
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['title'] if res else "文档"
- except Exception as e:
- logger.exception(f"获取文档标题失败: {e}")
- return "文档"
- finally:
- cursor.close()
- conn.close()
-
- async def update_conversion_status(self, doc_id: str, status: int,
- md_url: Optional[str] = None,
- json_url: Optional[str] = None,
- error_message: Optional[str] = None) -> bool:
- """更新文档转换状态
-
- Args:
- doc_id: 文档ID
- status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
- md_url: Markdown文件URL
- json_url: JSON文件URL
- error_message: 错误信息
- """
- conn = get_db_connection()
- if not conn:
- return False
-
- cursor = conn.cursor()
-
- try:
- update_clauses = ["conversion_status = %s"]
- params = [status]
- if error_message:
- update_clauses.append("conversion_error = %s")
- params.append(error_message)
-
- if md_url:
- update_clauses.append("md_url = %s")
- params.append(md_url)
-
- if json_url:
- update_clauses.append("json_url = %s")
- params.append(json_url)
-
- sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
- params.append(doc_id)
-
- cursor.execute(sql, tuple(params))
- conn.commit()
- return True
- except Exception as e:
- logger.exception(f"更新转换进度失败: {e}")
- conn.rollback()
- return False
- finally:
- cursor.close()
- conn.close()
- # ==================== 基础信息管理 ====================
- async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
- """新增基本信息"""
- logger.info(f"Adding basic info for type {type}: {data}")
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败", None
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型", None
-
- doc_id = str(uuid.uuid4())
- file_url = data.get('file_url')
- file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
-
- # 处理文档类型 (中文 -> 简写)
- doc_type_cn = data.get('document_type')
- doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
-
- # 处理专业领域 (中文 -> 简写)
- prof_field_cn = data.get('professional_field')
- prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
- # 处理时效性 (中文 -> 简写)
- validity_cn = data.get('validity')
- validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
- # 处理方案类别 (中文 -> 简写)
- plan_category_cn = data.get('plan_category')
- plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
- # 处理一级分类 (中文 -> 简写)
- level_1_cn = data.get('level_1_classification')
- level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
- # 处理二级分类 (中文 -> 简写)
- level_2_cn = data.get('level_2_classification')
- level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
- # 处理三级分类 (中文 -> 简写)
- level_3_cn = data.get('level_3_classification')
- level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 1. 插入主表 (解耦触发器,手动同步)
- cursor.execute(
- """
- INSERT INTO t_samp_document_main (
- id, title, source_type, file_url,
- file_extension, created_by, updated_by, created_time, updated_time,
- conversion_status, whether_to_task, kb_id
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
- """,
- (
- doc_id, data.get('title'), type, file_url,
- file_extension, user_id, user_id, data.get('kb_id')
- )
- )
-
- # 2. 插入子表 (移除 file_url,因为它现在只存储 in 主表中)
- if type == 'standard':
- sql = f"""
- INSERT INTO {table_name} (
- id, chinese_name, english_name, standard_number, issuing_authority,
- release_date, implementation_date, drafting_unit, approving_department,
- participating_units,
- document_type, professional_field, engineering_phase,
- validity, reference_basis, source_url, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('english_name'), data.get('standard_no'),
- data.get('issuing_authority'), self._to_date(data.get('release_date')), self._to_date(data.get('implementation_date')),
- data.get('drafting_unit'), data.get('approving_department'),
- data.get('participating_units'),
- doc_type_code, prof_field_code, data.get('engineering_phase'),
- validity_code or 'XH', data.get('reference_basis'), data.get('source_url'), data.get('note'),
- user_id, user_id
- )
- elif type == 'construction_plan':
- sql = f"""
- INSERT INTO {table_name} (
- id, plan_name, project_name, project_section, compiling_unit,
- compiling_date, plan_summary, compilation_basis, plan_category,
- level_1_classification, level_2_classification, level_3_classification, level_4_classification,
- note, created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('project_name'), data.get('project_section'),
- data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'),
- data.get('compilation_basis'), plan_category_code, level_1_code or 'SC',
- level_2_code, level_3_code, level_4_code,
- data.get('note'), user_id, user_id
- )
- elif type == 'regulation':
- sql = f"""
- INSERT INTO {table_name} (
- id, file_name, issuing_department, document_type, publish_date,
- effective_start_date, effective_end_date, note,
- created_by, updated_by, created_time, updated_time
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- params = (
- doc_id, data.get('title'), data.get('issuing_authority'), doc_type_code,
- self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')),
- data.get('note'), user_id, user_id
- )
- else:
- return False, "不支持的类型", None
-
- cursor.execute(sql, params)
-
- conn.commit()
- return True, "新增成功", doc_id
- except Exception as e:
- logger.exception("新增基本信息失败")
- conn.rollback()
- return False, str(e), None
- finally:
- cursor.close()
- conn.close()
- async def edit_basic_info(self, type: str, doc_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
- """编辑基本信息"""
- logger.info(f"Editing basic info for type {type}, id {doc_id}: {data}")
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型"
-
- # 处理 URL 存储(转为相对路径)
- file_url = self.minio_manager.get_relative_path(data.get('file_url'))
- file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
- # 处理文档类型 (中文 -> 简写)
- doc_type_cn = data.get('document_type')
- doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
- # 处理专业领域 (中文 -> 简写)
- prof_field_cn = data.get('professional_field')
- prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
- # 处理时效性 (中文 -> 简写)
- validity_cn = data.get('validity')
- validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
- # 处理方案类别 (中文 -> 简写)
- plan_category_cn = data.get('plan_category')
- plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
- # 处理一级分类 (中文 -> 简写)
- level_1_cn = data.get('level_1_classification')
- level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
- # 处理二级分类 (中文 -> 简写)
- level_2_cn = data.get('level_2_classification')
- level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
- # 处理三级分类 (中文 -> 简写)
- level_3_cn = data.get('level_3_classification')
- level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
- # 处理四级分类 (中文 -> 简写)
- level_4_cn = data.get('level_4_classification')
- level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
- # 1. 更新主表 (解耦触发器)
- cursor.execute(
- """
- UPDATE t_samp_document_main
- SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s
- WHERE id = %s
- """,
- (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id)
- )
- # 2. 更新子表 (移除 file_url)
- if type == 'standard':
- sql = f"""
- UPDATE {table_name}
- SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
- document_type = %s, professional_field = %s, validity = %s,
- english_name = %s, implementation_date = %s, drafting_unit = %s,
- approving_department = %s, engineering_phase = %s,
- participating_units = %s,
- reference_basis = %s,
- source_url = %s, note = %s,
- updated_by = %s, updated_time = NOW()
- WHERE id = %s
- """
- params = (
- data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
- doc_type_code, prof_field_code, validity_code,
- data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
- data.get('approving_department'), data.get('engineering_phase'),
- data.get('participating_units'),
- data.get('reference_basis'),
- data.get('source_url'), data.get('note'),
- updater_id, doc_id
- )
- elif type == 'construction_plan':
- sql = f"""
- UPDATE {table_name}
- SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s,
- compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s,
- level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
- note = %s, updated_by = %s, updated_time = NOW()
- WHERE id = %s
- """
- params = (
- data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'),
- self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), plan_category_code,
- level_1_code, level_2_code, level_3_code, level_4_code,
- data.get('note'), updater_id, doc_id
- )
-
- elif type == 'regulation':
- sql = f"""
- UPDATE {table_name}
- SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
- effective_start_date = %s, effective_end_date = %s, note = %s,
- updated_by = %s, updated_time = NOW()
- WHERE id = %s
- """
- params = (
- data.get('title'), data.get('issuing_authority'), doc_type_code, self._to_date(data.get('release_date')),
- self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'),
- updater_id, doc_id
- )
- else:
- return False, "不支持的类型"
-
- cursor.execute(sql, params)
-
- conn.commit()
- return True, "编辑成功"
- except Exception as e:
- logger.exception("编辑基本信息失败")
- conn.rollback()
- return False, str(e)
- finally:
- cursor.close()
- conn.close()
- async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]:
- """删除基本信息"""
- if not doc_id:
- return False, "缺少 ID 参数"
-
- logger.info(f"Deleting basic info: type={type}, id={doc_id}")
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型"
-
- # 1. 显式删除子表记录 (防止 CASCADE 未生效)
- try:
- cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,))
- logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}")
- except Exception as sub_e:
- logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}")
- # 2. 同步删除任务管理中心的数据 (优先删除关联数据)
- try:
- # 使用当前事务删除任务记录(如果 task_service 支持的话,目前它自建连接)
- # 这里我们直接在当前 cursor 中也执行一次,确保事务一致性
- cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
- logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}")
- except Exception as task_e:
- logger.warning(f"在主事务中删除任务记录失败: {task_e}")
- # 3. 删除主表记录
- cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,))
- affected_main = cursor.rowcount
- logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}")
-
- if affected_main == 0:
- logger.warning(f"未找到主表记录: {doc_id}")
- # 即使主表没找到,我们也 commit 之前的操作并返回成功(幂等性)
-
- conn.commit()
-
- # 4. 再次确保任务中心数据已删除 (调用原有服务)
- try:
- await task_service.delete_task(doc_id)
- except Exception as task_err:
- logger.error(f"调用 task_service 删除任务失败: {task_err}")
- return True, "删除成功"
- except Exception as e:
- logger.exception(f"删除基本信息异常 (ID: {doc_id})")
- conn.rollback()
- return False, f"删除失败: {str(e)}"
- finally:
- cursor.close()
- conn.close()
|