""" 样本中心服务层 从 sample_view.py 提取的SQL查询逻辑 """ import logging import uuid from datetime import datetime from typing import Optional, List, Dict, Any, Tuple from app.base.async_mysql_connection import get_db_connection from app.base.minio_connection import get_minio_manager from app.core.config import config_handler from app.services.milvus_service import milvus_service from app.services.task_service import task_service logger = logging.getLogger(__name__) # 表名映射 TABLE_MAP = { 'basis': 't_samp_standard_base_info', 'work': 't_samp_construction_plan_base_info', 'job': 't_samp_office_regulations' } def get_table_name(source_type: str) -> Optional[str]: """根据source_type获取表名""" return TABLE_MAP.get(source_type) class SampleService: """样本中心服务类 - 使用 SQL 查询方式""" def __init__(self): """初始化服务""" # 使用统一的 MinIO 管理器 self.minio_manager = get_minio_manager() # 使用全局 Milvus 服务 self.milvus_service = milvus_service # 确保集合已创建 try: self.milvus_service.ensure_collections() except Exception as e: logger.error(f"初始化 Milvus 集合失败: {e}") async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]: """获取 MinIO 预签名上传 URL""" try: data = self.minio_manager.get_upload_url(filename, content_type) return True, "成功获取上传链接", data except Exception as e: logger.exception("生成上传链接失败") return False, f"生成上传链接失败: {str(e)}", {} # ==================== 文档管理 ==================== async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]: """批量将文档入库到知识库 Args: doc_ids: 文档ID列表 username: 操作人 """ conn = get_db_connection() if not conn: return 0, "数据库连接失败,请检查数据库服务状态" cursor = conn.cursor() success_count = 0 skipped_count = 0 failed_count = 0 error_details = [] try: # 1. 获取所有选中选中的文档详情 placeholders = ','.join(['%s']*len(doc_ids)) fetch_sql = f""" SELECT id, title, source_type, md_url, conversion_status, created_time FROM t_samp_document_main WHERE id IN ({placeholders}) """ cursor.execute(fetch_sql, tuple(doc_ids)) selected_docs = cursor.fetchall() if not selected_docs: return 0, "选中的文档在数据库中不存在" # 2. 逐份处理 for doc in selected_docs: doc_id = doc['id'] title = doc.get('title', '未命名文档') status = doc.get('conversion_status') md_url = doc.get('md_url') # A. 检查转换状态 if status != 2: reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败" logger.warning(f"文档 {title}({doc_id}) 状态为 {status},跳过入库: {reason}") skipped_count += 1 error_details.append(f"· {title}: {reason}") continue if not md_url: logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,跳过入库") skipped_count += 1 error_details.append(f"· {title}: 转换结果地址丢失") continue # B. 从 MinIO 获取 Markdown 内容 try: md_content = self.minio_manager.get_object_content(md_url) if not md_content: raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})") except Exception as minio_err: logger.error(f"读取文档 {title} 内容失败: {minio_err}") failed_count += 1 error_details.append(f"· {title}: 读取云端文件失败") continue # C. 调用 MilvusService 进行切分和入库 try: # 准备元数据 doc_info = { "doc_id": doc_id, "doc_name": title, "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else 20260127, "tags": doc.get('source_type') or 'unknown' } await self.milvus_service.insert_knowledge(md_content, doc_info) # D. 添加到任务管理中心 (类型为 data) try: await task_service.add_task(doc_id, 'data') except Exception as task_err: logger.error(f"添加文档 {title} 到任务中心失败: {task_err}") # E. 更新数据库状态 update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, updated_by = %s, updated_time = NOW() WHERE id = %s" cursor.execute(update_sql, (username, doc_id)) success_count += 1 except Exception as milvus_err: logger.exception(f"文档 {title} 写入向量库失败") failed_count += 1 error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})") continue conn.commit() # 构造详细的消息 msg = f"入库完成:成功 {success_count} 份" if skipped_count > 0: msg += f",跳过 {skipped_count} 份" if failed_count > 0: msg += f",失败 {failed_count} 份" if error_details: detailed_msg = msg + "\n\n详情:\n" + "\n".join(error_details) return success_count, detailed_msg return success_count, msg except Exception as e: logger.exception(f"文档批量入库异常: {e}") conn.rollback() return 0, f"操作异常: {str(e)}" finally: cursor.close() conn.close() async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]: """批量删除文档""" conn = get_db_connection() if not conn: return 0, "数据库连接失败" cursor = conn.cursor() try: if not doc_ids: return 0, "未指定要删除的文档 ID" placeholders = ', '.join(['%s'] * len(doc_ids)) # 尝试同步删除子表中的数据 try: for sub_table in TABLE_MAP.values(): sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})" try: cursor.execute(sub_sql, doc_ids) except Exception as sub_e: logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}") except Exception as sync_e: logger.error(f"同步删除子表数据失败: {sync_e}") # 删除主表 t_samp_document_main 中的数据 sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})" cursor.execute(sql_main, doc_ids) affected_rows = cursor.rowcount # 同步删除任务管理中心的数据 try: for doc_id in doc_ids: await task_service.delete_task(doc_id) except Exception as task_err: logger.error(f"同步删除任务中心数据失败: {task_err}") conn.commit() return affected_rows, f"成功删除 {affected_rows} 条文档数据" except Exception as e: logger.exception("批量删除失败") conn.rollback() return 0, f"批量删除失败: {str(e)}" finally: cursor.close() conn.close() async def get_document_list( self, whether_to_enter: Optional[int] = None, keyword: Optional[str] = None, table_type: Optional[str] = None, plan_category: Optional[str] = None, level_2_classification: Optional[str] = None, level_3_classification: Optional[str] = None, level_4_classification: Optional[str] = None, page: int = 1, size: int = 50 ) -> Tuple[List[Dict[str, Any]], int, int, int]: """获取文档列表 (支持关联查询子表)""" conn = get_db_connection() if not conn: return [], 0, 0, 0 cursor = conn.cursor() try: where_clauses = [] params = [] # 基础查询 if table_type and table_type in TABLE_MAP: # 如果指定了类型,使用 LEFT JOIN 关联查询,以便搜索子表字段 sub_table = TABLE_MAP[table_type] from_sql = f"t_samp_document_main m LEFT JOIN {sub_table} s ON m.id = s.id" fields_sql = "m.*, s.*" # 获取所有字段,包括子表字段 where_clauses.append("m.source_type = %s") params.append(table_type) order_sql = "m.created_time DESC" title_field = "m.title" # 施工方案特有的过滤字段 if table_type == 'work': if plan_category: where_clauses.append("s.plan_category = %s") params.append(plan_category) if level_2_classification: where_clauses.append("s.level_2_classification = %s") params.append(level_2_classification) if level_3_classification: where_clauses.append("s.level_3_classification = %s") params.append(level_3_classification) if level_4_classification: where_clauses.append("s.level_4_classification = %s") params.append(level_4_classification) # 特殊处理 id 冲突,确保返回的是主表 m.id fields_sql = "m.*, s.*, m.id as id" else: from_sql = "t_samp_document_main" fields_sql = "*" order_sql = "created_time DESC" title_field = "title" if whether_to_enter is not None: # 按照 search_replace_blocks 的逻辑,这里使用 conversion_status 过滤 where_clauses.append("conversion_status = %s") params.append(whether_to_enter) if keyword: where_clauses.append(f"{title_field} LIKE %s") params.append(f"%{keyword}%") where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" offset = (page - 1) * size sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s" params.extend([size, offset]) logger.info(f"Executing SQL: {sql} with params: {params}") cursor.execute(sql, tuple(params)) items = [] for row in cursor.fetchall(): item = row # 处理 URL 转换 for key in ['file_url', 'md_url', 'json_url']: if item.get(key): item[key] = self.minio_manager.get_full_url(item[key]) # 映射字段以适配前端通用显示 source_type = item.get('source_type') if source_type == 'work': item['issuing_authority'] = item.get('compiling_unit') item['release_date'] = item.get('compiling_date') elif source_type == 'job': item['issuing_authority'] = item.get('issuing_department') item['release_date'] = item.get('publish_date') # 格式化时间 for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date']: if item.get(key) and hasattr(item[key], 'isoformat'): item[key] = item[key].isoformat() elif item.get(key) is not None: item[key] = str(item[key]) # 增加格式化文件名供前端显示 if item.get('conversion_status') == 2: title = item.get('title', 'document') item['md_display_name'] = f"{title}.md" item['json_display_name'] = f"{title}.json" items.append(item) # 总数 count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}" cursor.execute(count_sql, tuple(params[:-2])) res = cursor.fetchone() total = res['count'] if res else 0 # 统计数据 cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main") res = cursor.fetchone() all_total = res['count'] if res else 0 cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE conversion_status = 2") res = cursor.fetchone() total_entered = res['count'] if res else 0 return items, total, all_total, total_entered except Exception as e: logger.exception("获取文档列表失败") return [], 0, 0, 0 finally: cursor.close() conn.close() async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]: """获取文档详情 (关联查询子表)""" conn = get_db_connection() if not conn: return None cursor = conn.cursor() try: # 1. 查询主表 cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,)) doc = cursor.fetchone() if not doc: return None # 2. 根据 source_type 查询对应的子表信息 source_type = doc.get('source_type') table_name = TABLE_MAP.get(source_type) if table_name: # 关联子表的所有字段 sub_sql = f"SELECT * FROM {table_name} WHERE id = %s" cursor.execute(sub_sql, (doc_id,)) sub_data = cursor.fetchone() if sub_data: # 将子表字段合并到 doc 中,方便前端使用 # 注意:如果字段名冲突,子表字段会覆盖主表字段(除了 id) sub_data.pop('id', None) # 特殊处理一些前端需要的映射字段 if source_type == 'basis': doc['standard_no'] = sub_data.get('standard_number') elif source_type == 'work': doc['issuing_authority'] = sub_data.get('compiling_unit') doc['release_date'] = sub_data.get('compiling_date') elif source_type == 'job': doc['issuing_authority'] = sub_data.get('issuing_department') doc['release_date'] = sub_data.get('publish_date') doc.update(sub_data) # 格式化时间 for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date', 'implementation_date']: val = doc.get(key) if val and hasattr(val, 'isoformat'): doc[key] = val.isoformat() elif val is not None: doc[key] = str(val) # 处理 URL 转换 for key in ['file_url', 'md_url', 'json_url']: if doc.get(key): doc[key] = self.minio_manager.get_full_url(doc[key]) # 增加格式化文件名供前端显示 if doc.get('conversion_status') == 2: title = doc.get('title', 'document') doc['md_display_name'] = f"{title}.md" doc['json_display_name'] = f"{title}.json" return doc except Exception as e: logger.exception("获取文档详情失败") return None finally: cursor.close() conn.close() def _to_int(self, value: Any) -> Optional[int]: """安全转换为整数""" if value is None or value == '' or str(value).lower() == 'null': return None try: return int(value) except (ValueError, TypeError): return None def _to_date(self, value: Any) -> Optional[str]: """安全处理日期字符串""" if value is None or value == '' or str(value).lower() == 'null': return None # 如果已经是 datetime.date 或 datetime.datetime 对象 if hasattr(value, 'strftime'): return value.strftime('%Y-%m-%d') return str(value) async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]: """添加新文档(先主表后子表,解耦触发器)""" conn = get_db_connection() if not conn: return False, "数据库连接失败", None cursor = conn.cursor() try: doc_id = str(uuid.uuid4()) table_type = doc_data.get('table_type', 'basis') table_name = TABLE_MAP.get(table_type) # 安全转换字段 release_date = self._to_date(doc_data.get('release_date')) # 处理 URL 存储(转为相对路径) file_url = self.minio_manager.get_relative_path(doc_data.get('file_url')) # 1. 插入主表 (作为资产中心) cursor.execute( """ INSERT INTO t_samp_document_main ( id, title, source_type, file_url, file_extension, created_by, updated_by, created_time, updated_time, conversion_status ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0) """, ( doc_id, doc_data.get('title'), table_type, file_url, doc_data.get('file_extension'), user_id, user_id ) ) # 2. 插入子表 (仅存储业务字段) if table_type == 'basis': cursor.execute( f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())", (doc_id, doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), user_id) ) elif table_type == 'work': cursor.execute( f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())", (doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), user_id) ) elif table_type == 'job': cursor.execute( f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())", (doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), user_id) ) # 3. 添加到任务管理中心 (类型为 data) try: await task_service.add_task(doc_id, 'data') except Exception as task_err: logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}") conn.commit() return True, "文档添加成功", doc_id except Exception as e: logger.exception("添加文档失败") conn.rollback() return False, str(e), None finally: cursor.close() conn.close() async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]: """编辑文档(同步主表和子表,解耦触发器)""" conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: doc_id = doc_data.get('id') table_type = doc_data.get('table_type', 'basis') table_name = TABLE_MAP.get(table_type) # 安全转换字段 release_date = self._to_date(doc_data.get('release_date')) # 处理 URL 存储(转为相对路径) file_url = self.minio_manager.get_relative_path(doc_data.get('file_url')) # 1. 更新主表 cursor.execute( """ UPDATE t_samp_document_main SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """, ( doc_data.get('title'), file_url, doc_data.get('file_extension'), updater_id, doc_id ) ) # 2. 更新子表 if table_type == 'basis': cursor.execute( f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s", (doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), updater_id, doc_id) ) elif table_type == 'work': cursor.execute( f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s, level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s", (doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), updater_id, doc_id) ) elif table_type == 'job': cursor.execute( f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s", (doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), updater_id, doc_id) ) conn.commit() return True, "文档更新成功" except Exception as e: logger.exception("编辑文档失败") conn.rollback() return False, str(e) finally: cursor.close() conn.close() async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]: """文档入库(单个)""" affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username) return affected_rows > 0, message async def get_basic_info_list( self, type: str, page: int, size: int, keyword: Optional[str] = None, **filters ) -> Tuple[List[Dict[str, Any]], int]: """获取基本信息列表(关联主表获取文件和转换状态)""" conn = get_db_connection() if not conn: return [], 0 cursor = conn.cursor() try: # 根据类型选择表名和字段映射 if type == 'basis': table_name = "t_samp_standard_base_info" # 关联主表字段:file_url, conversion_status, md_url, json_url fields = """ s.id, s.chinese_name as title, s.standard_number as standard_no, s.issuing_authority, s.release_date, s.document_type, s.professional_field, s.validity, s.note, s.created_by, s.created_time, s.updated_by, s.updated_time, m.file_url, m.conversion_status, m.md_url, m.json_url """ field_map = { 'title': 's.chinese_name', 'standard_no': 's.standard_number', 'issuing_authority': 's.issuing_authority', 'release_date': 's.release_date', 'document_type': 's.document_type', 'professional_field': 's.professional_field', 'validity': 's.validity' } elif type == 'work': table_name = "t_samp_construction_plan_base_info" fields = """ s.id, s.plan_name as title, NULL as standard_no, s.project_name, s.project_section, s.compiling_unit as issuing_authority, s.compiling_date as release_date, NULL as document_type, NULL as professional_field, NULL as validity, s.plan_summary, s.compilation_basis, s.plan_category, s.level_1_classification, s.level_2_classification, s.level_3_classification, s.level_4_classification, s.note, s.created_by, s.created_time, s.updated_by, s.updated_time, m.file_url, m.conversion_status, m.md_url, m.json_url """ field_map = { 'title': 's.plan_name', 'issuing_authority': 's.compiling_unit', 'release_date': 's.compiling_date', 'plan_category': 's.plan_category', 'level_1_classification': 's.level_1_classification', 'level_2_classification': 's.level_2_classification', 'level_3_classification': 's.level_3_classification', 'level_4_classification': 's.level_4_classification' } elif type == 'job': table_name = "t_samp_office_regulations" fields = """ s.id, s.file_name as title, NULL as standard_no, s.issuing_department as issuing_authority, s.publish_date as release_date, s.document_type, NULL as professional_field, NULL as validity, s.note, s.created_by, s.created_time, s.updated_by, s.updated_time, m.file_url, m.conversion_status, m.md_url, m.json_url """ field_map = { 'title': 's.file_name', 'issuing_authority': 's.issuing_department', 'release_date': 's.publish_date', 'document_type': 's.document_type' } else: return [], 0 where_clauses = [] params = [] # 统一关键字搜索 if keyword: if type == 'basis': where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)") params.extend([f"%{keyword}%", f"%{keyword}%"]) elif type == 'work': where_clauses.append("s.plan_name LIKE %s") params.append(f"%{keyword}%") elif type == 'job': where_clauses.append("s.file_name LIKE %s") params.append(f"%{keyword}%") # 精细化检索 for filter_key, filter_value in filters.items(): if not filter_value: continue db_field = field_map.get(filter_key) if db_field: where_clauses.append(f"{db_field} = %s") params.append(filter_value) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" offset = (page - 1) * size # 使用 LEFT JOIN 关联主表 sql = f""" SELECT {fields} FROM {table_name} s LEFT JOIN t_samp_document_main m ON s.id = m.id {where_sql} ORDER BY s.created_time DESC LIMIT %s OFFSET %s """ params = params + [size, offset] cursor.execute(sql, tuple(params)) items = cursor.fetchall() # 处理 URL 转换 for item in items: for key in ['file_url', 'md_url', 'json_url']: if item.get(key): item[key] = self.minio_manager.get_full_url(item[key]) # 总数 count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}" cursor.execute(count_sql, tuple(params[:-2])) res = cursor.fetchone() total = res['count'] if res else 0 return items, total except Exception as e: logger.exception(f"获取 {type} 列表失败") return [], 0 finally: cursor.close() conn.close() # ==================== 文档转换 ==================== async def get_document_source_type(self, doc_id: str) -> Optional[str]: """获取文档的source_type""" conn = get_db_connection() if not conn: return None cursor = conn.cursor() try: cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,)) res = cursor.fetchone() return res['source_type'] if res else None except Exception as e: logger.exception(f"获取文档source_type失败: {e}") return None finally: cursor.close() conn.close() async def get_document_title(self, doc_id: str) -> str: """获取文档标题""" conn = get_db_connection() if not conn: return "文档" cursor = conn.cursor() try: cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,)) res = cursor.fetchone() return res['title'] if res else "文档" except Exception as e: logger.exception(f"获取文档标题失败: {e}") return "文档" finally: cursor.close() conn.close() async def update_conversion_status(self, doc_id: str, status: int, md_url: Optional[str] = None, json_url: Optional[str] = None, error_message: Optional[str] = None) -> bool: """更新文档转换状态 Args: doc_id: 文档ID status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败) md_url: Markdown文件URL json_url: JSON文件URL error_message: 错误信息 """ conn = get_db_connection() if not conn: return False cursor = conn.cursor() try: update_clauses = ["conversion_status = %s"] params = [status] if error_message: update_clauses.append("conversion_error = %s") params.append(error_message) if md_url: update_clauses.append("md_url = %s") params.append(md_url) if json_url: update_clauses.append("json_url = %s") params.append(json_url) sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s" params.append(doc_id) cursor.execute(sql, tuple(params)) conn.commit() return True except Exception as e: logger.exception(f"更新转换进度失败: {e}") conn.rollback() return False finally: cursor.close() conn.close() # ==================== 基础信息管理 ==================== async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]: """新增基本信息""" logger.info(f"Adding basic info for type {type}: {data}") conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型" doc_id = str(uuid.uuid4()) file_url = data.get('file_url') file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None # 1. 插入主表 (解耦触发器,手动同步) cursor.execute( """ INSERT INTO t_samp_document_main ( id, title, source_type, file_url, file_extension, created_by, updated_by, created_time, updated_time, conversion_status ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0) """, ( doc_id, data.get('title'), type, file_url, file_extension, user_id, user_id ) ) # 2. 插入子表 (移除 file_url,因为它现在只存储在主表中) if type == 'basis': sql = f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())" params = (doc_id, data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('document_type'), data.get('professional_field'), data.get('validity', '现行'), data.get('note'), user_id) elif type == 'work': sql = f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())" params = (doc_id, data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'), data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'), data.get('note'), user_id) elif type == 'job': sql = f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())" params = (doc_id, data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')), data.get('note'), user_id) else: return False, "不支持的类型" cursor.execute(sql, params) # 3. 添加到任务管理中心 (类型为 data) try: await task_service.add_task(doc_id, 'data') except Exception as task_err: logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}") conn.commit() return True, "新增成功", doc_id except Exception as e: logger.exception("新增基本信息失败") conn.rollback() return False, str(e) finally: cursor.close() conn.close() async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]: """编辑基本信息""" logger.info(f"Editing basic info for type {type}, id {info_id}: {data}") conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型" # 处理 URL 存储(转为相对路径) file_url = self.minio_manager.get_relative_path(data.get('file_url')) file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None # 1. 更新主表 (解耦触发器) cursor.execute( """ UPDATE t_samp_document_main SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """, (data.get('title'), file_url, file_extension, updater_id, info_id) ) # 2. 更新子表 (移除 file_url) if type == 'basis': sql = f""" UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, english_name = %s, implementation_date = %s, drafting_unit = %s, approving_department = %s, participating_units = %s, engineering_phase = %s, reference_basis = %s, source_url = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('document_type'), data.get('professional_field'), data.get('validity'), data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'), data.get('approving_department'), data.get('participating_units'), data.get('engineering_phase'), data.get('reference_basis'), data.get('source_url'), data.get('note'), updater_id, info_id ) elif type == 'work': sql = f""" UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s, level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'), data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'), data.get('note'), updater_id, info_id ) elif type == 'job': sql = f""" UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, effective_start_date = %s, effective_end_date = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s """ params = ( data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')), self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'), updater_id, info_id ) else: return False, "不支持的类型" cursor.execute(sql, params) conn.commit() return True, "编辑成功" except Exception as e: logger.exception("编辑基本信息失败") conn.rollback() return False, str(e) finally: cursor.close() conn.close() async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]: """删除基本信息""" conn = get_db_connection() if not conn: return False, "数据库连接失败" cursor = conn.cursor() try: table_name = TABLE_MAP.get(type) if not table_name: return False, "无效的类型" # 1. 删除主表记录 (由于设置了 ON DELETE CASCADE,子表记录会自动删除) cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (info_id,)) # 同步删除任务管理中心的数据 try: await task_service.delete_task(info_id) except Exception as task_err: logger.error(f"同步删除任务中心数据失败 (ID: {info_id}): {task_err}") conn.commit() return True, "删除成功" except Exception as e: logger.exception("删除基本信息失败") conn.rollback() return False, str(e) finally: cursor.close() conn.close()