"""Sample-center service layer.

SQL query logic extracted from sample_view.py.
"""
import logging
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from app.base.async_mysql_connection import get_db_connection
from app.base.minio_connection import get_minio_manager
from app.core.config import config_handler
from app.services.milvus_service import milvus_service
from app.services.task_service import task_service

logger = logging.getLogger(__name__)

# source_type -> business sub-table name
TABLE_MAP = {
    'standard': 't_samp_standard_base_info',
    'construction_plan': 't_samp_construction_plan_base_info',
    'regulation': 't_samp_office_regulations'
}

# Document type map (front-end display label -> DB storage code)
DOCUMENT_TYPE_MAP = {
    "国家标准": "GB",
    "行业标准": "HY",
    "部门规章": "BM",
    "地方标准": "DB",
    "企业标准": "QY",
    "管理制度": "GL",
    "技术规范": "GF",
    "团体标准": "TT",
    "国际标准": "GJ",
    "国家法律": "FL",
    "地方法规": "LR",
    "其他": "QT"
}

# Document type reverse map (DB storage code -> front-end display label).
# Also includes legacy abbreviations for backward compatibility with stored data.
DOCUMENT_TYPE_REVERSE_MAP = {v: k for k, v in DOCUMENT_TYPE_MAP.items()}
DOCUMENT_TYPE_REVERSE_MAP.update({
    "HB": "行业标准",
    "QB": "企业标准",
    "GLZD": "管理制度",
    "JSGF": "技术规范",
    "TB": "团体标准",
    "IB": "国际标准",
    "OTHER": "其他"
})

# Professional field map (front-end display label -> DB storage code)
PROFESSIONAL_FIELD_MAP = {
    "法律法规": "FL",
    "通用标准": "TY",
    "勘察钻探": "KC",
    "地基基础": "DJ",
    "路基路面": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "交通工程": "JT",
    "建筑工程": "JZ",
    "市政工程": "SZ",
    "机电安装": "JD",
    "路桥工程": "LB",
    "装饰装修": "ZS",
    "港口航道": "GK",
    "铁路工程": "TL",
    "房建工程": "FJ",
    "水利电力": "SL",
    "信息化": "XX",
    "试验检测": "SY",
    "安全环保": "AQ",
    "其他": "QT"
}

# Professional field reverse map (DB storage code -> front-end display label)
PROFESSIONAL_FIELD_REVERSE_MAP = {v: k for k, v in PROFESSIONAL_FIELD_MAP.items()}

# Plan category map (front-end display label -> DB storage code)
PLAN_CATEGORY_MAP = {
    "超危大方案": "CH",
    "超危大方案较大Ⅱ级": "CH2",
    "超危大方案特大Ⅳ级": "CH4",
    "超危大方案一般Ⅰ级": "CH1",
    "超危大方案重大Ⅲ级": "CH3",
    "危大方案": "WD",
    "一般方案": "YB"
}

# Plan category reverse map (DB storage code -> front-end display label)
PLAN_CATEGORY_REVERSE_MAP = {v: k for k, v in PLAN_CATEGORY_MAP.items()}

# Level-1 classification map (front-end display label -> DB storage code)
FIRST_LEVEL_MAP = {
    "施工方案": "SC"
}

# Level-1 classification reverse map (DB storage code -> front-end display label)
FIRST_LEVEL_REVERSE_MAP = {v: k for k, v in FIRST_LEVEL_MAP.items()}
# Level-2 classification map (front-end display label -> DB storage code)
SECOND_LEVEL_MAP = {
    "临建工程": "LZ",
    "路基工程": "LJ",
    "桥梁工程": "QL",
    "隧道工程": "SD",
    "其他": "QT"
}

# Level-2 classification reverse map (DB storage code -> front-end display label)
SECOND_LEVEL_REVERSE_MAP = {v: k for k, v in SECOND_LEVEL_MAP.items()}

# Level-3 classification map (front-end display label -> DB storage code)
THIRD_LEVEL_MAP = {
    "TBM施工": "TM",
    "拌和站安、拆施工": "BH",
    "不良地质隧道施工": "BL",
    "常规桥梁": "CG",
    "挡土墙工程类": "DT",
    "辅助坑道施工": "FB",
    "复杂洞口工程施工": "FD",
    "钢筋加工场安、拆": "GG",
    "钢栈桥施工": "GZ",
    "拱桥": "GH",
    "涵洞工程类": "HD",
    "滑坡体处理类": "HP",
    "路堤": "LT",
    "路堑": "LQ",
    "其他": "QT",
    "深基坑": "JK",
    "隧道总体施工": "ZT",
    "特殊结构隧道": "TS",
    "斜拉桥": "XL",
    "悬索桥": "XS"
}

# Level-3 classification reverse map (DB storage code -> front-end display label)
THIRD_LEVEL_REVERSE_MAP = {v: k for k, v in THIRD_LEVEL_MAP.items()}

# Level-4 classification map (front-end display label -> DB storage code)
FOURTH_LEVEL_MAP = {
    "挡土墙": "DT",
    "顶管": "DG",
    "断层破碎带及软弱围岩": "DL",
    "钢筋砼箱涵": "GX",
    "高填路堤": "GT",
    "抗滑桩": "KH",
    "软岩大变形隧道": "RY",
    "上部结构": "SB",
    "深基坑开挖与支护": "JK",
    "深挖路堑": "LC",
    "隧道TBM": "TM",
    "隧道进洞": "JD",
    "隧道竖井": "SJ",
    "隧道斜井": "XJ",
    "特种设备": "TZ",
    "瓦斯隧道": "WS",
    "下部结构": "XB",
    "小净距隧道": "NJ",
    "岩爆隧道": "YB",
    "岩溶隧道": "YR",
    "涌水突泥隧道": "YN",
    "桩基础": "ZJ",
    "其他": "QT"
}

# Level-4 classification reverse map (DB storage code -> front-end display label)
FOURTH_LEVEL_REVERSE_MAP = {v: k for k, v in FOURTH_LEVEL_MAP.items()}


def get_table_name(source_type: str) -> Optional[str]:
    """Return the business sub-table name for a source_type, or None if unknown."""
    return TABLE_MAP.get(source_type)


class SampleService:
    """Sample-center service class — implemented with raw SQL queries."""

    # Process-wide flag so Milvus collection setup runs only once per process.
    _initialized = False

    def __init__(self):
        """Initialize the service: MinIO manager, Milvus service, and (once) collections."""
        # Shared MinIO manager instance
        self.minio_manager = get_minio_manager()
        # Global Milvus service instance
        self.milvus_service = milvus_service
        # Ensure collections exist (only on first construction in this process)
        if not SampleService._initialized:
            try:
                self.milvus_service.ensure_collections()
                SampleService._initialized = True
            except Exception as e:
                logger.error(f"初始化 Milvus 集合失败: {e}")

    def _format_document_row(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Format one document row in place: expand URLs, map coded fields back to
        display labels, normalize dates, and add display file names.

        Mutates and returns the same dict; a falsy input is returned unchanged.
        """
        if not item:
            return item
        # 1. Convert stored relative object paths to full MinIO URLs
        for key in ['file_url', 'md_url', 'json_url']:
            if item.get(key):
                item[key] = self.minio_manager.get_full_url(item[key])
        # 2. Map coded fields back to display labels for the generic front-end view
        source_type = item.get('source_type')
        # Document type (code -> Chinese label)
        doc_type_code = item.get('document_type')
        if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP:
            item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code]
        # Professional field (code -> Chinese label)
        prof_field_code = item.get('professional_field')
        if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
            item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
        # Level-1 classification (code -> Chinese label)
        level_1_code = item.get('level_1_classification')
        if level_1_code in FIRST_LEVEL_REVERSE_MAP:
            item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code]
        # Level-2 classification (code -> Chinese label)
        level_2_code = item.get('level_2_classification')
        if level_2_code in SECOND_LEVEL_REVERSE_MAP:
            item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code]
        # Level-3 classification (code -> Chinese label)
        level_3_code = item.get('level_3_classification')
        if level_3_code in THIRD_LEVEL_REVERSE_MAP:
            item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code]
        # Level-4 classification (code -> Chinese label)
        level_4_code = item.get('level_4_classification')
        if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
            item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
        # Plan category (code -> Chinese label)
        plan_category_code = item.get('plan_category')
        if plan_category_code in PLAN_CATEGORY_REVERSE_MAP:
            item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code]
        if source_type == 'standard':
            # Standard-specific alias
            if 'standard_number' in item:
                item['standard_no'] = item.get('standard_number')
        elif source_type == 'construction_plan':
            # Construction-plan-specific aliases onto the generic column names
            item['issuing_authority'] = item.get('compiling_unit')
            item['release_date'] = item.get('compiling_date')
        elif source_type == 'regulation':
            # Regulation-specific aliases onto the generic column names
            item['issuing_authority'] = item.get('issuing_department')
            item['release_date'] = item.get('publish_date')
        # 3. Normalize date/time values to strings (ISO format when possible)
        date_keys = [
            'created_time', 'updated_time', 'release_date', 'publish_date',
            'compiling_date', 'implementation_date', 'effective_start_date',
            'effective_end_date'
        ]
        for key in date_keys:
            val = item.get(key)
            if val and hasattr(val, 'isoformat'):
                item[key] = val.isoformat()
            elif val is not None:
                item[key] = str(val)
        # 4. Add display file names for the front end
        # NOTE(review): conversion_status == 2 appears to mean "converted successfully" — confirm.
        if item.get('conversion_status') == 2:
            title = item.get('title', 'document')
            item['md_display_name'] = f"{title}.md"
            item['json_display_name'] = f"{title}.json"
        return item

    async def get_upload_url(self, filename: str, content_type: str, prefix: str = None) -> Tuple[bool, str, Dict[str, Any]]:
        """Get a MinIO presigned upload URL.

        Returns (ok, message, payload); payload is the URL data on success, {} on failure.
        """
        try:
            data = self.minio_manager.get_upload_url(filename, content_type, prefix=prefix)
            return True, "成功获取上传链接", data
        except Exception as e:
            logger.exception("生成上传链接失败")
            return False, f"生成上传链接失败: {str(e)}", {}

    # ==================== Document management ====================

    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_method: str = "general", chunk_size: int = 500, separator: str = "。") -> Tuple[int, str]:
        """Batch-insert documents into the knowledge base.

        Args:
            doc_ids: document ID list
            username: operator
            kb_method: chunking method
            chunk_size: chunk length
            separator: chunk separator

        Returns:
            (success_count, human-readable summary message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败,请检查数据库服务状态"
        cursor = conn.cursor()
        success_count = 0
        already_entered_count = 0
        failed_count = 0
        error_details = []
        try:
            # 1. Fetch details for every selected document
            placeholders = ','.join(['%s'] * len(doc_ids))
            fetch_sql = f"""
                SELECT id, title, source_type, md_url, conversion_status,
                       whether_to_enter, created_time, kb_id, file_url
                FROM t_samp_document_main
                WHERE id IN ({placeholders})
            """
            cursor.execute(fetch_sql, tuple(doc_ids))
            selected_docs = cursor.fetchall()
            if not selected_docs:
                return 0, "选中的文档在数据库中不存在"
            # 2. Process documents one by one
            for doc in selected_docs:
                doc_id = doc['id']
                title = doc.get('title', '未命名文档')
                status = doc.get('conversion_status')
                whether_to_enter = doc.get('whether_to_enter', 0)
                md_url = doc.get('md_url')
                source_type = doc.get('source_type')
                # A. Skip documents that are already in the knowledge base
                if whether_to_enter == 1:
                    logger.info(f"文档 {title}({doc_id}) 已入库,跳过二次入库")
                    already_entered_count += 1
                    error_details.append(f"· {title}: 已在知识库中,无需重复入库")
                    continue
                # B. Check conversion status (2 = converted; 0 = not yet; 1 = converting)
                if status != 2:
                    reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},入库失败: {reason}")
                    failed_count += 1
                    error_details.append(f"· {title}: {reason}")
                    continue
                if not md_url:
                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,入库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 转换结果地址丢失")
                    continue
                # C. Determine ingestion strategy (KB id comes from the DB row;
                #    chunking method comes from the caller)
                current_kb_id = doc.get('kb_id')
                current_kb_method = kb_method
                if not current_kb_id:
                    logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
                    failed_count += 1
                    error_details.append(f"· {title}: 未指定目标知识库")
                    continue
                if not current_kb_method:
                    logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
                    failed_count += 1
                    error_details.append(f"· {title}: 未指定切分策略")
                    continue
                # Look up the KB's parent/children collection names
                kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
                cursor.execute(kb_sql, (current_kb_id,))
                kb_res = cursor.fetchone()
                if not kb_res:
                    logger.warning(f"找不到指定的知识库: id={current_kb_id}")
                    failed_count += 1
                    error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
                    continue
                collection_name_parent = kb_res['collection_name_parent']
                collection_name_children = kb_res['collection_name_children']
                # D. Fetch the Markdown content from MinIO
                try:
                    md_content = self.minio_manager.get_object_content(md_url)
                    if not md_content:
                        raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
                except Exception as minio_err:
                    logger.error(f"读取文档 {title} 内容失败: {minio_err}")
                    failed_count += 1
                    error_details.append(f"· {title}: 读取云端文件失败")
                    continue
                # E. Chunk and insert via MilvusService
                try:
                    # Collect business metadata from the matching sub-table
                    business_metadata = {}
                    if source_type in ['standard', 'basis']:
                        std_sql = """
                            SELECT id as basic_info_id, chinese_name, standard_number,
                                   issuing_authority, document_type, professional_field, validity
                            FROM t_samp_standard_base_info WHERE id = %s
                        """
                        cursor.execute(std_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    elif source_type == 'construction_plan':
                        plan_sql = """
                            SELECT id as basic_info_id, plan_name, project_name, project_section,
                                   compiling_unit, compiling_date, plan_summary, plan_category,
                                   level_1_classification, level_2_classification,
                                   level_3_classification, level_4_classification
                            FROM t_samp_construction_plan_base_info WHERE id = %s
                        """
                        cursor.execute(plan_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    elif source_type == 'regulation':
                        reg_sql = """
                            SELECT id as basic_info_id, file_name,
                                   issuing_department as issuing_authority, document_type,
                                   publish_date as release_date, note
                            FROM t_samp_office_regulations WHERE id = %s
                        """
                        cursor.execute(reg_sql, (doc_id,))
                        business_metadata = cursor.fetchone() or {}
                    # Force document_type to its short code (handles legacy rows
                    # that may still store the Chinese label)
                    if 'document_type' in business_metadata and business_metadata['document_type']:
                        doc_type = business_metadata['document_type']
                        business_metadata['document_type'] = DOCUMENT_TYPE_MAP.get(doc_type, doc_type)
                    # Force professional_field to its short code
                    if 'professional_field' in business_metadata and business_metadata['professional_field']:
                        prof_field = business_metadata['professional_field']
                        business_metadata['professional_field'] = PROFESSIONAL_FIELD_MAP.get(prof_field, prof_field)
                    # Force plan_category to its short code
                    if 'plan_category' in business_metadata and business_metadata['plan_category']:
                        plan_cat = business_metadata['plan_category']
                        business_metadata['plan_category'] = PLAN_CATEGORY_MAP.get(plan_cat, plan_cat)
                    # Force level_1_classification to its short code
                    if 'level_1_classification' in business_metadata and business_metadata['level_1_classification']:
                        l1_class = business_metadata['level_1_classification']
                        business_metadata['level_1_classification'] = FIRST_LEVEL_MAP.get(l1_class, l1_class)
                    # Force level_2_classification to its short code
                    if 'level_2_classification' in business_metadata and business_metadata['level_2_classification']:
                        l2_class = business_metadata['level_2_classification']
                        business_metadata['level_2_classification'] = SECOND_LEVEL_MAP.get(l2_class, l2_class)
                    # Force level_3_classification to its short code
                    if 'level_3_classification' in business_metadata and business_metadata['level_3_classification']:
                        l3_class = business_metadata['level_3_classification']
                        business_metadata['level_3_classification'] = THIRD_LEVEL_MAP.get(l3_class, l3_class)
                    # Force level_4_classification to its short code
                    if 'level_4_classification' in business_metadata and business_metadata['level_4_classification']:
                        l4_class = business_metadata['level_4_classification']
                        business_metadata['level_4_classification'] = FOURTH_LEVEL_MAP.get(l4_class, l4_class)
                    # Use the main table's file_url consistently; store the full URL
                    raw_file_url = doc.get('file_url', '')
                    business_metadata['file_url'] = self.minio_manager.get_full_url(raw_file_url) if raw_file_url else ''
                    # Assemble metadata for the vector insert
                    current_date = int(datetime.now().strftime('%Y%m%d'))
                    doc_info = {
                        "doc_id": doc_id,
                        "file_name": title,
                        # Version derived from creation date (YYYYMMDD); falls back to today
                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
                        "tags": "",
                        # Operator recorded as created_by
                        "user_id": username,
                        "kb_id": current_kb_id,
                        "kb_method": current_kb_method,
                        "collection_name_parent": collection_name_parent,
                        "collection_name_children": collection_name_children,
                        "chunk_size": chunk_size,
                        "separator": separator,
                        "source_type": source_type,
                        # Inject the business metadata gathered above
                        "business_metadata": business_metadata
                    }
                    await self.milvus_service.insert_knowledge(md_content, doc_info)
                    # F. Register the document in the task-management center (type: data);
                    #    best-effort — failure is logged but does not abort the ingest
                    try:
                        await task_service.add_task(doc_id, 'data')
                    except Exception as task_err:
                        logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
                    # G. Persist the new state on the main table
                    update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
                    cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
                    success_count += 1
                except Exception as milvus_err:
                    logger.exception(f"文档 {title} 写入向量库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
                    continue
            conn.commit()
            # Build the summary message
            if success_count == len(doc_ids) and failed_count == 0 and already_entered_count == 0:
                msg = f"✅ 入库成功!共处理 {success_count} 份文档。"
            else:
                msg = f"📊 入库处理完成:\n· 成功:{success_count} 份\n"
                if already_entered_count > 0:
                    msg += f"· 跳过:{already_entered_count} 份 (已入库)\n"
                if failed_count > 0:
                    msg += f"· 失败:{failed_count} 份\n"
            if error_details:
                detailed_msg = msg + "\n⚠️ 详情说明:\n" + "\n".join(error_details)
                return success_count, detailed_msg
            return success_count, msg
        except Exception as e:
            logger.exception(f"文档批量入库异常: {e}")
            conn.rollback()
            return 0, f"操作异常: {str(e)}"
        finally:
            cursor.close()
            conn.close()

    async def batch_add_to_task(self, doc_ids: List[str], username: str, project_name: str, task_tags: Optional[List[str]] = None) -> Tuple[bool, str]:
        """Batch-add documents to the annotation task center (single-table model).

        Args:
            doc_ids: document ID list
            username: operator
            project_name: project name (a fresh project_id UUID is generated per call)
            task_tags: annotation task tags, e.g. ["tag1", "tag2"]

        Returns:
            (ok, human-readable message)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            if not doc_ids:
                return False, "未指定要加入任务的文档 ID"
            # 0. Generate a new project UUID on every call (no name-based reuse).
            # NOTE(review): `uuid` is already imported at module level — this local
            # import is redundant (kept byte-identical).
            import uuid
            project_id = str(uuid.uuid4())
            # Tags are stored as a JSON string (or NULL)
            import json
            tag_str = json.dumps(task_tags, ensure_ascii=False) if task_tags else None
            # 1. Filter out documents that have not been entered into the KB
            placeholders = ', '.join(['%s'] * len(doc_ids))
            check_entered_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
            cursor.execute(check_entered_sql, doc_ids)
            entered_ids = [row['id'] for row in cursor.fetchall()]
            unentered_count = len(doc_ids) - len(entered_ids)
            if not entered_ids:
                return False, "所选文档均未入库,无法加入标注任务中心"
            # 2. Mark the eligible documents as added to a task
            ids_to_add = entered_ids
            add_placeholders = ', '.join(['%s'] * len(ids_to_add))
            sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
            cursor.execute(sql, (username, *ids_to_add))
            # 3. Write one row per document into the task-management table
            for doc_id in ids_to_add:
                try:
                    # Gather business metadata for the task row (best effort)
                    metadata_dict = {}
                    try:
                        # Internal/bookkeeping fields excluded from the metadata payload
                        EXCLUDE_FIELDS = {
                            'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
                            'conversion_status', 'whether_to_enter', 'whether_to_task',
                            'kb_method', 'whether_to_delete'
                        }
                        # Main-table fields
                        cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
                        doc_main = cursor.fetchone()
                        if doc_main:
                            # Keep only non-empty, non-internal fields
                            for k, v in doc_main.items():
                                if v is not None and v != '' and k not in EXCLUDE_FIELDS:
                                    metadata_dict[k] = v
                            # Sub-table fields for this source_type
                            source_type = doc_main.get('source_type')
                            table_name = TABLE_MAP.get(source_type)
                            if table_name:
                                cursor.execute(f"SELECT * FROM {table_name} WHERE id = %s", (doc_id,))
                                sub_data = cursor.fetchone()
                                if sub_data:
                                    for k, v in sub_data.items():
                                        if v is not None and v != '' and k not in EXCLUDE_FIELDS:
                                            metadata_dict[k] = v
                        # Recursively stringify datetime values
                        metadata_dict = task_service._serialize_datetime(metadata_dict)
                    except Exception as meta_err:
                        logger.warning(f"获取文档 {doc_id} 元数据失败: {meta_err}")
                    await task_service.add_task(
                        business_id=doc_id,
                        task_type='data',
                        project_id=project_id,
                        project_name=project_name,
                        tag=tag_str,
                        metadata=json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
                    )
                except Exception as e:
                    logger.exception(f"添加文档 {doc_id} 到任务中心失败: {e}")
            conn.commit()
            # 4. Automatically push the new project to the external annotation platform
            push_success, push_msg = await task_service.send_to_external_platform(project_id)
            msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
            if push_success:
                msg += f" (已推送: {push_msg})"
            else:
                msg += f" (推送失败: {push_msg})"
            if unentered_count > 0:
                msg += f",{unentered_count} 份文档因未入库被跳过"
            return True, msg
        except Exception as e:
            logger.exception("批量加入任务失败")
            if conn:
                conn.rollback()
            return False, f"操作失败: {str(e)}"
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()

    async def clear_knowledge_base_data(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
        """Batch-remove the Milvus chunks of the given documents.

        Only documents currently marked as entered (whether_to_enter == 1) are
        processed; others are silently skipped.

        Returns:
            (success_count, human-readable message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"
        cursor = conn.cursor()
        success_count = 0
        error_details = []
        try:
            # 1. Load KB binding info for each document
            placeholders = ', '.join(['%s'] * len(doc_ids))
            sql = f"""
                SELECT id, title, kb_id, whether_to_enter
                FROM t_samp_document_main
                WHERE id IN ({placeholders})
            """
            cursor.execute(sql, doc_ids)
            docs = cursor.fetchall()
            for doc in docs:
                doc_id = doc['id']
                title = doc.get('未命名') if False else doc.get('title', '未命名')
                kb_id = doc.get('kb_id')
                # Only entered documents need clearing
                if doc.get('whether_to_enter') != 1:
                    continue
                if not kb_id:
                    error_details.append(f"· {title}: 未关联知识库,无法清空")
                    continue
                # Resolve the KB's collection names
                cursor.execute("SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s", (kb_id,))
                kb_info = cursor.fetchone()
                if not kb_info:
                    error_details.append(f"· {title}: 找不到关联的知识库配置")
                    continue
                # 2. Delete from Milvus (both parent and children collections)
                try:
                    collections = [kb_info.get('collection_name_parent'), kb_info.get('collection_name_children')]
                    for coll_name in collections:
                        if coll_name and self.milvus_service.client.has_collection(coll_name):
                            # Filter-delete by document_id.
                            # NOTE(review): doc_id is interpolated into the filter
                            # expression — safe only if ids are UUIDs; verify.
                            self.milvus_service.client.delete(
                                collection_name=coll_name,
                                filter=f'document_id == "{doc_id}"'
                            )
                    # 3. Reset the entered flag on the main table
                    update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 0, updated_by = %s, updated_time = NOW() WHERE id = %s"
                    cursor.execute(update_sql, (username, doc_id))
                    success_count += 1
                except Exception as milvus_err:
                    logger.error(f"清空文档 {title} Milvus 数据失败: {milvus_err}")
                    error_details.append(f"· {title}: 清空向量库数据失败")
            conn.commit()
            msg = f"成功清空 {success_count} 份文档的知识库片段"
            if error_details:
                msg += "\n部分操作受限:\n" + "\n".join(error_details)
            return success_count, msg
        except Exception as e:
            logger.exception("批量清空知识库数据失败")
            conn.rollback()
            return 0, f"操作失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()

    async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
        """Batch-delete documents (only allowed when not entered, or already cleared).

        Returns:
            (affected_rows, human-readable message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"
        cursor = conn.cursor()
        try:
            if not doc_ids:
                return 0, "未指定要删除的文档 ID"
            # 1. Refuse to delete documents that are still in the knowledge base
            placeholders = ', '.join(['%s'] * len(doc_ids))
            check_sql = f"SELECT id, title FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
            cursor.execute(check_sql, doc_ids)
            entered_docs = cursor.fetchall()
            if entered_docs:
                titles = [d['title'] for d in entered_docs]
                return 0, f"删除失败:以下文档已入库,请先执行‘清空数据’操作后再删除:\n{', '.join(titles[:5])}{'...' if len(titles)>5 else ''}"
            # 2. Physical delete
            # Best-effort removal of matching rows from every sub-table
            try:
                for sub_table in TABLE_MAP.values():
                    sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
                    try:
                        cursor.execute(sub_sql, doc_ids)
                    except Exception as sub_e:
                        logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
            except Exception as sync_e:
                logger.error(f"同步删除子表数据失败: {sync_e}")
            # Delete rows from the main table t_samp_document_main
            sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
            cursor.execute(sql_main, doc_ids)
            affected_rows = cursor.rowcount
            # Also remove the documents from the task-management center (best effort)
            try:
                # NOTE(review): this re-import shadows the module-level task_service
                # and is unused — rows are deleted directly by business_id instead.
                from app.services.task_service import task_service
                for doc_id in doc_ids:
                    cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
            except Exception as task_err:
                logger.error(f"同步删除任务中心数据失败: {task_err}")
            conn.commit()
            return affected_rows, f"成功删除 {affected_rows} 条文档数据"
        except Exception as e:
            logger.exception("批量删除失败")
            conn.rollback()
            return 0, f"批量删除失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()

    async def get_tag_tree(self) -> List[Dict[str, Any]]:
        """Return the tag hierarchy tree (from t_samp_tag_category)."""
        conn = get_db_connection()
        if not conn:
            return []
        cursor = conn.cursor()
        try:
            sql = """
                SELECT id, parent_id, name, level, type
                FROM t_samp_tag_category
                WHERE is_deleted = 0 AND status = 1
                ORDER BY level ASC, sort_no ASC
            """
            cursor.execute(sql)
            tags = cursor.fetchall()
            # Build the tree structure.
            # NOTE(review): roots are assumed to have parent_id == 0; NULL-parent
            # rows would be dropped — confirm against the schema.
            tag_dict = {tag['id']: {**tag, 'children': []} for tag in tags}
            tree = []
            for tag_id, tag_item in tag_dict.items():
                parent_id = tag_item['parent_id']
                if parent_id == 0:
                    tree.append(tag_item)
                elif parent_id in tag_dict:
                    tag_dict[parent_id]['children'].append(tag_item)
            return tree
        except Exception as e:
            logger.error(f"获取标签树失败: {e}")
            return []
        finally:
            cursor.close()
            conn.close()

    async def get_document_list(
        self,
        whether_to_enter: Optional[int] = None,
        conversion_status: Optional[int] = None,
        keyword: Optional[str] = None,
        table_type: Optional[str] = None,
        plan_category: Optional[str] = None,
        level_2_classification: Optional[str] = None,
        level_3_classification: Optional[str] = None,
        level_4_classification: Optional[str] = None,
        page: int = 1,
        size: int = 50
    ) -> Tuple[List[Dict[str, Any]], int, int, int]:
        """Get the paginated document list (joins the matching sub-table).

        Returns:
            (items, filtered_total, overall_total, total_entered)
        """
        conn = get_db_connection()
        if not conn:
            return [], 0, 0, 0
        cursor = conn.cursor()
        try:
            where_clauses = []
            params = []
            # Base filter on source type
            if table_type:
                where_clauses.append("m.source_type = %s")
                params.append(table_type)
                if table_type in TABLE_MAP:
                    # Known type: LEFT JOIN the sub-table so its fields are
                    # selectable and filterable
                    sub_table = TABLE_MAP[table_type]
                    from_sql = f"""
                        t_samp_document_main m
                        LEFT JOIN {sub_table} s ON m.id = s.id
                        LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
                        LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
                        LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                    """
                    # `m.id as id` last so the main-table id wins over s.*
                    fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
                    # Construction-plan-specific filters (labels mapped to codes)
                    if table_type == 'construction_plan':
                        if plan_category:
                            where_clauses.append("s.plan_category = %s")
                            params.append(plan_category)
                        if level_2_classification:
                            where_clauses.append("s.level_2_classification = %s")
                            params.append(SECOND_LEVEL_MAP.get(level_2_classification, level_2_classification))
                        if level_3_classification:
                            where_clauses.append("s.level_3_classification = %s")
                            params.append(THIRD_LEVEL_MAP.get(level_3_classification, level_3_classification))
                        if level_4_classification:
                            where_clauses.append("s.level_4_classification = %s")
                            params.append(FOURTH_LEVEL_MAP.get(level_4_classification, level_4_classification))
                else:
                    from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
                    fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
            else:
                from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
                fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
            order_sql = "m.created_time DESC"
            title_field = "m.title"
            # whether_to_enter and conversion_status are filtered independently
            if whether_to_enter is not None:
                where_clauses.append("m.whether_to_enter = %s")
                params.append(whether_to_enter)
            if conversion_status is not None:
                where_clauses.append("m.conversion_status = %s")
                params.append(conversion_status)
            if keyword:
                where_clauses.append(f"{title_field} LIKE %s")
                params.append(f"%{keyword}%")
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size
            sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
            params.extend([size, offset])
            cursor.execute(sql, tuple(params))
            items = [self._format_document_row(row) for row in cursor.fetchall()]
            # Filtered total (drop the trailing LIMIT/OFFSET params)
            count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0
            # Global statistics
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
            res = cursor.fetchone()
            all_total = res['count'] if res else 0
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
            res = cursor.fetchone()
            total_entered = res['count'] if res else 0
            return items, total, all_total, total_entered
        except Exception as e:
            logger.exception("获取文档列表失败")
            return [], 0, 0, 0
        finally:
            cursor.close()
            conn.close()

    async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """Get one document's detail (main table merged with its sub-table row)."""
        conn = get_db_connection()
        if not conn:
            return None
        cursor = conn.cursor()
        try:
            # 1. Query the main table (joined with the user table for display names)
            cursor.execute("""
                SELECT m.*, u1.username as creator_name, u2.username as updater_name
                FROM t_samp_document_main m
                LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
                LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
                WHERE m.id = %s
            """, (doc_id,))
            doc = cursor.fetchone()
            if not doc:
                return None
            # 2. Query the matching sub-table for this source_type
            source_type = doc.get('source_type')
            table_name = TABLE_MAP.get(source_type)
            if table_name:
                sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
                cursor.execute(sub_sql, (doc_id,))
                sub_data = cursor.fetchone()
                if sub_data:
                    # Merge sub-table fields into the doc for the front end.
                    # On name clashes the sub-table value wins (except id).
                    sub_data.pop('id', None)
                    doc.update(sub_data)
            return self._format_document_row(doc)
        except Exception as e:
            logger.exception("获取文档详情失败")
            return None
        finally:
            cursor.close()
            conn.close()

    def _to_int(self, value: Any) -> Optional[int]:
        """Safely convert a value to int; returns None for empty/'null'/unparseable."""
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

    def _to_date(self, value: Any) -> Optional[str]:
        """Safely normalize a date value to 'YYYY-MM-DD' (or str); None for empty/'null'."""
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        # Already a datetime.date / datetime.datetime object
        if hasattr(value, 'strftime'):
            return value.strftime('%Y-%m-%d')
        return str(value)

    async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Add a new document (main table first, then sub-table; decoupled from triggers).

        Returns:
            (ok, message, new doc_id or None)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None
        cursor = conn.cursor()
        try:
            doc_id = str(uuid.uuid4())
            table_type = doc_data.get('table_type', 'standard')
            table_name = TABLE_MAP.get(table_type)
            # Safe field conversions
            release_date = self._to_date(doc_data.get('release_date'))
            # Store the URL as a relative object path
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
            # Document type (label -> code; unknown values kept as-is)
            doc_type_cn = doc_data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
            # Professional field (label -> code)
            prof_field_cn = doc_data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Plan category (label -> code)
            plan_category_cn = doc_data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Level-1 classification (label -> code)
            level_1_cn = doc_data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Level-2 classification (label -> code)
            level_2_cn = doc_data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Level-3 classification (label -> code)
            level_3_cn = doc_data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Level-4 classification (label -> code)
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # NOTE(review): the two statements above are duplicated twice more in the
            # original source (harmless but redundant); kept byte-identical here.
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Insert into the main table (the asset center)
            cursor.execute(
                """
                INSERT INTO t_samp_document_main (
                    id, title, source_type, file_url, file_extension,
                    created_by, updated_by, created_time, updated_time,
                    conversion_status, whether_to_task, kb_id
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
                """,
                (
                    doc_id, doc_data.get('title'), table_type, file_url,
                    doc_data.get('file_extension'), user_id, user_id, doc_data.get('kb_id')
                )
            )
            # 2. Insert into the sub-table (business fields only)
            if table_type == 'standard':
                cursor.execute(
                    f"""
                    INSERT INTO {table_name} (
                        id, chinese_name, english_name, standard_number, issuing_authority,
                        release_date, implementation_date, drafting_unit, approving_department,
                        participating_units, document_type, professional_field, engineering_phase,
                        validity, reference_basis, source_url, note,
                        created_by, updated_by, created_time, updated_time
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                    """,
                    (
                        doc_id, doc_data.get('title'), doc_data.get('english_name'),
                        doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                        release_date, self._to_date(doc_data.get('implementation_date')),
                        doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                        doc_data.get('participating_units'), doc_type_code, prof_field_code,
                        doc_data.get('engineering_phase'), doc_data.get('validity', '现行'),
                        doc_data.get('reference_basis'), doc_data.get('source_url'),
                        doc_data.get('note'), user_id, user_id
                    )
                )
            elif table_type == 'construction_plan':
                cursor.execute(
                    f"""
                    INSERT INTO {table_name} (
                        id, plan_name, project_name, project_section, compiling_unit,
                        compiling_date, plan_summary, compilation_basis, plan_category,
                        level_1_classification, level_2_classification, level_3_classification,
                        level_4_classification, note,
                        created_by, updated_by, created_time, updated_time
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                    """,
                    (
                        doc_id, doc_data.get('title'), doc_data.get('project_name'),
                        doc_data.get('project_section'), doc_data.get('issuing_authority'),
                        release_date, doc_data.get('plan_summary'),
                        doc_data.get('compilation_basis'), plan_category_code,
                        # Level-1 defaults to 'SC' (construction plan) when empty
                        level_1_code or 'SC', level_2_code, level_3_code, level_4_code,
                        doc_data.get('note'), user_id, user_id
                    )
                )
            elif table_type == 'regulation':
                cursor.execute(
                    f"""
                    INSERT INTO {table_name} (
                        id, file_name, issuing_department, document_type, publish_date,
                        effective_start_date, effective_end_date, note,
                        created_by, updated_by, created_time, updated_time
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
                    """,
                    (
                        doc_id, doc_data.get('title'), doc_data.get('issuing_authority'),
                        doc_type_code, release_date,
                        self._to_date(doc_data.get('effective_start_date')),
                        self._to_date(doc_data.get('effective_end_date')),
                        doc_data.get('note'), user_id, user_id
                    )
                )
            # 3. Register in the task-management center (type: data); best effort
            try:
                await task_service.add_task(doc_id, 'data')
            except Exception as task_err:
                logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
            conn.commit()
            return True, "文档添加成功", doc_id
        except Exception as e:
            logger.exception("添加文档失败")
            conn.rollback()
            return False, str(e), None
        finally:
            cursor.close()
            conn.close()

    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a document (sync main table and sub-table; decoupled from triggers).

        Returns:
            (ok, message)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'standard')
            table_name = TABLE_MAP.get(table_type)
            # Safe field conversions
            release_date = self._to_date(doc_data.get('release_date'))
            # Store the URL as a relative object path
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))
            # Document type (label -> code; unknown values kept as-is)
            doc_type_cn = doc_data.get('document_type')
            doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn)
            # Professional field (label -> code)
            prof_field_cn = doc_data.get('professional_field')
            prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
            # Plan category (label -> code)
            plan_category_cn = doc_data.get('plan_category')
            plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
            # Level-1 classification (label -> code)
            level_1_cn = doc_data.get('level_1_classification')
            level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn)
            # Level-2 classification (label -> code)
            level_2_cn = doc_data.get('level_2_classification')
            level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn)
            # Level-3 classification (label -> code)
            level_3_cn = doc_data.get('level_3_classification')
            level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn)
            # Level-4 classification (label -> code)
            level_4_cn = doc_data.get('level_4_classification')
            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
            # 1. Update the main table
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW(), kb_id = %s
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_data.get('kb_id'), doc_id
                )
            )
            # 2. Update the sub-table
            if table_type == 'standard':
                cursor.execute(
                    f"""
                    UPDATE {table_name}
                    SET chinese_name = %s, english_name = %s, standard_number = %s,
                        issuing_authority = %s, release_date = %s, implementation_date = %s,
                        drafting_unit = %s, approving_department = %s, participating_units = %s,
                        document_type = %s, professional_field = %s, engineering_phase = %s,
                        validity = %s, reference_basis = %s, source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('english_name'),
                        doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                        release_date, self._to_date(doc_data.get('implementation_date')),
                        doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                        doc_data.get('participating_units'), doc_type_code, prof_field_code,
                        doc_data.get('engineering_phase'), doc_data.get('validity'),
                        doc_data.get('reference_basis'), doc_data.get('source_url'),
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            elif table_type == 'construction_plan':
                cursor.execute(
                    f"""
                    UPDATE {table_name}
                    SET plan_name = %s, project_name = %s, project_section = %s,
                        compiling_unit = %s, compiling_date = %s, plan_summary = %s,
                        compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s,
                        level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('project_name'),
                        doc_data.get('project_section'), doc_data.get('issuing_authority'),
                        release_date, doc_data.get('plan_summary'),
                        doc_data.get('compilation_basis'), plan_category_code,
                        level_1_code, level_2_code, level_3_code, level_4_code,
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            elif table_type == 'regulation':
                cursor.execute(
                    f"""
                    UPDATE {table_name}
                    SET file_name = %s, issuing_department = %s, document_type = %s,
                        publish_date = %s, effective_start_date = %s, effective_end_date = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (
                        doc_data.get('title'), doc_data.get('issuing_authority'),
                        doc_type_code, release_date,
                        self._to_date(doc_data.get('effective_start_date')),
                        self._to_date(doc_data.get('effective_end_date')),
                        doc_data.get('note'), updater_id, doc_id
                    )
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()

    async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
        """Enter a single document into the knowledge base (delegates to the batch API)."""
        affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
        return affected_rows > 0, message

    async def get_basic_info_list(
        self, type: str, page: int, size: int, keyword: Optional[str] = None, **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Get the basic-info list (joined with the main table for file/conversion state)."""
        conn = get_db_connection()
        if not conn:
            return [], 0
        cursor = conn.cursor()
        try:
            # Pick the table and field mapping by type
            if type == 'standard':
                table_name = "t_samp_standard_base_info"
                # Joined main-table fields: file_url, conversion_status, md_url, json_url
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type, s.professional_field,
                    s.validity, s.note, s.participating_units, s.reference_basis,
                    s.created_by, u1.username as creator_name, s.created_time,
                    s.updated_by, u2.username as updater_name, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url,
                    m.kb_id, m.whether_to_enter
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
'issuing_authority': 's.issuing_authority', 'release_date': 's.release_date', 'document_type': 's.document_type', 'professional_field': 's.professional_field', 'validity': 's.validity' } elif type == 'construction_plan': table_name = "t_samp_construction_plan_base_info" fields = """ s.id, s.plan_name as title, NULL as standard_no, s.project_name, s.project_section, s.compiling_unit as issuing_authority, s.compiling_date as release_date, NULL as document_type, NULL as professional_field, NULL as validity, s.plan_summary, s.compilation_basis, s.plan_category, s.level_1_classification, s.level_2_classification, s.level_3_classification, s.level_4_classification, s.note, s.created_by, u1.username as creator_name, s.created_time, s.updated_by, u2.username as updater_name, s.updated_time, m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter """ field_map = { 'title': 's.plan_name', 'issuing_authority': 's.compiling_unit', 'release_date': 's.compiling_date', 'plan_category': 's.plan_category', 'level_1_classification': 's.level_1_classification', 'level_2_classification': 's.level_2_classification', 'level_3_classification': 's.level_3_classification', 'level_4_classification': 's.level_4_classification' } elif type == 'regulation': table_name = "t_samp_office_regulations" fields = """ s.id, s.file_name as title, NULL as standard_no, s.issuing_department as issuing_authority, s.publish_date as release_date, s.document_type, NULL as professional_field, NULL as validity, s.note, s.created_by, u1.username as creator_name, s.created_time, s.updated_by, u2.username as updater_name, s.updated_time, m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter """ field_map = { 'title': 's.file_name', 'issuing_authority': 's.issuing_department', 'release_date': 's.publish_date', 'document_type': 's.document_type' } else: return [], 0 where_clauses = [] params = [] # 统一关键字搜索 if keyword: if type == 'standard': 
where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)") params.extend([f"%{keyword}%", f"%{keyword}%"]) elif type == 'construction_plan': where_clauses.append("s.plan_name LIKE %s") params.append(f"%{keyword}%") field_map['issuing_authority'] = 's.compiling_unit' field_map['release_date'] = 's.compiling_date' elif type == 'regulation': where_clauses.append("s.file_name LIKE %s") params.append(f"%{keyword}%") field_map['issuing_authority'] = 's.issuing_department' field_map['release_date'] = 's.publish_date' # 精细化检索 for filter_key, filter_value in filters.items(): if not filter_value: continue # 特殊处理日期范围 date_field = field_map.get('release_date', 's.release_date') if filter_key == 'release_date_start': where_clauses.append(f"{date_field} >= %s") params.append(filter_value) continue if filter_key == 'release_date_end': where_clauses.append(f"{date_field} <= %s") params.append(filter_value) continue db_field = field_map.get(filter_key) if db_field: # 处理文档类型查询 (前端传中文 -> 数据库查简写) if filter_key == 'document_type': filter_value = DOCUMENT_TYPE_MAP.get(filter_value, filter_value) # 处理专业领域查询 (前端传中文 -> 数据库查简写) if filter_key == 'professional_field': filter_value = PROFESSIONAL_FIELD_MAP.get(filter_value, filter_value) # 处理方案类别查询 (前端传中文 -> 数据库查简写) if filter_key == 'plan_category': filter_value = PLAN_CATEGORY_MAP.get(filter_value, filter_value) # 处理一级分类查询 (前端传中文 -> 数据库查简写) if filter_key == 'level_1_classification': filter_value = FIRST_LEVEL_MAP.get(filter_value, filter_value) # 处理二级分类查询 (前端传中文 -> 数据库查简写) if filter_key == 'level_2_classification': filter_value = SECOND_LEVEL_MAP.get(filter_value, filter_value) # 处理三级分类查询 (前端传中文 -> 数据库查简写) if filter_key == 'level_3_classification': filter_value = THIRD_LEVEL_MAP.get(filter_value, filter_value) # 处理四级分类查询 (前端传中文 -> 数据库查简写) if filter_key == 'level_4_classification': filter_value = FOURTH_LEVEL_MAP.get(filter_value, filter_value) # 如果是 title, standard_no, issuing_authority,支持模糊查询 if filter_key in ['title', 
'standard_no', 'issuing_authority']: where_clauses.append(f"{db_field} LIKE %s") params.append(f"%{filter_value}%") else: where_clauses.append(f"{db_field} = %s") params.append(filter_value) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" offset = (page - 1) * size # 使用 LEFT JOIN 关联主表和用户表获取姓名 sql = f""" SELECT {fields}, kb.name as kb_name FROM {table_name} s LEFT JOIN t_samp_document_main m ON s.id = m.id LEFT JOIN t_sys_user u1 ON s.created_by = u1.id LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id {where_sql} ORDER BY s.created_time DESC LIMIT %s OFFSET %s """ params = params + [size, offset] cursor.execute(sql, tuple(params)) items = cursor.fetchall() # 处理 URL 转换 for item in items: # 处理文档类型显示 (简写 -> 中文) doc_type_code = item.get('document_type') if doc_type_code in DOCUMENT_TYPE_REVERSE_MAP: item['document_type'] = DOCUMENT_TYPE_REVERSE_MAP[doc_type_code] # 处理专业领域显示 (简写 -> 中文) prof_field_code = item.get('professional_field') if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP: item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code] # 处理方案类别显示 (简写 -> 中文) plan_category_code = item.get('plan_category') if plan_category_code in PLAN_CATEGORY_REVERSE_MAP: item['plan_category'] = PLAN_CATEGORY_REVERSE_MAP[plan_category_code] # 处理一级分类显示 (简写 -> 中文) level_1_code = item.get('level_1_classification') if level_1_code in FIRST_LEVEL_REVERSE_MAP: item['level_1_classification'] = FIRST_LEVEL_REVERSE_MAP[level_1_code] # 处理二级分类显示 (简写 -> 中文) level_2_code = item.get('level_2_classification') if level_2_code in SECOND_LEVEL_REVERSE_MAP: item['level_2_classification'] = SECOND_LEVEL_REVERSE_MAP[level_2_code] # 处理三级分类显示 (简写 -> 中文) level_3_code = item.get('level_3_classification') if level_3_code in THIRD_LEVEL_REVERSE_MAP: item['level_3_classification'] = THIRD_LEVEL_REVERSE_MAP[level_3_code] # 处理四级分类显示 (简写 -> 中文) level_4_code = item.get('level_4_classification') if level_4_code in 
FOURTH_LEVEL_REVERSE_MAP: item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code] for key in ['file_url', 'md_url', 'json_url']: if item.get(key): item[key] = self.minio_manager.get_full_url(item[key]) # 总数 count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}" cursor.execute(count_sql, tuple(params[:-2])) res = cursor.fetchone() total = res['count'] if res else 0 return items, total except Exception as e: logger.exception(f"获取 {type} 列表失败") return [], 0 finally: cursor.close() conn.close() # ==================== 文档转换 ==================== async def get_document_source_type(self, doc_id: str) -> Optional[str]: """获取文档的source_type""" conn = get_db_connection() if not conn: return None cursor = conn.cursor() try: cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,)) res = cursor.fetchone() return res['source_type'] if res else None except Exception as e: logger.exception(f"获取文档source_type失败: {e}") return None finally: cursor.close() conn.close() async def get_document_title(self, doc_id: str) -> str: """获取文档标题""" conn = get_db_connection() if not conn: return "文档" cursor = conn.cursor() try: cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,)) res = cursor.fetchone() return res['title'] if res else "文档" except Exception as e: logger.exception(f"获取文档标题失败: {e}") return "文档" finally: cursor.close() conn.close() async def update_conversion_status(self, doc_id: str, status: int, md_url: Optional[str] = None, json_url: Optional[str] = None, error_message: Optional[str] = None) -> bool: """更新文档转换状态 Args: doc_id: 文档ID status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败) md_url: Markdown文件URL json_url: JSON文件URL error_message: 错误信息 """ conn = get_db_connection() if not conn: return False cursor = conn.cursor() try: update_clauses = ["conversion_status = %s"] params = [status] if error_message: update_clauses.append("conversion_error = %s") params.append(error_message) if md_url: 
update_clauses.append("md_url = %s") params.append(md_url) if json_url: update_clauses.append("json_url = %s") params.append(json_url) sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s" params.append(doc_id) cursor.execute(sql, tuple(params)) conn.commit() return True except Exception as e: logger.exception(f"更新转换进度失败: {e}") conn.rollback() return False finally: cursor.close() conn.close() # ==================== 基础信息管理 ==================== async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]: """新增基本信息""" logger.info(f"Adding basic info for type {type}: {data}") conn = get_db_connection() if not conn: return False, "数据库连接失败", None cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型", None doc_id = str(uuid.uuid4()) file_url = data.get('file_url') file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None # 处理文档类型 (中文 -> 简写) doc_type_cn = data.get('document_type') doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn) # 处理专业领域 (中文 -> 简写) prof_field_cn = data.get('professional_field') prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn) # 处理方案类别 (中文 -> 简写) plan_category_cn = data.get('plan_category') plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn) # 处理一级分类 (中文 -> 简写) level_1_cn = data.get('level_1_classification') level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn) # 处理二级分类 (中文 -> 简写) level_2_cn = data.get('level_2_classification') level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn) # 处理三级分类 (中文 -> 简写) level_3_cn = data.get('level_3_classification') level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn) # 处理四级分类 (中文 -> 简写) level_4_cn = data.get('level_4_classification') level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn) # 处理四级分类 (中文 -> 简写) level_4_cn = data.get('level_4_classification') 
level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn) # 1. 插入主表 (解耦触发器,手动同步) cursor.execute( """ INSERT INTO t_samp_document_main ( id, title, source_type, file_url, file_extension, created_by, updated_by, created_time, updated_time, conversion_status, whether_to_task, kb_id ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s) """, ( doc_id, data.get('title'), type, file_url, file_extension, user_id, user_id, data.get('kb_id') ) ) # 2. 插入子表 (移除 file_url,因为它现在只存储 in 主表中) if type == 'standard': sql = f""" INSERT INTO {table_name} ( id, chinese_name, english_name, standard_number, issuing_authority, release_date, implementation_date, drafting_unit, approving_department, participating_units, document_type, professional_field, engineering_phase, validity, reference_basis, source_url, note, created_by, updated_by, created_time, updated_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) """ params = ( doc_id, data.get('title'), data.get('english_name'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), self._to_date(data.get('implementation_date')), data.get('drafting_unit'), data.get('approving_department'), data.get('participating_units'), doc_type_code, prof_field_code, data.get('engineering_phase'), data.get('validity', '现行'), data.get('reference_basis'), data.get('source_url'), data.get('note'), user_id, user_id ) elif type == 'construction_plan': sql = f""" INSERT INTO {table_name} ( id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, updated_by, created_time, updated_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) """ params = ( doc_id, data.get('title'), data.get('project_name'), data.get('project_section'), 
data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), plan_category_code, level_1_code or 'SC', level_2_code, level_3_code, level_4_code, data.get('note'), user_id, user_id ) elif type == 'regulation': sql = f""" INSERT INTO {table_name} ( id, file_name, issuing_department, document_type, publish_date, effective_start_date, effective_end_date, note, created_by, updated_by, created_time, updated_time ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) """ params = ( doc_id, data.get('title'), data.get('issuing_authority'), doc_type_code, self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'), user_id, user_id ) else: return False, "不支持的类型", None cursor.execute(sql, params) # 3. 添加到任务管理中心 (类型为 data) try: # 尝试调用异步方法,如果报错则记录日志 import asyncio try: # 检查是否有正在运行的事件循环 loop = asyncio.get_event_loop() if loop.is_running(): # 在运行的循环中创建任务 loop.create_task(task_service.add_task(doc_id, 'data')) else: # 否则使用 run 运行(不推荐在 web 环境下这样做,但这里是兜底) loop.run_until_complete(task_service.add_task(doc_id, 'data')) except RuntimeError: # 没有事件循环时 asyncio.run(task_service.add_task(doc_id, 'data')) except Exception as task_err: logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}") conn.commit() return True, "新增成功", doc_id except Exception as e: logger.exception("新增基本信息失败") conn.rollback() return False, str(e), None finally: cursor.close() conn.close() async def edit_basic_info(self, type: str, doc_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]: """编辑基本信息""" logger.info(f"Editing basic info for type {type}, id {doc_id}: {data}") conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型" # 处理 URL 存储(转为相对路径) file_url = self.minio_manager.get_relative_path(data.get('file_url')) 
file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None # 处理文档类型 (中文 -> 简写) doc_type_cn = data.get('document_type') doc_type_code = DOCUMENT_TYPE_MAP.get(doc_type_cn, doc_type_cn) # 处理专业领域 (中文 -> 简写) prof_field_cn = data.get('professional_field') prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn) # 处理方案类别 (中文 -> 简写) plan_category_cn = data.get('plan_category') plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn) # 处理一级分类 (中文 -> 简写) level_1_cn = data.get('level_1_classification') level_1_code = FIRST_LEVEL_MAP.get(level_1_cn, level_1_cn) # 处理二级分类 (中文 -> 简写) level_2_cn = data.get('level_2_classification') level_2_code = SECOND_LEVEL_MAP.get(level_2_cn, level_2_cn) # 处理三级分类 (中文 -> 简写) level_3_cn = data.get('level_3_classification') level_3_code = THIRD_LEVEL_MAP.get(level_3_cn, level_3_cn) # 处理四级分类 (中文 -> 简写) level_4_cn = data.get('level_4_classification') level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn) # 1. 更新主表 (解耦触发器) cursor.execute( """ UPDATE t_samp_document_main SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s WHERE id = %s """, (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id) ) # 2. 
更新子表 (移除 file_url) if type == 'standard': sql = f""" UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, english_name = %s, implementation_date = %s, drafting_unit = %s, approving_department = %s, engineering_phase = %s, participating_units = %s, reference_basis = %s, source_url = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), doc_type_code, prof_field_code, data.get('validity'), data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'), data.get('approving_department'), data.get('engineering_phase'), data.get('participating_units'), data.get('reference_basis'), data.get('source_url'), data.get('note'), updater_id, doc_id ) elif type == 'construction_plan': sql = f""" UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s, level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), plan_category_code, level_1_code, level_2_code, level_3_code, level_4_code, data.get('note'), updater_id, doc_id ) elif type == 'regulation': sql = f""" UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, effective_start_date = %s, effective_end_date = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), 
data.get('issuing_authority'), doc_type_code, self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'), updater_id, doc_id ) else: return False, "不支持的类型" cursor.execute(sql, params) conn.commit() return True, "编辑成功" except Exception as e: logger.exception("编辑基本信息失败") conn.rollback() return False, str(e) finally: cursor.close() conn.close() async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]: """删除基本信息""" if not doc_id: return False, "缺少 ID 参数" logger.info(f"Deleting basic info: type={type}, id={doc_id}") conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型" # 1. 显式删除子表记录 (防止 CASCADE 未生效) try: cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,)) logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}") except Exception as sub_e: logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}") # 2. 同步删除任务管理中心的数据 (优先删除关联数据) try: # 使用当前事务删除任务记录(如果 task_service 支持的话,目前它自建连接) # 这里我们直接在当前 cursor 中也执行一次,确保事务一致性 cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,)) logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}") except Exception as task_e: logger.warning(f"在主事务中删除任务记录失败: {task_e}") # 3. 删除主表记录 cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,)) affected_main = cursor.rowcount logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}") if affected_main == 0: logger.warning(f"未找到主表记录: {doc_id}") # 即使主表没找到,我们也 commit 之前的操作并返回成功(幂等性) conn.commit() # 4. 
再次确保任务中心数据已删除 (调用原有服务) try: await task_service.delete_task(doc_id) except Exception as task_err: logger.error(f"调用 task_service 删除任务失败: {task_err}") return True, "删除成功" except Exception as e: logger.exception(f"删除基本信息异常 (ID: {doc_id})") conn.rollback() return False, f"删除失败: {str(e)}" finally: cursor.close() conn.close()