"""Sample-center service layer.

Holds the SQL query logic extracted from ``sample_view.py``. Each public
coroutine opens its own connection through ``get_db_connection()``, reads
result rows by column name (the cursor is expected to yield dicts), and
closes the cursor and connection in a ``finally`` block.

NOTE(review): the coroutines call the connection/cursor API synchronously
(no ``await``), so every query blocks the event loop while it runs --
confirm this is acceptable or move the work to a thread pool.
"""
import logging
import posixpath
import uuid
from typing import Optional, List, Dict, Any, Tuple
from urllib.parse import urlparse

from app.base.async_mysql_connection import get_db_connection
from app.base.minio_connection import get_minio_manager
from app.core.config import config_handler

logger = logging.getLogger(__name__)

# Maps a public ``source_type`` / ``table_type`` value to its sub-table name.
TABLE_MAP = {
    'basis': 't_samp_standard_base_info',
    'work': 't_samp_construction_plan_base_info',
    'job': 't_samp_office_regulations'
}


def get_table_name(source_type: str) -> Optional[str]:
    """Return the sub-table name for ``source_type``, or None if it is unknown."""
    return TABLE_MAP.get(source_type)


def _in_placeholders(count: int) -> str:
    """Return ``count`` comma-separated ``%s`` placeholders for a SQL ``IN (...)`` list."""
    return ', '.join(['%s'] * count)


def _extract_file_extension(file_url: Optional[str]) -> Optional[str]:
    """Return the extension (without the leading dot) of the file named by ``file_url``.

    Only the last segment of the URL path is inspected, so query strings
    (e.g. presigned ``?X-Amz-...`` parameters) and dots in the host name do
    not leak into the result. Returns None for an empty URL or a file name
    without an extension.
    """
    if not file_url:
        return None
    file_name = posixpath.basename(urlparse(file_url).path)
    extension = posixpath.splitext(file_name)[1]
    return extension[1:] if extension else None


def _format_document_row(row: Dict[str, Any]) -> Dict[str, Any]:
    """Prepare a ``t_samp_document_main`` row for the frontend; mutates and returns ``row``.

    * ``created_time`` / ``updated_time`` values that have ``isoformat`` are
      converted to ISO-8601 strings.
    * Rows with ``conversion_status == 2`` get ``md_display_name`` and
      ``json_display_name`` derived from the title.
    """
    for key in ('created_time', 'updated_time'):
        value = row.get(key)
        if value and hasattr(value, 'isoformat'):
            row[key] = value.isoformat()
    if row.get('conversion_status') == 2:
        # A NULL/empty title column would otherwise produce "None.md" or ".md".
        title = row.get('title') or 'document'
        row['md_display_name'] = f"{title}.md"
        row['json_display_name'] = f"{title}.json"
    return row


class SampleService:
    """Sample-center service implemented with raw SQL queries."""

    def __init__(self):
        """Initialise the service with the shared MinIO manager."""
        self.minio_manager = get_minio_manager()

    async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]:
        """Obtain a presigned MinIO upload URL.

        Returns:
            ``(success, message, data)``; ``data`` is whatever
            ``minio_manager.get_upload_url`` returned, or ``{}`` on failure.
        """
        try:
            data = self.minio_manager.get_upload_url(filename, content_type)
            return True, "成功获取上传链接", data
        except Exception as e:
            logger.exception("生成上传链接失败")
            return False, f"生成上传链接失败: {str(e)}", {}

    # ==================== Document management ====================

    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
        """Mark documents as entered into the knowledge base.

        Only rows with ``conversion_status = 2`` (converted successfully) and
        ``whether_to_enter = 0`` (not yet entered) are updated; other IDs are
        skipped.

        Args:
            doc_ids: IDs of rows in ``t_samp_document_main``.
            username: Operator written to ``updated_by``.

        Returns:
            ``(affected_rows, message)``.
        """
        # An empty list would render "IN ()", which is invalid SQL.
        if not doc_ids:
            return 0, "未指定要入库的文档 ID"
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"
        cursor = conn.cursor()
        try:
            check_sql = (
                "SELECT id, title FROM t_samp_document_main "
                f"WHERE id IN ({_in_placeholders(len(doc_ids))}) "
                "AND conversion_status = 2 AND whether_to_enter = 0"
            )
            cursor.execute(check_sql, tuple(doc_ids))
            valid_ids = [doc['id'] for doc in cursor.fetchall()]
            if not valid_ids:
                return 0, "选中的文档中没有满足入库条件(已转换成功且未入库)的记录"

            # Repeat the eligibility conditions so a concurrent caller that
            # entered some rows between the SELECT and this UPDATE is not
            # counted twice.
            update_sql = (
                "UPDATE t_samp_document_main SET whether_to_enter = 1, updated_by = %s, updated_time = NOW() "
                f"WHERE id IN ({_in_placeholders(len(valid_ids))}) "
                "AND conversion_status = 2 AND whether_to_enter = 0"
            )
            cursor.execute(update_sql, (username, *valid_ids))
            affected_rows = cursor.rowcount
            conn.commit()
            return affected_rows, f"成功入库 {affected_rows} 份文档"
        except Exception as e:
            logger.exception(f"文档批量入库失败: {e}")
            conn.rollback()
            return 0, f"入库失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()

    async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
        """Delete documents from the main table and, best-effort, their sub-table rows.

        A failure to delete an individual sub-table row is logged and does not
        abort the main-table deletion.

        Returns:
            ``(affected_rows, message)`` where ``affected_rows`` counts deleted
            main-table rows.
        """
        if not doc_ids:
            return 0, "未指定要删除的文档 ID"
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"
        cursor = conn.cursor()
        try:
            placeholders = _in_placeholders(len(doc_ids))

            # Best-effort removal of the linked sub-table rows.
            try:
                cursor.execute(
                    f"SELECT source_type, source_id FROM t_samp_document_main WHERE id IN ({placeholders})",
                    doc_ids
                )
                for doc_row in cursor.fetchall():
                    s_type = doc_row['source_type']
                    s_id = doc_row['source_id']
                    if not (s_type and s_id):
                        continue
                    sub_table = get_table_name(s_type)
                    if not sub_table:
                        continue
                    try:
                        cursor.execute(f"DELETE FROM {sub_table} WHERE id = %s", (s_id,))
                    except Exception as sub_e:
                        logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
            except Exception as sync_e:
                logger.error(f"同步删除子表数据失败: {sync_e}")

            cursor.execute(f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})", doc_ids)
            affected_rows = cursor.rowcount
            conn.commit()
            return affected_rows, f"成功删除 {affected_rows} 条文档数据"
        except Exception as e:
            logger.exception("批量删除失败")
            conn.rollback()
            return 0, f"批量删除失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()

    async def get_document_list(
        self,
        whether_to_enter: Optional[int] = None,
        keyword: Optional[str] = None,
        table_type: Optional[str] = None,
        page: int = 1,
        size: int = 50
    ) -> Tuple[List[Dict[str, Any]], int, int, int]:
        """List documents from the main table, newest first.

        Args:
            whether_to_enter: Optional exact filter on ``whether_to_enter``.
            keyword: Optional substring matched against ``title``.
            table_type: Optional exact filter on ``source_type``.
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size.

        Returns:
            ``(items, filtered_total, all_total, total_entered)``; all zeros
            and an empty list on failure.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0, 0, 0
        cursor = conn.cursor()
        try:
            where_clauses = []
            filter_params = []
            if table_type:
                where_clauses.append("source_type = %s")
                filter_params.append(table_type)
            if whether_to_enter is not None:
                where_clauses.append("whether_to_enter = %s")
                filter_params.append(whether_to_enter)
            if keyword:
                where_clauses.append("title LIKE %s")
                filter_params.append(f"%{keyword}%")
            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""

            # A page below 1 would yield a negative OFFSET, which MySQL rejects.
            offset = (max(page, 1) - 1) * size
            sql = f"SELECT * FROM t_samp_document_main {where_sql} ORDER BY created_time DESC LIMIT %s OFFSET %s"
            params = filter_params + [size, offset]
            logger.info("Executing SQL: %s with params: %s", sql, params)
            cursor.execute(sql, tuple(params))
            items = [_format_document_row(row) for row in cursor.fetchall()]

            # Total matching the filters (without pagination).
            cursor.execute(f"SELECT COUNT(*) as count FROM t_samp_document_main {where_sql}", tuple(filter_params))
            res = cursor.fetchone()
            total = res['count'] if res else 0

            # Global statistics, independent of the filters.
            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
            res = cursor.fetchone()
            all_total = res['count'] if res else 0

            cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
            res = cursor.fetchone()
            total_entered = res['count'] if res else 0

            return items, total, all_total, total_entered
        except Exception:
            logger.exception("获取文档列表失败")
            return [], 0, 0, 0
        finally:
            cursor.close()
            conn.close()

    async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """Return one main-table row formatted for the frontend, or None if missing or on error."""
        conn = get_db_connection()
        if not conn:
            return None
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
            doc = cursor.fetchone()
            if not doc:
                return None
            return _format_document_row(doc)
        except Exception:
            logger.exception("获取文档详情失败")
            return None
        finally:
            cursor.close()
            conn.close()

    def _to_int(self, value: Any) -> Optional[int]:
        """Convert ``value`` to int; None, '', 'null' (any case) or unparsable input gives None."""
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

    def _to_date(self, value: Any) -> Optional[str]:
        """Normalise a date-like value to a string.

        None, '' and 'null' (any case) give None; objects with ``strftime``
        (date/datetime) are formatted as ``YYYY-MM-DD``; anything else is
        passed through ``str``.
        """
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        if hasattr(value, 'strftime'):
            return value.strftime('%Y-%m-%d')
        return str(value)

    @staticmethod
    def _insert_main_row(cursor, doc_id: str, title: Any, source_type: str, source_id: str,
                         file_url: Any, file_extension: Any, user_id: str) -> None:
        """Insert a ``t_samp_document_main`` row with ``conversion_status = 0``."""
        cursor.execute(
            """
            INSERT INTO t_samp_document_main (
                id, title, source_type, source_id, file_url, file_extension,
                created_by, updated_by, created_time, updated_time, conversion_status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
            """,
            (doc_id, title, source_type, source_id, file_url, file_extension, user_id, user_id)
        )

    def _build_sub_insert(self, table_type: str, table_name: str, source_id: str, data: Dict[str, Any],
                          user_id: str, default_validity: Any = None) -> Optional[Tuple[str, tuple]]:
        """Build the sub-table INSERT for ``table_type``.

        The sub-tables store only business fields; ``file_url`` lives in the
        main table. ``data['release_date']`` maps to each table's date column.

        Returns:
            ``(sql, params)``, or None for an unsupported ``table_type``.
        """
        release_date = self._to_date(data.get('release_date'))
        if table_type == 'basis':
            sql = f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
            params = (source_id, data.get('title'), data.get('standard_no'), data.get('issuing_authority'),
                      release_date, data.get('document_type'), data.get('professional_field'),
                      data.get('validity', default_validity), user_id)
        elif table_type == 'work':
            sql = f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
            params = (source_id, data.get('title'), data.get('project_name'), data.get('project_section'),
                      data.get('issuing_authority'), release_date, user_id)
        elif table_type == 'job':
            sql = f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())"
            params = (source_id, data.get('title'), data.get('issuing_authority'), data.get('document_type'),
                      release_date, user_id)
        else:
            return None
        return sql, params

    async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Create a document: main-table row first, then the sub-table row.

        Both rows are written manually in one transaction (no reliance on
        database triggers).

        Returns:
            ``(success, message, doc_id)``; ``doc_id`` is None on failure.
        """
        table_type = doc_data.get('table_type', 'basis')
        table_name = TABLE_MAP.get(table_type)
        # Reject unknown types before writing, so no orphan main row is created.
        if not table_name:
            return False, "无效的类型", None
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None
        cursor = conn.cursor()
        try:
            doc_id = str(uuid.uuid4())
            source_id = str(uuid.uuid4())
            sub_sql, sub_params = self._build_sub_insert(table_type, table_name, source_id, doc_data, user_id)

            self._insert_main_row(
                cursor, doc_id, doc_data.get('title'), table_type, source_id,
                doc_data.get('file_url'), doc_data.get('file_extension'), user_id
            )
            cursor.execute(sub_sql, sub_params)
            conn.commit()
            return True, "文档添加成功", doc_id
        except Exception as e:
            logger.exception("添加文档失败")
            conn.rollback()
            return False, str(e), None
        finally:
            cursor.close()
            conn.close()

    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a document's main-table row (by ``id``) and sub-table row (by ``source_id``).

        Returns:
            ``(success, message)``.
        """
        table_type = doc_data.get('table_type', 'basis')
        table_name = TABLE_MAP.get(table_type)
        # Reject unknown types so the main row is not updated without its sub-table row.
        if not table_name:
            return False, "无效的类型"
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            doc_id = doc_data.get('id')
            source_id = doc_data.get('source_id')
            release_date = self._to_date(doc_data.get('release_date'))

            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """,
                (doc_data.get('title'), doc_data.get('file_url'), doc_data.get('file_extension'), updater_id, doc_id)
            )

            if table_type == 'basis':
                cursor.execute(
                    f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'),
                     release_date, doc_data.get('document_type'), doc_data.get('professional_field'),
                     doc_data.get('validity'), updater_id, source_id)
                )
            elif table_type == 'work':
                cursor.execute(
                    f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'),
                     doc_data.get('issuing_authority'), release_date, updater_id, source_id)
                )
            elif table_type == 'job':
                cursor.execute(
                    f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'),
                     release_date, updater_id, source_id)
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()

    async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
        """Enter a single document; succeeds when at least one row was updated."""
        affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
        return affected_rows > 0, message

    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """List sub-table rows of one ``type``, LEFT JOINed with their main-table row.

        The joined main-table columns supply file URL, conversion status,
        Markdown/JSON URLs and the main document id (``doc_id``).

        Args:
            type: One of ``'basis'``, ``'work'``, ``'job'``; anything else gives ``([], 0)``.
            page: 1-based page number; values below 1 are treated as 1.
            size: Page size.
            keyword: Optional substring matched against the type's title column
                (and ``standard_number`` for ``'basis'``).
            **filters: Exact-match filters keyed by public field name; keys not
                supported by ``type`` and falsy values are ignored.

        Returns:
            ``(items, total)``; ``([], 0)`` on failure.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0
        cursor = conn.cursor()
        try:
            # Select the sub-table, projected columns and filter-field mapping for this type.
            if type == 'basis':
                table_name = "t_samp_standard_base_info"
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type,
                    s.professional_field, s.validity, s.created_by, s.created_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'work':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                    s.id, s.plan_name as title, NULL as standard_no,
                    s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                    NULL as document_type, NULL as professional_field, NULL as validity,
                    s.created_by, s.created_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date'
                }
            elif type == 'job':
                table_name = "t_samp_office_regulations"
                fields = """
                    s.id, s.file_name as title, NULL as standard_no,
                    s.issuing_department as issuing_authority, s.publish_date as release_date,
                    s.document_type, NULL as professional_field, NULL as validity,
                    s.created_by, s.created_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0

            where_clauses = []
            filter_params = []

            if keyword:
                if type == 'basis':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    filter_params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'work':
                    where_clauses.append("s.plan_name LIKE %s")
                    filter_params.append(f"%{keyword}%")
                elif type == 'job':
                    where_clauses.append("s.file_name LIKE %s")
                    filter_params.append(f"%{keyword}%")

            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue
                db_field = field_map.get(filter_key)
                if db_field:
                    where_clauses.append(f"{db_field} = %s")
                    filter_params.append(filter_value)

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            # A page below 1 would yield a negative OFFSET, which MySQL rejects.
            offset = (max(page, 1) - 1) * size

            sql = f"""
                SELECT {fields}
                FROM {table_name} s
                LEFT JOIN t_samp_document_main m ON s.id = m.source_id AND m.source_type = %s
                {where_sql}
                ORDER BY s.created_time DESC
                LIMIT %s OFFSET %s
            """
            # The JOIN condition's placeholder comes first, before the WHERE params.
            cursor.execute(sql, tuple([type] + filter_params + [size, offset]))
            items = cursor.fetchall()

            # The WHERE clause references only sub-table columns, so the count needs no JOIN.
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(filter_params))
            res = cursor.fetchone()
            total = res['count'] if res else 0

            return items, total
        except Exception:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()

    # ==================== Document conversion ====================

    async def get_document_source_type(self, doc_id: str) -> Optional[str]:
        """Return the main-table ``source_type`` of ``doc_id``, or None if missing or on error."""
        conn = get_db_connection()
        if not conn:
            return None
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
            res = cursor.fetchone()
            return res['source_type'] if res else None
        except Exception as e:
            logger.exception(f"获取文档source_type失败: {e}")
            return None
        finally:
            cursor.close()
            conn.close()

    async def get_document_title(self, doc_id: str) -> str:
        """Return the main-table title of ``doc_id``; falls back to "文档" if missing or on error."""
        conn = get_db_connection()
        if not conn:
            return "文档"
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
            res = cursor.fetchone()
            return res['title'] if res else "文档"
        except Exception as e:
            logger.exception(f"获取文档标题失败: {e}")
            return "文档"
        finally:
            cursor.close()
            conn.close()

    async def update_conversion_status(self, doc_id: str, status: int, md_url: Optional[str] = None,
                                       json_url: Optional[str] = None,
                                       error_message: Optional[str] = None) -> bool:
        """Update a document's conversion status and, when given, its result fields.

        Args:
            doc_id: Main-table document ID.
            status: Conversion status (0 = not converted, 1 = converting,
                2 = done, 3 = failed).
            md_url: Markdown file URL; written only when truthy.
            json_url: JSON file URL; written only when truthy.
            error_message: Written to ``conversion_error`` only when truthy.

        Returns:
            True if the UPDATE executed and committed (even if no row matched).
        """
        conn = get_db_connection()
        if not conn:
            return False
        cursor = conn.cursor()
        try:
            update_clauses = ["conversion_status = %s"]
            params = [status]
            if error_message:
                update_clauses.append("conversion_error = %s")
                params.append(error_message)
            if md_url:
                update_clauses.append("md_url = %s")
                params.append(md_url)
            if json_url:
                update_clauses.append("json_url = %s")
                params.append(json_url)

            sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
            params.append(doc_id)
            cursor.execute(sql, tuple(params))
            conn.commit()
            return True
        except Exception as e:
            logger.exception(f"更新转换进度失败: {e}")
            conn.rollback()
            return False
        finally:
            cursor.close()
            conn.close()

    # ==================== Basic-info management ====================

    async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]:
        """Create a basic-info record: main-table row plus sub-table row in one transaction.

        ``validity`` defaults to '现行' for ``'basis'`` records when absent.
        """
        table_name = TABLE_MAP.get(type)
        if not table_name:
            return False, "无效的类型"
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            source_id = str(uuid.uuid4())
            doc_id = str(uuid.uuid4())
            file_url = data.get('file_url')

            sub_insert = self._build_sub_insert(type, table_name, source_id, data, user_id, default_validity='现行')
            if sub_insert is None:
                return False, "不支持的类型"
            sub_sql, sub_params = sub_insert

            # Main-table row is written manually (no reliance on database triggers).
            self._insert_main_row(
                cursor, doc_id, data.get('title'), type, source_id,
                file_url, _extract_file_extension(file_url), user_id
            )
            cursor.execute(sub_sql, sub_params)
            conn.commit()
            return True, "新增成功"
        except Exception as e:
            logger.exception("新增基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()

    async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a basic-info record.

        The main-table row is located by ``(source_id, source_type)`` and the
        sub-table row by ``info_id``. Every listed column is overwritten,
        including with None when the key is absent from ``data``.
        """
        table_name = TABLE_MAP.get(type)
        if not table_name:
            return False, "无效的类型"
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            file_url = data.get('file_url')

            # Main-table row is updated manually (no reliance on database triggers).
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
                WHERE source_id = %s AND source_type = %s
                """,
                (data.get('title'), file_url, _extract_file_extension(file_url), updater_id, info_id, type)
            )

            # Sub-table update; file_url is stored only in the main table.
            if type == 'basis':
                sql = f"""
                    UPDATE {table_name}
                    SET chinese_name = %s, standard_number = %s, issuing_authority = %s,
                        release_date = %s, document_type = %s, professional_field = %s,
                        validity = %s, english_name = %s, implementation_date = %s,
                        drafting_unit = %s, approving_department = %s, participating_units = %s,
                        engineering_phase = %s, reference_basis = %s, source_url = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('standard_no'), data.get('issuing_authority'),
                    self._to_date(data.get('release_date')), data.get('document_type'),
                    data.get('professional_field'), data.get('validity'), data.get('english_name'),
                    self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                    data.get('approving_department'), data.get('participating_units'),
                    data.get('engineering_phase'), data.get('reference_basis'), data.get('source_url'),
                    updater_id, info_id
                )
            elif type == 'work':
                # compilation_basis_1 .. compilation_basis_9 are updated as a group.
                basis_updates = ", ".join(f"compilation_basis_{i} = %s" for i in range(1, 10))
                sql = f"""
                    UPDATE {table_name}
                    SET plan_name = %s, project_name = %s, project_section = %s,
                        compiling_unit = %s, compiling_date = %s, plan_summary = %s,
                        {basis_updates},
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                basis_params = [data.get(f'compilation_basis_{i}') for i in range(1, 10)]
                params = tuple(
                    [
                        data.get('title'), data.get('project_name'), data.get('project_section'),
                        data.get('issuing_authority'), self._to_date(data.get('release_date')),
                        data.get('plan_summary')
                    ]
                    + basis_params
                    + [updater_id, info_id]
                )
            elif type == 'job':
                sql = f"""
                    UPDATE {table_name}
                    SET file_name = %s, issuing_department = %s, document_type = %s,
                        publish_date = %s, effective_start_date = %s, effective_end_date = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('issuing_authority'), data.get('document_type'),
                    self._to_date(data.get('release_date')),
                    self._to_date(data.get('effective_start_date')),
                    self._to_date(data.get('effective_end_date')),
                    updater_id, info_id
                )
            else:
                return False, "不支持的类型"

            cursor.execute(sql, params)
            conn.commit()
            return True, "编辑成功"
        except Exception as e:
            logger.exception("编辑基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()

    async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]:
        """Delete a basic-info record: its sub-table row and the linked main-table row."""
        table_name = TABLE_MAP.get(type)
        if not table_name:
            return False, "无效的类型"
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"
        cursor = conn.cursor()
        try:
            cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (info_id,))
            # add/edit in this service maintain t_samp_document_main manually
            # rather than via triggers, so remove the linked main row explicitly.
            # If a trigger already deleted it, this statement matches no rows.
            cursor.execute(
                "DELETE FROM t_samp_document_main WHERE source_id = %s AND source_type = %s",
                (info_id, type)
            )
            conn.commit()
            return True, "删除成功"
        except Exception as e:
            logger.exception("删除基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()