| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982 |
- """
- 样本中心服务层
- 从 sample_view.py 提取的SQL查询逻辑
- """
- import logging
- import uuid
- from datetime import datetime
- from typing import Optional, List, Dict, Any, Tuple
- from app.base.async_mysql_connection import get_db_connection
- from app.base.minio_connection import get_minio_manager
- from app.core.config import config_handler
- from app.services.milvus_service import milvus_service
- from app.services.task_service import task_service
- logger = logging.getLogger(__name__)
# Mapping from a document's source_type to its detail (sub) table.
TABLE_MAP = {
    'basis': 't_samp_standard_base_info',
    'work': 't_samp_construction_plan_base_info',
    'job': 't_samp_office_regulations',
}


def get_table_name(source_type: str) -> Optional[str]:
    """Resolve the sub-table name for *source_type*; None when unknown."""
    return TABLE_MAP.get(source_type)
- class SampleService:
- """样本中心服务类 - 使用 SQL 查询方式"""
-
- def __init__(self):
- """初始化服务"""
- # 使用统一的 MinIO 管理器
- self.minio_manager = get_minio_manager()
-
- # 使用全局 Milvus 服务
- self.milvus_service = milvus_service
-
- # 确保集合已创建
- try:
- self.milvus_service.ensure_collections()
- except Exception as e:
- logger.error(f"初始化 Milvus 集合失败: {e}")
- async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]:
- """获取 MinIO 预签名上传 URL"""
- try:
- data = self.minio_manager.get_upload_url(filename, content_type)
- return True, "成功获取上传链接", data
- except Exception as e:
- logger.exception("生成上传链接失败")
- return False, f"生成上传链接失败: {str(e)}", {}
- # ==================== 文档管理 ====================
-
    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
        """Batch-ingest converted documents into the knowledge base.

        For each id: skips documents that are not successfully converted
        (conversion_status != 2) or lack an md_url; reads the Markdown from
        MinIO; chunks and inserts it into Milvus; registers a task-center
        entry (best effort); and marks the row as entered. All row updates
        are committed in a single transaction at the end.

        Args:
            doc_ids: document ids to ingest
            username: operator, recorded as updated_by

        Returns:
            (success_count, human-readable summary message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败,请检查数据库服务状态"

        cursor = conn.cursor()
        success_count = 0
        skipped_count = 0
        failed_count = 0
        error_details = []  # per-document skip/failure reasons for the summary

        try:
            # 1. Fetch details for all selected documents.
            placeholders = ','.join(['%s']*len(doc_ids))
            fetch_sql = f"""
                SELECT id, title, source_type, md_url, conversion_status, created_time
                FROM t_samp_document_main
                WHERE id IN ({placeholders})
            """
            cursor.execute(fetch_sql, tuple(doc_ids))
            selected_docs = cursor.fetchall()

            if not selected_docs:
                return 0, "选中的文档在数据库中不存在"
            # 2. Process each document in turn.
            for doc in selected_docs:
                doc_id = doc['id']
                title = doc.get('title', '未命名文档')
                status = doc.get('conversion_status')
                md_url = doc.get('md_url')

                # A. Only successfully converted documents (status == 2) qualify.
                # Status codes: 0 = not converted, 1 = converting, other = failed.
                if status != 2:
                    reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},跳过入库: {reason}")
                    skipped_count += 1
                    error_details.append(f"· {title}: {reason}")
                    continue

                if not md_url:
                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,跳过入库")
                    skipped_count += 1
                    error_details.append(f"· {title}: 转换结果地址丢失")
                    continue

                # B. Pull the converted Markdown content from MinIO.
                try:
                    md_content = self.minio_manager.get_object_content(md_url)
                    if not md_content:
                        raise ValueError(f"无法从 MinIO 读取内容 (URL: {md_url})")
                except Exception as minio_err:
                    logger.error(f"读取文档 {title} 内容失败: {minio_err}")
                    failed_count += 1
                    error_details.append(f"· {title}: 读取云端文件失败")
                    continue

                # C. Chunk and insert into Milvus via MilvusService.
                try:
                    # Metadata for the vector store; doc_version is derived
                    # from created_time (YYYYMMDD) with a fixed fallback
                    # constant when the timestamp is missing.
                    doc_info = {
                        "doc_id": doc_id,
                        "doc_name": title,
                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else 20260127,
                        "tags": doc.get('source_type') or 'unknown'
                    }
                    await self.milvus_service.insert_knowledge(md_content, doc_info)

                    # D. Register in the task-management center (type 'data');
                    # best effort: a failure is logged but does not block ingest.
                    try:
                        await task_service.add_task(doc_id, 'data')
                    except Exception as task_err:
                        logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
                    # E. Mark the row as entered into the knowledge base.
                    update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, updated_by = %s, updated_time = NOW() WHERE id = %s"
                    cursor.execute(update_sql, (username, doc_id))
                    success_count += 1

                except Exception as milvus_err:
                    logger.exception(f"文档 {title} 写入向量库失败")
                    failed_count += 1
                    error_details.append(f"· {title}: 写入向量库失败 ({str(milvus_err)})")
                    continue

            conn.commit()

            # Build the human-readable summary message.
            msg = f"入库完成:成功 {success_count} 份"
            if skipped_count > 0:
                msg += f",跳过 {skipped_count} 份"
            if failed_count > 0:
                msg += f",失败 {failed_count} 份"

            if error_details:
                detailed_msg = msg + "\n\n详情:\n" + "\n".join(error_details)
                return success_count, detailed_msg

            return success_count, msg

        except Exception as e:
            logger.exception(f"文档批量入库异常: {e}")
            conn.rollback()
            return 0, f"操作异常: {str(e)}"
        finally:
            cursor.close()
            conn.close()
-
    async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
        """Batch-delete documents.

        Order matters: sub-table rows are purged best-effort first, then
        the main-table rows (whose rowcount is reported), then the matching
        task-center entries (best effort); one commit covers all deletes.

        Returns:
            (deleted_main_rows, message)
        """
        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"

        cursor = conn.cursor()

        try:
            if not doc_ids:
                return 0, "未指定要删除的文档 ID"

            placeholders = ', '.join(['%s'] * len(doc_ids))

            # Best effort: purge matching rows from every detail table.
            # Per-table failures are logged and do not abort the batch.
            try:
                for sub_table in TABLE_MAP.values():
                    sub_sql = f"DELETE FROM {sub_table} WHERE id IN ({placeholders})"
                    try:
                        cursor.execute(sub_sql, doc_ids)
                    except Exception as sub_e:
                        logger.error(f"删除子表 {sub_table} 数据失败: {sub_e}")
            except Exception as sync_e:
                logger.error(f"同步删除子表数据失败: {sync_e}")

            # Delete the master rows in t_samp_document_main.
            sql_main = f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})"
            cursor.execute(sql_main, doc_ids)
            affected_rows = cursor.rowcount

            # Mirror the deletion in the task-management center (best effort).
            try:
                for doc_id in doc_ids:
                    await task_service.delete_task(doc_id)
            except Exception as task_err:
                logger.error(f"同步删除任务中心数据失败: {task_err}")
            conn.commit()

            return affected_rows, f"成功删除 {affected_rows} 条文档数据"
        except Exception as e:
            logger.exception("批量删除失败")
            conn.rollback()
            return 0, f"批量删除失败: {str(e)}"
        finally:
            cursor.close()
            conn.close()
-
- async def get_document_list(
- self,
- whether_to_enter: Optional[int] = None,
- keyword: Optional[str] = None,
- table_type: Optional[str] = None,
- plan_category: Optional[str] = None,
- level_2_classification: Optional[str] = None,
- level_3_classification: Optional[str] = None,
- level_4_classification: Optional[str] = None,
- page: int = 1,
- size: int = 50
- ) -> Tuple[List[Dict[str, Any]], int, int, int]:
- """获取文档列表 (支持关联查询子表)"""
- conn = get_db_connection()
- if not conn:
- return [], 0, 0, 0
-
- cursor = conn.cursor()
-
- try:
- where_clauses = []
- params = []
-
- # 基础查询
- if table_type and table_type in TABLE_MAP:
- # 如果指定了类型,使用 LEFT JOIN 关联查询,以便搜索子表字段
- sub_table = TABLE_MAP[table_type]
- from_sql = f"t_samp_document_main m LEFT JOIN {sub_table} s ON m.id = s.id"
- fields_sql = "m.*, s.*" # 获取所有字段,包括子表字段
- where_clauses.append("m.source_type = %s")
- params.append(table_type)
- order_sql = "m.created_time DESC"
- title_field = "m.title"
- # 施工方案特有的过滤字段
- if table_type == 'work':
- if plan_category:
- where_clauses.append("s.plan_category = %s")
- params.append(plan_category)
- if level_2_classification:
- where_clauses.append("s.level_2_classification = %s")
- params.append(level_2_classification)
- if level_3_classification:
- where_clauses.append("s.level_3_classification = %s")
- params.append(level_3_classification)
- if level_4_classification:
- where_clauses.append("s.level_4_classification = %s")
- params.append(level_4_classification)
-
- # 特殊处理 id 冲突,确保返回的是主表 m.id
- fields_sql = "m.*, s.*, m.id as id"
- else:
- from_sql = "t_samp_document_main"
- fields_sql = "*"
- order_sql = "created_time DESC"
- title_field = "title"
-
- if whether_to_enter is not None:
- # 按照 search_replace_blocks 的逻辑,这里使用 conversion_status 过滤
- where_clauses.append("conversion_status = %s")
- params.append(whether_to_enter)
-
- if keyword:
- where_clauses.append(f"{title_field} LIKE %s")
- params.append(f"%{keyword}%")
-
- where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
- offset = (page - 1) * size
-
- sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
- params.extend([size, offset])
-
- logger.info(f"Executing SQL: {sql} with params: {params}")
- cursor.execute(sql, tuple(params))
- items = []
- for row in cursor.fetchall():
- item = row
- # 处理 URL 转换
- for key in ['file_url', 'md_url', 'json_url']:
- if item.get(key):
- item[key] = self.minio_manager.get_full_url(item[key])
-
- # 映射字段以适配前端通用显示
- source_type = item.get('source_type')
- if source_type == 'work':
- item['issuing_authority'] = item.get('compiling_unit')
- item['release_date'] = item.get('compiling_date')
- elif source_type == 'job':
- item['issuing_authority'] = item.get('issuing_department')
- item['release_date'] = item.get('publish_date')
-
- # 格式化时间
- for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date']:
- if item.get(key) and hasattr(item[key], 'isoformat'):
- item[key] = item[key].isoformat()
- elif item.get(key) is not None:
- item[key] = str(item[key])
-
- # 增加格式化文件名供前端显示
- if item.get('conversion_status') == 2:
- title = item.get('title', 'document')
- item['md_display_name'] = f"{title}.md"
- item['json_display_name'] = f"{title}.json"
-
- items.append(item)
-
- # 总数
- count_sql = f"SELECT COUNT(*) as count FROM {from_sql} {where_sql}"
- cursor.execute(count_sql, tuple(params[:-2]))
- res = cursor.fetchone()
- total = res['count'] if res else 0
-
- # 统计数据
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
- res = cursor.fetchone()
- all_total = res['count'] if res else 0
-
- cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE conversion_status = 2")
- res = cursor.fetchone()
- total_entered = res['count'] if res else 0
-
- return items, total, all_total, total_entered
- except Exception as e:
- logger.exception("获取文档列表失败")
- return [], 0, 0, 0
- finally:
- cursor.close()
- conn.close()
-
    async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one document with its type-specific sub-table fields merged in.

        Returns the merged dict, or None when the id is unknown or on error.
        """
        conn = get_db_connection()
        if not conn:
            return None

        cursor = conn.cursor()

        try:
            # 1. Main-table row.
            cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
            doc = cursor.fetchone()
            if not doc:
                return None

            # 2. Pull the matching sub-table row based on source_type.
            source_type = doc.get('source_type')
            table_name = TABLE_MAP.get(source_type)

            if table_name:
                # All sub-table columns for this document.
                sub_sql = f"SELECT * FROM {table_name} WHERE id = %s"
                cursor.execute(sub_sql, (doc_id,))
                sub_data = cursor.fetchone()

                if sub_data:
                    # Merge sub-table fields into doc for frontend convenience.
                    # On a name clash the sub-table value wins (except id,
                    # which is removed first).
                    sub_data.pop('id', None)

                    # Map type-specific columns onto the generic display fields.
                    if source_type == 'basis':
                        doc['standard_no'] = sub_data.get('standard_number')
                    elif source_type == 'work':
                        doc['issuing_authority'] = sub_data.get('compiling_unit')
                        doc['release_date'] = sub_data.get('compiling_date')
                    elif source_type == 'job':
                        doc['issuing_authority'] = sub_data.get('issuing_department')
                        doc['release_date'] = sub_data.get('publish_date')
                    doc.update(sub_data)

            # Serialize date/datetime values for JSON responses.
            for key in ['created_time', 'updated_time', 'release_date', 'publish_date', 'compiling_date', 'implementation_date']:
                val = doc.get(key)
                if val and hasattr(val, 'isoformat'):
                    doc[key] = val.isoformat()
                elif val is not None:
                    doc[key] = str(val)

            # Expand stored relative paths to full MinIO URLs.
            for key in ['file_url', 'md_url', 'json_url']:
                if doc.get(key):
                    doc[key] = self.minio_manager.get_full_url(doc[key])
            # Display names for the converted artifacts.
            if doc.get('conversion_status') == 2:
                title = doc.get('title', 'document')
                doc['md_display_name'] = f"{title}.md"
                doc['json_display_name'] = f"{title}.json"

            return doc
        except Exception as e:
            logger.exception("获取文档详情失败")
            return None
        finally:
            cursor.close()
            conn.close()
-
- def _to_int(self, value: Any) -> Optional[int]:
- """安全转换为整数"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- try:
- return int(value)
- except (ValueError, TypeError):
- return None
- def _to_date(self, value: Any) -> Optional[str]:
- """安全处理日期字符串"""
- if value is None or value == '' or str(value).lower() == 'null':
- return None
- # 如果已经是 datetime.date 或 datetime.datetime 对象
- if hasattr(value, 'strftime'):
- return value.strftime('%Y-%m-%d')
- return str(value)
    async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Create a document: main-table row first, then the type sub-table row.

        The two inserts are decoupled from DB triggers and synced manually.

        Args:
            doc_data: incoming fields; 'table_type' selects the sub-table
                ('basis' by default)
            user_id: recorded as created_by / updated_by

        Returns:
            (ok, message, new_doc_id) — new_doc_id is None on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        cursor = conn.cursor()

        try:
            doc_id = str(uuid.uuid4())
            table_type = doc_data.get('table_type', 'basis')
            table_name = TABLE_MAP.get(table_type)

            # Normalize the incoming date value.
            release_date = self._to_date(doc_data.get('release_date'))

            # Store the file URL as a relative path (expanded again on read).
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))

            # 1. Insert the main row (the asset registry), conversion_status=0.
            cursor.execute(
                """
                INSERT INTO t_samp_document_main (
                    id, title, source_type, file_url,
                    file_extension, created_by, updated_by, created_time, updated_time,
                    conversion_status
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
                """,
                (
                    doc_id, doc_data.get('title'), table_type, file_url,
                    doc_data.get('file_extension'), user_id, user_id
                )
            )
            # 2. Insert the sub-table row (business fields only).
            if table_type == 'basis':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), user_id)
                )
            elif table_type == 'work':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), user_id)
                )
            elif table_type == 'job':
                cursor.execute(
                    f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                    (doc_id, doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), user_id)
                )

            # 3. Register in the task-management center (type 'data');
            # best effort: failure is logged, the insert still commits.
            try:
                await task_service.add_task(doc_id, 'data')
            except Exception as task_err:
                logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
            conn.commit()
            return True, "文档添加成功", doc_id
        except Exception as e:
            logger.exception("添加文档失败")
            conn.rollback()
            return False, str(e), None
        finally:
            cursor.close()
            conn.close()
    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a document: update the main row, then the type sub-table row.

        The two updates are decoupled from DB triggers and synced manually.

        Args:
            doc_data: incoming fields; must include 'id'; 'table_type'
                selects the sub-table ('basis' by default)
            updater_id: recorded as updated_by

        Returns:
            (ok, message)
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()

        try:
            doc_id = doc_data.get('id')
            table_type = doc_data.get('table_type', 'basis')
            table_name = TABLE_MAP.get(table_type)

            # Normalize the incoming date value.
            release_date = self._to_date(doc_data.get('release_date'))

            # Store the file URL as a relative path (expanded again on read).
            file_url = self.minio_manager.get_relative_path(doc_data.get('file_url'))

            # 1. Update the main row.
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s,
                    updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """,
                (
                    doc_data.get('title'), file_url, doc_data.get('file_extension'),
                    updater_id, doc_id
                )
            )
            # 2. Update the type-specific sub-table row.
            if table_type == 'basis':
                cursor.execute(
                    f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), release_date, doc_data.get('document_type'), doc_data.get('professional_field'), doc_data.get('validity'), doc_data.get('note'), updater_id, doc_id)
                )
            elif table_type == 'work':
                cursor.execute(
                    f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, plan_summary = %s, compilation_basis = %s, plan_category = %s, level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('project_name'), doc_data.get('project_section'), doc_data.get('issuing_authority'), release_date, doc_data.get('plan_summary'), doc_data.get('compilation_basis'), doc_data.get('plan_category'), doc_data.get('level_1_classification'), doc_data.get('level_2_classification'), doc_data.get('level_3_classification'), doc_data.get('level_4_classification'), doc_data.get('note'), updater_id, doc_id)
                )
            elif table_type == 'job':
                cursor.execute(
                    f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, note = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                    (doc_data.get('title'), doc_data.get('issuing_authority'), doc_data.get('document_type'), release_date, doc_data.get('note'), updater_id, doc_id)
                )
            conn.commit()
            return True, "文档更新成功"
        except Exception as e:
            logger.exception("编辑文档失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
- async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
- """文档入库(单个)"""
- affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
- return affected_rows > 0, message
-
    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """List basic-info rows of one type, joined with the main table.

        The main table contributes file_url, conversion_status, md_url and
        json_url; type-specific columns are aliased onto a common shape
        (title, standard_no, issuing_authority, release_date, ...).

        Args:
            type: 'basis' | 'work' | 'job'
            page, size: pagination
            keyword: fuzzy match on the type's name column(s)
            **filters: exact-match filters; keys are resolved through the
                per-type field_map, unknown keys are ignored

        Returns:
            (items, filtered_total); ([], 0) for unknown type or on error.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        cursor = conn.cursor()

        try:
            # Per-type table, select list, and filter-key -> column mapping.
            if type == 'basis':
                table_name = "t_samp_standard_base_info"
                # Joined main-table columns: file_url, conversion_status, md_url, json_url.
                fields = """
                    s.id, s.chinese_name as title, s.standard_number as standard_no,
                    s.issuing_authority, s.release_date, s.document_type,
                    s.professional_field, s.validity, s.note, s.created_by, s.created_time,
                    s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.chinese_name',
                    'standard_no': 's.standard_number',
                    'issuing_authority': 's.issuing_authority',
                    'release_date': 's.release_date',
                    'document_type': 's.document_type',
                    'professional_field': 's.professional_field',
                    'validity': 's.validity'
                }
            elif type == 'work':
                table_name = "t_samp_construction_plan_base_info"
                fields = """
                    s.id, s.plan_name as title, NULL as standard_no,
                    s.project_name, s.project_section,
                    s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                    NULL as document_type, NULL as professional_field, NULL as validity,
                    s.plan_summary, s.compilation_basis,
                    s.plan_category, s.level_1_classification, s.level_2_classification,
                    s.level_3_classification, s.level_4_classification,
                    s.note, s.created_by, s.created_time, s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.plan_name',
                    'issuing_authority': 's.compiling_unit',
                    'release_date': 's.compiling_date',
                    'plan_category': 's.plan_category',
                    'level_1_classification': 's.level_1_classification',
                    'level_2_classification': 's.level_2_classification',
                    'level_3_classification': 's.level_3_classification',
                    'level_4_classification': 's.level_4_classification'
                }
            elif type == 'job':
                table_name = "t_samp_office_regulations"
                fields = """
                    s.id, s.file_name as title, NULL as standard_no,
                    s.issuing_department as issuing_authority, s.publish_date as release_date,
                    s.document_type, NULL as professional_field, NULL as validity,
                    s.note, s.created_by, s.created_time, s.updated_by, s.updated_time,
                    m.file_url, m.conversion_status, m.md_url, m.json_url
                """
                field_map = {
                    'title': 's.file_name',
                    'issuing_authority': 's.issuing_department',
                    'release_date': 's.publish_date',
                    'document_type': 's.document_type'
                }
            else:
                return [], 0

            where_clauses = []
            params = []

            # Keyword search over the type's name column(s).
            if keyword:
                if type == 'basis':
                    where_clauses.append("(s.chinese_name LIKE %s OR s.standard_number LIKE %s)")
                    params.extend([f"%{keyword}%", f"%{keyword}%"])
                elif type == 'work':
                    where_clauses.append("s.plan_name LIKE %s")
                    params.append(f"%{keyword}%")
                elif type == 'job':
                    where_clauses.append("s.file_name LIKE %s")
                    params.append(f"%{keyword}%")

            # Exact-match filters mapped through field_map; falsy values and
            # unknown keys are skipped.
            for filter_key, filter_value in filters.items():
                if not filter_value:
                    continue

                db_field = field_map.get(filter_key)
                if db_field:
                    where_clauses.append(f"{db_field} = %s")
                    params.append(filter_value)

            where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
            offset = (page - 1) * size

            # LEFT JOIN the main table for the shared asset columns.
            sql = f"""
                SELECT {fields}
                FROM {table_name} s
                LEFT JOIN t_samp_document_main m ON s.id = m.id
                {where_sql}
                ORDER BY s.created_time DESC
                LIMIT %s OFFSET %s
            """
            params = params + [size, offset]

            cursor.execute(sql, tuple(params))
            items = cursor.fetchall()

            # Expand stored relative paths to full MinIO URLs.
            for item in items:
                for key in ['file_url', 'md_url', 'json_url']:
                    if item.get(key):
                        item[key] = self.minio_manager.get_full_url(item[key])

            # Filtered total: reuse the filter params minus LIMIT/OFFSET.
            count_sql = f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}"
            cursor.execute(count_sql, tuple(params[:-2]))
            res = cursor.fetchone()
            total = res['count'] if res else 0

            return items, total
        except Exception as e:
            logger.exception(f"获取 {type} 列表失败")
            return [], 0
        finally:
            cursor.close()
            conn.close()
- # ==================== 文档转换 ====================
-
- async def get_document_source_type(self, doc_id: str) -> Optional[str]:
- """获取文档的source_type"""
- conn = get_db_connection()
- if not conn:
- return None
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['source_type'] if res else None
- except Exception as e:
- logger.exception(f"获取文档source_type失败: {e}")
- return None
- finally:
- cursor.close()
- conn.close()
-
- async def get_document_title(self, doc_id: str) -> str:
- """获取文档标题"""
- conn = get_db_connection()
- if not conn:
- return "文档"
-
- cursor = conn.cursor()
-
- try:
- cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
- res = cursor.fetchone()
- return res['title'] if res else "文档"
- except Exception as e:
- logger.exception(f"获取文档标题失败: {e}")
- return "文档"
- finally:
- cursor.close()
- conn.close()
-
- async def update_conversion_status(self, doc_id: str, status: int,
- md_url: Optional[str] = None,
- json_url: Optional[str] = None,
- error_message: Optional[str] = None) -> bool:
- """更新文档转换状态
-
- Args:
- doc_id: 文档ID
- status: 转换状态 (0=未转换, 1=转换中, 2=已完成, 3=失败)
- md_url: Markdown文件URL
- json_url: JSON文件URL
- error_message: 错误信息
- """
- conn = get_db_connection()
- if not conn:
- return False
-
- cursor = conn.cursor()
-
- try:
- update_clauses = ["conversion_status = %s"]
- params = [status]
- if error_message:
- update_clauses.append("conversion_error = %s")
- params.append(error_message)
-
- if md_url:
- update_clauses.append("md_url = %s")
- params.append(md_url)
-
- if json_url:
- update_clauses.append("json_url = %s")
- params.append(json_url)
-
- sql = f"UPDATE t_samp_document_main SET {', '.join(update_clauses)}, updated_time = NOW() WHERE id = %s"
- params.append(doc_id)
-
- cursor.execute(sql, tuple(params))
- conn.commit()
- return True
- except Exception as e:
- logger.exception(f"更新转换进度失败: {e}")
- conn.rollback()
- return False
- finally:
- cursor.close()
- conn.close()
- # ==================== 基础信息管理 ====================
- async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]:
- """新增基本信息"""
- logger.info(f"Adding basic info for type {type}: {data}")
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型"
-
- doc_id = str(uuid.uuid4())
- file_url = data.get('file_url')
- file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
-
- # 1. 插入主表 (解耦触发器,手动同步)
- cursor.execute(
- """
- INSERT INTO t_samp_document_main (
- id, title, source_type, file_url,
- file_extension, created_by, updated_by, created_time, updated_time,
- conversion_status
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
- """,
- (
- doc_id, data.get('title'), type, file_url,
- file_extension, user_id, user_id
- )
- )
-
- # 2. 插入子表 (移除 file_url,因为它现在只存储在主表中)
- if type == 'basis':
- sql = f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
- params = (doc_id, data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('document_type'), data.get('professional_field'), data.get('validity', '现行'), data.get('note'), user_id)
- elif type == 'work':
- sql = f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, plan_summary, compilation_basis, plan_category, level_1_classification, level_2_classification, level_3_classification, level_4_classification, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
- params = (doc_id, data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')), data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'), data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'), data.get('note'), user_id)
- elif type == 'job':
- sql = f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, note, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())"
- params = (doc_id, data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')), data.get('note'), user_id)
- else:
- return False, "不支持的类型"
-
- cursor.execute(sql, params)
-
- # 3. 添加到任务管理中心 (类型为 data)
- try:
- await task_service.add_task(doc_id, 'data')
- except Exception as task_err:
- logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}")
- conn.commit()
- return True, "新增成功", doc_id
- except Exception as e:
- logger.exception("新增基本信息失败")
- conn.rollback()
- return False, str(e)
- finally:
- cursor.close()
- conn.close()
    async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Edit a basic-info record: update the main row, then the sub-table row.

        The two updates are decoupled from DB triggers and synced manually;
        note that the 'basis' and 'job' updates write more columns than
        add_basic_info inserts (e.g. english_name, effective_*_date).

        Args:
            type: 'basis' | 'work' | 'job'
            info_id: record id (shared by main table and sub-table)
            data: incoming business fields
            updater_id: recorded as updated_by

        Returns:
            (ok, message)
        """
        logger.info(f"Editing basic info for type {type}, id {info_id}: {data}")
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        cursor = conn.cursor()
        try:
            table_name = TABLE_MAP.get(type)
            if not table_name:
                return False, "无效的类型"

            # Store the file URL as a relative path (expanded again on read).
            file_url = self.minio_manager.get_relative_path(data.get('file_url'))
            file_extension = file_url.split('.')[-1] if file_url and '.' in file_url else None
            # 1. Update the main row.
            cursor.execute(
                """
                UPDATE t_samp_document_main
                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
                WHERE id = %s
                """,
                (data.get('title'), file_url, file_extension, updater_id, info_id)
            )
            # 2. Update the sub-table row (file_url is not stored there).
            if type == 'basis':
                sql = f"""
                    UPDATE {table_name}
                    SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
                        document_type = %s, professional_field = %s, validity = %s,
                        english_name = %s, implementation_date = %s, drafting_unit = %s,
                        approving_department = %s, participating_units = %s, engineering_phase = %s,
                        reference_basis = %s, source_url = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
                    data.get('document_type'), data.get('professional_field'), data.get('validity'),
                    data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                    data.get('approving_department'), data.get('participating_units'), data.get('engineering_phase'),
                    data.get('reference_basis'), data.get('source_url'), data.get('note'),
                    updater_id, info_id
                )
            elif type == 'work':
                sql = f"""
                    UPDATE {table_name}
                    SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s,
                        plan_summary = %s, compilation_basis = %s, plan_category = %s,
                        level_1_classification = %s, level_2_classification = %s, level_3_classification = %s, level_4_classification = %s,
                        note = %s, updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """

                params = (
                    data.get('title'), data.get('project_name'), data.get('project_section'), data.get('issuing_authority'), self._to_date(data.get('release_date')),
                    data.get('plan_summary'), data.get('compilation_basis'), data.get('plan_category'),
                    data.get('level_1_classification'), data.get('level_2_classification'), data.get('level_3_classification'), data.get('level_4_classification'),
                    data.get('note'), updater_id, info_id
                )

            elif type == 'job':
                sql = f"""
                    UPDATE {table_name}
                    SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                        effective_start_date = %s, effective_end_date = %s, note = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                """
                params = (
                    data.get('title'), data.get('issuing_authority'), data.get('document_type'), self._to_date(data.get('release_date')),
                    self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')), data.get('note'),
                    updater_id, info_id
                )
            else:
                return False, "不支持的类型"

            cursor.execute(sql, params)

            conn.commit()
            return True, "编辑成功"
        except Exception as e:
            logger.exception("编辑基本信息失败")
            conn.rollback()
            return False, str(e)
        finally:
            cursor.close()
            conn.close()
- async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]:
- """删除基本信息"""
- conn = get_db_connection()
- if not conn:
- return False, "数据库连接失败"
-
- cursor = conn.cursor()
- try:
- table_name = TABLE_MAP.get(type)
- if not table_name:
- return False, "无效的类型"
-
- # 1. 删除主表记录 (由于设置了 ON DELETE CASCADE,子表记录会自动删除)
- cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (info_id,))
-
- # 同步删除任务管理中心的数据
- try:
- await task_service.delete_task(info_id)
- except Exception as task_err:
- logger.error(f"同步删除任务中心数据失败 (ID: {info_id}): {task_err}")
- conn.commit()
- return True, "删除成功"
- except Exception as e:
- logger.exception("删除基本信息失败")
- conn.rollback()
- return False, str(e)
- finally:
- cursor.close()
- conn.close()
|