| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774 |
"""
Sample-center service layer.

SQL query logic extracted from sample_view.py.
"""
import logging
import posixpath
import uuid
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse

from app.base.async_mysql_connection import get_db_connection
from app.base.minio_connection import get_minio_manager
from app.core.config import config_handler
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Maps a document ``source_type`` key to the name of the business (sub)
# table that stores the type-specific fields for that kind of document.
TABLE_MAP: Dict[str, str] = {
    'basis': 't_samp_standard_base_info',
    'work': 't_samp_construction_plan_base_info',
    'job': 't_samp_office_regulations'
}
def get_table_name(source_type: str) -> Optional[str]:
    """Look up the business-table name registered for ``source_type``.

    Args:
        source_type: Key into ``TABLE_MAP``.

    Returns:
        The mapped table name, or ``None`` when the key is not registered.
    """
    return TABLE_MAP[source_type] if source_type in TABLE_MAP else None
class SampleService:
    """Sample-center service that talks to the database with hand-written SQL.

    Every document has one row in ``t_samp_document_main`` (the "main"
    table). For the source types registered in ``TABLE_MAP`` it also has one
    row in a type-specific business table (the "sub" table); the two rows are
    linked through the main table's ``source_type`` / ``source_id`` columns.

    Rows are read with dict-style access (``row['id']``), so the cursor
    returned by the connection is expected to yield mappings.

    NOTE(review): the public methods are coroutines, but they drive the
    object returned by ``get_db_connection()`` with plain blocking calls and
    never ``await`` it. Presumably that connection is synchronous -- confirm,
    because blocking I/O inside a coroutine stalls the event loop.
    """

    # Per source type: the SELECT column list used by get_basic_info_list,
    # the mapping from public filter names to sub-table columns, and the
    # columns matched by the free-text keyword search.
    _LIST_SPECS = {
        'basis': {
            'fields': """
                s.id, s.chinese_name as title, s.standard_number as standard_no,
                s.issuing_authority, s.release_date, s.document_type,
                s.professional_field, s.validity, s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
            """,
            'field_map': {
                'title': 's.chinese_name',
                'standard_no': 's.standard_number',
                'issuing_authority': 's.issuing_authority',
                'release_date': 's.release_date',
                'document_type': 's.document_type',
                'professional_field': 's.professional_field',
                'validity': 's.validity',
            },
            'keyword_columns': ('s.chinese_name', 's.standard_number'),
        },
        'work': {
            'fields': """
                s.id, s.plan_name as title, NULL as standard_no,
                s.compiling_unit as issuing_authority, s.compiling_date as release_date,
                NULL as document_type, NULL as professional_field, NULL as validity,
                s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
            """,
            'field_map': {
                'title': 's.plan_name',
                'issuing_authority': 's.compiling_unit',
                'release_date': 's.compiling_date',
            },
            'keyword_columns': ('s.plan_name',),
        },
        'job': {
            'fields': """
                s.id, s.file_name as title, NULL as standard_no,
                s.issuing_department as issuing_authority, s.publish_date as release_date,
                s.document_type, NULL as professional_field, NULL as validity,
                s.created_by, s.created_time,
                m.file_url, m.conversion_status, m.md_url, m.json_url, m.id as doc_id
            """,
            'field_map': {
                'title': 's.file_name',
                'issuing_authority': 's.issuing_department',
                'release_date': 's.publish_date',
                'document_type': 's.document_type',
            },
            'keyword_columns': ('s.file_name',),
        },
    }

    def __init__(self):
        """Create the service and attach the shared MinIO manager."""
        self.minio_manager = get_minio_manager()

    # ==================== Internal helpers ====================

    @staticmethod
    @contextmanager
    def _cursor_scope(conn) -> Iterator[Any]:
        """Yield a cursor for ``conn`` and always release both afterwards.

        The connection is closed even if opening or closing the cursor
        raises, so callers never leak a connection.
        """
        try:
            cursor = conn.cursor()
            try:
                yield cursor
            finally:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def _placeholders(count: int) -> str:
        """Return ``count`` comma-separated ``%s`` markers for an IN (...) list."""
        return ','.join(['%s'] * count)

    @staticmethod
    def _file_extension(file_url: Optional[str]) -> Optional[str]:
        """Return the extension (without the dot) of the file a URL points to.

        Only the path component is inspected, so query strings, fragments and
        dots in the host name cannot be mistaken for an extension. Returns
        ``None`` when there is no URL or no extension.
        """
        if not file_url:
            return None
        try:
            path = urlparse(file_url).path
        except ValueError:
            # urlparse rejects a few malformed netlocs; fall back to the raw text.
            path = file_url
        extension = posixpath.splitext(path)[1]
        return extension[1:] or None

    @staticmethod
    def _decorate_document(row: Dict[str, Any]) -> Dict[str, Any]:
        """Make a main-table row ready for the front end (mutates and returns it).

        Timestamps are converted to ISO-8601 strings, and rows whose
        ``conversion_status`` is 2 gain ``md_display_name`` /
        ``json_display_name``. A missing or NULL title falls back to
        ``'document'`` so the names never read ``None.md``.
        """
        for key in ('created_time', 'updated_time'):
            value = row.get(key)
            if value and hasattr(value, 'isoformat'):
                row[key] = value.isoformat()

        if row.get('conversion_status') == 2:
            title = row.get('title') or 'document'
            row['md_display_name'] = f"{title}.md"
            row['json_display_name'] = f"{title}.json"
        return row

    @staticmethod
    def _insert_main_row(cursor, doc_id: str, title: Any, source_type: str,
                         source_id: str, file_url: Any, file_extension: Any,
                         user_id: str) -> None:
        """Insert the main-table row of a new document (conversion_status = 0)."""
        cursor.execute(
            """
            INSERT INTO t_samp_document_main (
                id, title, source_type, source_id, file_url,
                file_extension, created_by, updated_by, created_time, updated_time,
                conversion_status
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0)
            """,
            (doc_id, title, source_type, source_id, file_url,
             file_extension, user_id, user_id),
        )

    def _build_source_insert(self, source_type: str, source_id: str,
                             data: Dict[str, Any], user_id: str,
                             validity: Any) -> Optional[Tuple[str, tuple]]:
        """Build the ``(sql, params)`` pair that inserts the sub-table row.

        Returns ``None`` when ``source_type`` has no registered table or no
        insert layout, so callers can refuse the request before writing
        anything.
        """
        table_name = TABLE_MAP.get(source_type)
        if not table_name:
            return None

        title = data.get('title')
        authority = data.get('issuing_authority')
        release_date = self._to_date(data.get('release_date'))

        if source_type == 'basis':
            return (
                f"INSERT INTO {table_name} (id, chinese_name, standard_number, issuing_authority, release_date, document_type, professional_field, validity, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                (source_id, title, data.get('standard_no'), authority, release_date,
                 data.get('document_type'), data.get('professional_field'), validity, user_id),
            )
        if source_type == 'work':
            return (
                f"INSERT INTO {table_name} (id, plan_name, project_name, project_section, compiling_unit, compiling_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())",
                (source_id, title, data.get('project_name'), data.get('project_section'),
                 authority, release_date, user_id),
            )
        if source_type == 'job':
            return (
                f"INSERT INTO {table_name} (id, file_name, issuing_department, document_type, publish_date, created_by, created_time, updated_time) VALUES (%s, %s, %s, %s, %s, %s, NOW(), NOW())",
                (source_id, title, authority, data.get('document_type'), release_date, user_id),
            )
        return None

    def _to_int(self, value: Any) -> Optional[int]:
        """Convert ``value`` to ``int``, returning ``None`` when it is not usable.

        ``None``, the empty string and the text ``null`` (any case) count as
        missing. Values ``int()`` cannot convert -- including infinite
        floats -- also yield ``None`` instead of raising.
        """
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        try:
            return int(value)
        except (ValueError, TypeError, OverflowError):
            return None

    def _to_date(self, value: Any) -> Optional[str]:
        """Normalise a date-like value to a string for a SQL parameter.

        ``None``, the empty string and the text ``null`` (any case) become
        ``None``. Objects with ``strftime`` are rendered as ``YYYY-MM-DD``;
        anything else is passed through ``str()`` unchanged.
        """
        if value is None or value == '' or str(value).lower() == 'null':
            return None
        if hasattr(value, 'strftime'):
            return value.strftime('%Y-%m-%d')
        return str(value)

    # ==================== Upload ====================

    async def get_upload_url(self, filename: str, content_type: str) -> Tuple[bool, str, Dict[str, Any]]:
        """Ask the MinIO manager for a pre-signed upload URL.

        Returns:
            ``(ok, message, data)``; ``data`` is whatever the MinIO manager
            returned, or an empty dict on failure.
        """
        try:
            data = self.minio_manager.get_upload_url(filename, content_type)
        except Exception as exc:
            logger.exception("生成上传链接失败")
            return False, f"生成上传链接失败: {str(exc)}", {}
        return True, "成功获取上传链接", data

    # ==================== Document management ====================

    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str) -> Tuple[int, str]:
        """Mark several documents as entered into the knowledge base.

        Only documents with ``conversion_status = 2`` and
        ``whether_to_enter = 0`` are eligible; the rest are silently skipped.

        Args:
            doc_ids: Main-table document ids.
            username: Operator recorded in ``updated_by``.

        Returns:
            ``(affected_rows, message)``.
        """
        # An empty id list would render "IN ()", which is invalid SQL.
        if not doc_ids:
            return 0, "未指定要入库的文档 ID"

        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                check_sql = (
                    "SELECT id, title FROM t_samp_document_main "
                    f"WHERE id IN ({self._placeholders(len(doc_ids))}) "
                    "AND conversion_status = 2 AND whether_to_enter = 0"
                )
                cursor.execute(check_sql, tuple(doc_ids))
                valid_ids = [row['id'] for row in cursor.fetchall()]
                if not valid_ids:
                    return 0, "选中的文档中没有满足入库条件(已转换成功且未入库)的记录"

                update_sql = (
                    "UPDATE t_samp_document_main SET whether_to_enter = 1, "
                    "updated_by = %s, updated_time = NOW() "
                    f"WHERE id IN ({self._placeholders(len(valid_ids))})"
                )
                cursor.execute(update_sql, (username, *valid_ids))
                affected_rows = cursor.rowcount
                conn.commit()
                return affected_rows, f"成功入库 {affected_rows} 份文档"
            except Exception as exc:
                logger.exception("文档批量入库失败: %s", exc)
                conn.rollback()
                return 0, f"入库失败: {str(exc)}"

    async def batch_delete_documents(self, doc_ids: List[str]) -> Tuple[int, str]:
        """Delete several documents from the main table and their sub tables.

        Sub-table clean-up is best effort: a failure there is logged and the
        main-table delete still proceeds.

        Returns:
            ``(affected_rows, message)`` where ``affected_rows`` counts
            main-table rows.
        """
        if not doc_ids:
            return 0, "未指定要删除的文档 ID"

        conn = get_db_connection()
        if not conn:
            return 0, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                ids = tuple(doc_ids)
                placeholders = self._placeholders(len(ids))

                # Best-effort removal of the linked sub-table rows.
                try:
                    cursor.execute(
                        f"SELECT source_type, source_id FROM t_samp_document_main WHERE id IN ({placeholders})",
                        ids,
                    )
                    for linked in cursor.fetchall():
                        source_type = linked['source_type']
                        source_id = linked['source_id']
                        if not (source_type and source_id):
                            continue
                        sub_table = get_table_name(source_type)
                        if not sub_table:
                            continue
                        try:
                            cursor.execute(f"DELETE FROM {sub_table} WHERE id = %s", (source_id,))
                        except Exception as sub_exc:
                            logger.error(f"删除子表 {sub_table} 数据失败: {sub_exc}")
                except Exception as sync_exc:
                    logger.error(f"同步删除子表数据失败: {sync_exc}")

                cursor.execute(
                    f"DELETE FROM t_samp_document_main WHERE id IN ({placeholders})",
                    ids,
                )
                affected_rows = cursor.rowcount
                conn.commit()
                return affected_rows, f"成功删除 {affected_rows} 条文档数据"
            except Exception as exc:
                logger.exception("批量删除失败")
                conn.rollback()
                return 0, f"批量删除失败: {str(exc)}"

    async def get_document_list(
        self,
        whether_to_enter: Optional[int] = None,
        keyword: Optional[str] = None,
        table_type: Optional[str] = None,
        page: int = 1,
        size: int = 50
    ) -> Tuple[List[Dict[str, Any]], int, int, int]:
        """Return one page of main-table documents plus counters.

        Args:
            whether_to_enter: Optional filter on the ``whether_to_enter`` column.
            keyword: Optional substring matched against ``title``.
            table_type: Optional filter on ``source_type``.
            page: 1-based page number; values below 1 are treated as page 1.
            size: Page size.

        Returns:
            ``(items, total, all_total, total_entered)``: the page rows, the
            number of rows matching the filters, the number of rows in the
            table and the number of rows with ``whether_to_enter = 1``.
            ``([], 0, 0, 0)`` on any failure.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0, 0, 0

        with self._cursor_scope(conn) as cursor:
            try:
                conditions: List[str] = []
                filter_params: List[Any] = []
                if table_type:
                    conditions.append("source_type = %s")
                    filter_params.append(table_type)
                if whether_to_enter is not None:
                    conditions.append("whether_to_enter = %s")
                    filter_params.append(whether_to_enter)
                if keyword:
                    conditions.append("title LIKE %s")
                    filter_params.append(f"%{keyword}%")

                where_sql = " WHERE " + " AND ".join(conditions) if conditions else ""
                # Clamp so that page <= 0 cannot produce a negative OFFSET.
                offset = max(page - 1, 0) * size

                sql = f"SELECT * FROM t_samp_document_main {where_sql} ORDER BY created_time DESC LIMIT %s OFFSET %s"
                page_params = [*filter_params, size, offset]
                logger.info("Executing SQL: %s with params: %s", sql, page_params)
                cursor.execute(sql, tuple(page_params))
                items = [self._decorate_document(row) for row in cursor.fetchall()]

                # Rows matching the filters.
                cursor.execute(
                    f"SELECT COUNT(*) as count FROM t_samp_document_main {where_sql}",
                    tuple(filter_params),
                )
                counted = cursor.fetchone()
                total = counted['count'] if counted else 0

                # Table-wide statistics.
                cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main")
                counted = cursor.fetchone()
                all_total = counted['count'] if counted else 0

                cursor.execute("SELECT COUNT(*) as count FROM t_samp_document_main WHERE whether_to_enter = 1")
                counted = cursor.fetchone()
                total_entered = counted['count'] if counted else 0

                return items, total, all_total, total_entered
            except Exception:
                logger.exception("获取文档列表失败")
                return [], 0, 0, 0

    async def get_document_detail(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """Return the main-table row for ``doc_id``, formatted for display.

        Returns ``None`` when the document does not exist, the connection is
        unavailable or the query fails.
        """
        conn = get_db_connection()
        if not conn:
            return None

        with self._cursor_scope(conn) as cursor:
            try:
                cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
                doc = cursor.fetchone()
                if not doc:
                    return None
                return self._decorate_document(doc)
            except Exception:
                logger.exception("获取文档详情失败")
                return None

    async def add_document(self, doc_data: Dict[str, Any], user_id: str) -> Tuple[bool, str, Optional[str]]:
        """Create a document: main-table row first, then the sub-table row.

        ``doc_data['table_type']`` (default ``'basis'``) selects the sub
        table. An unknown type is rejected before anything is written, so a
        main row can never be committed without its sub row.

        Returns:
            ``(ok, message, doc_id)``; ``doc_id`` is ``None`` on failure.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败", None

        with self._cursor_scope(conn) as cursor:
            try:
                table_type = doc_data.get('table_type', 'basis')
                doc_id = str(uuid.uuid4())
                source_id = str(uuid.uuid4())

                statement = self._build_source_insert(
                    table_type, source_id, doc_data, user_id, doc_data.get('validity')
                )
                if statement is None:
                    return False, "无效的类型", None

                # 1. Main table (the asset registry).
                self._insert_main_row(
                    cursor, doc_id, doc_data.get('title'), table_type, source_id,
                    doc_data.get('file_url'), doc_data.get('file_extension'), user_id,
                )
                # 2. Sub table (business fields only).
                cursor.execute(*statement)

                conn.commit()
                return True, "文档添加成功", doc_id
            except Exception as exc:
                logger.exception("添加文档失败")
                conn.rollback()
                return False, str(exc), None

    async def edit_document(self, doc_data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a document's main-table row and its sub-table row.

        ``doc_data`` supplies ``id`` (main-table id), ``source_id``
        (sub-table id) and ``table_type`` (default ``'basis'``). When
        ``table_type`` is not one of ``basis`` / ``work`` / ``job`` only the
        main table is updated.

        Returns:
            ``(ok, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                doc_id = doc_data.get('id')
                source_id = doc_data.get('source_id')
                table_type = doc_data.get('table_type', 'basis')
                table_name = TABLE_MAP.get(table_type)

                title = doc_data.get('title')
                authority = doc_data.get('issuing_authority')
                release_date = self._to_date(doc_data.get('release_date'))

                # 1. Main table.
                cursor.execute(
                    """
                    UPDATE t_samp_document_main
                    SET title = %s, file_url = %s, file_extension = %s,
                        updated_by = %s, updated_time = NOW()
                    WHERE id = %s
                    """,
                    (title, doc_data.get('file_url'), doc_data.get('file_extension'),
                     updater_id, doc_id),
                )

                # 2. Sub table.
                statement = None
                if table_type == 'basis':
                    statement = (
                        f"UPDATE {table_name} SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s, document_type = %s, professional_field = %s, validity = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                        (title, doc_data.get('standard_no'), authority, release_date,
                         doc_data.get('document_type'), doc_data.get('professional_field'),
                         doc_data.get('validity'), updater_id, source_id),
                    )
                elif table_type == 'work':
                    statement = (
                        f"UPDATE {table_name} SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                        (title, doc_data.get('project_name'), doc_data.get('project_section'),
                         authority, release_date, updater_id, source_id),
                    )
                elif table_type == 'job':
                    statement = (
                        f"UPDATE {table_name} SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s, updated_by = %s, updated_time = NOW() WHERE id = %s",
                        (title, authority, doc_data.get('document_type'), release_date,
                         updater_id, source_id),
                    )
                if statement is not None:
                    cursor.execute(*statement)

                conn.commit()
                return True, "文档更新成功"
            except Exception as exc:
                logger.exception("编辑文档失败")
                conn.rollback()
                return False, str(exc)

    async def enter_document(self, doc_id: str, username: str) -> Tuple[bool, str]:
        """Enter a single document into the knowledge base.

        Thin wrapper around ``batch_enter_knowledge_base``; succeeds when at
        least one row was updated.
        """
        affected_rows, message = await self.batch_enter_knowledge_base([doc_id], username)
        return affected_rows > 0, message

    async def get_basic_info_list(
        self,
        type: str,
        page: int,
        size: int,
        keyword: Optional[str] = None,
        **filters
    ) -> Tuple[List[Dict[str, Any]], int]:
        """Return one page of sub-table rows joined with their main-table row.

        Args:
            type: Source type (``basis``, ``work`` or ``job``).
            page: 1-based page number; values below 1 are treated as page 1.
            size: Page size.
            keyword: Optional substring matched against the type's name
                column(s).
            **filters: Exact-match filters keyed by public field name; falsy
                values and names unknown for this type are ignored.

        Returns:
            ``(items, total)``, or ``([], 0)`` for an unknown type or on any
            failure.
        """
        conn = get_db_connection()
        if not conn:
            return [], 0

        with self._cursor_scope(conn) as cursor:
            try:
                spec = self._LIST_SPECS.get(type)
                table_name = TABLE_MAP.get(type)
                if spec is None or not table_name:
                    return [], 0

                conditions: List[str] = []
                filter_params: List[Any] = []

                # Free-text keyword search.
                if keyword:
                    matches = " OR ".join(f"{column} LIKE %s" for column in spec['keyword_columns'])
                    conditions.append(f"({matches})")
                    filter_params.extend(f"%{keyword}%" for _ in spec['keyword_columns'])

                # Exact-match filters on whitelisted columns only.
                for filter_key, filter_value in filters.items():
                    if not filter_value:
                        continue
                    db_field = spec['field_map'].get(filter_key)
                    if db_field:
                        conditions.append(f"{db_field} = %s")
                        filter_params.append(filter_value)

                where_sql = " WHERE " + " AND ".join(conditions) if conditions else ""
                # Clamp so that page <= 0 cannot produce a negative OFFSET.
                offset = max(page - 1, 0) * size

                # LEFT JOIN keeps sub-table rows that have no main-table row.
                sql = f"""
                    SELECT {spec['fields']}
                    FROM {table_name} s
                    LEFT JOIN t_samp_document_main m ON s.id = m.source_id AND m.source_type = %s
                    {where_sql}
                    ORDER BY s.created_time DESC
                    LIMIT %s OFFSET %s
                """
                cursor.execute(sql, (type, *filter_params, size, offset))
                items = cursor.fetchall()

                # The count query has no join, so it takes the filter params only.
                cursor.execute(
                    f"SELECT COUNT(*) as count FROM {table_name} s {where_sql}",
                    tuple(filter_params),
                )
                counted = cursor.fetchone()
                total = counted['count'] if counted else 0

                return items, total
            except Exception:
                logger.exception("获取 %s 列表失败", type)
                return [], 0

    # ==================== Document conversion ====================

    async def get_document_source_type(self, doc_id: str) -> Optional[str]:
        """Return the ``source_type`` of a document, or ``None`` if unavailable."""
        conn = get_db_connection()
        if not conn:
            return None

        with self._cursor_scope(conn) as cursor:
            try:
                cursor.execute("SELECT source_type FROM t_samp_document_main WHERE id = %s", (doc_id,))
                found = cursor.fetchone()
                return found['source_type'] if found else None
            except Exception as exc:
                logger.exception("获取文档source_type失败: %s", exc)
                return None

    async def get_document_title(self, doc_id: str) -> str:
        """Return the title of a document, or ``"文档"`` when it cannot be read."""
        conn = get_db_connection()
        if not conn:
            return "文档"

        with self._cursor_scope(conn) as cursor:
            try:
                cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (doc_id,))
                found = cursor.fetchone()
                return found['title'] if found else "文档"
            except Exception as exc:
                logger.exception("获取文档标题失败: %s", exc)
                return "文档"

    async def update_conversion_status(self, doc_id: str, status: int,
                                       md_url: Optional[str] = None,
                                       json_url: Optional[str] = None,
                                       error_message: Optional[str] = None) -> bool:
        """Update the conversion status of a document.

        Args:
            doc_id: Main-table document id.
            status: Conversion status (0 = not converted, 1 = converting,
                2 = done, 3 = failed).
            md_url: Markdown file URL; written only when truthy.
            json_url: JSON file URL; written only when truthy.
            error_message: Error text; written only when truthy.

        Returns:
            ``True`` when the statement was committed, ``False`` otherwise.
        """
        conn = get_db_connection()
        if not conn:
            return False

        with self._cursor_scope(conn) as cursor:
            try:
                assignments = ["conversion_status = %s"]
                params: List[Any] = [status]
                optional_columns = (
                    ("conversion_error", error_message),
                    ("md_url", md_url),
                    ("json_url", json_url),
                )
                for column, value in optional_columns:
                    if value:
                        assignments.append(f"{column} = %s")
                        params.append(value)

                sql = f"UPDATE t_samp_document_main SET {', '.join(assignments)}, updated_time = NOW() WHERE id = %s"
                params.append(doc_id)

                cursor.execute(sql, tuple(params))
                conn.commit()
                return True
            except Exception as exc:
                logger.exception("更新转换进度失败: %s", exc)
                conn.rollback()
                return False

    # ==================== Basic-info management ====================

    async def add_basic_info(self, type: str, data: Dict[str, Any], user_id: str) -> Tuple[bool, str]:
        """Create a basic-info record together with its main-table row.

        The file extension stored in the main table is derived from the path
        of ``data['file_url']``. ``validity`` defaults to ``'现行'`` for the
        ``basis`` type.

        Returns:
            ``(ok, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                if not TABLE_MAP.get(type):
                    return False, "无效的类型"

                source_id = str(uuid.uuid4())
                doc_id = str(uuid.uuid4())
                file_url = data.get('file_url')

                # Build the sub-table statement first so that an unsupported
                # type is refused before the main row is written.
                statement = self._build_source_insert(
                    type, source_id, data, user_id, data.get('validity', '现行')
                )
                if statement is None:
                    return False, "不支持的类型"

                # 1. Main table (written here instead of by a trigger).
                self._insert_main_row(
                    cursor, doc_id, data.get('title'), type, source_id,
                    file_url, self._file_extension(file_url), user_id,
                )
                # 2. Sub table (file_url lives in the main table only).
                cursor.execute(*statement)

                conn.commit()
                return True, "新增成功"
            except Exception as exc:
                logger.exception("新增基本信息失败")
                conn.rollback()
                return False, str(exc)

    async def edit_basic_info(self, type: str, info_id: str, data: Dict[str, Any], updater_id: str) -> Tuple[bool, str]:
        """Update a basic-info record and the main-table row that points at it.

        ``info_id`` is the sub-table id; the main row is located through
        ``source_id = info_id AND source_type = type``. Every listed column is
        overwritten, so a key missing from ``data`` is stored as NULL.

        Returns:
            ``(ok, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                table_name = TABLE_MAP.get(type)
                if not table_name:
                    return False, "无效的类型"

                file_url = data.get('file_url')
                title = data.get('title')
                authority = data.get('issuing_authority')
                release_date = self._to_date(data.get('release_date'))

                # 1. Main table (written here instead of by a trigger).
                cursor.execute(
                    """
                    UPDATE t_samp_document_main
                    SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
                    WHERE source_id = %s AND source_type = %s
                    """,
                    (title, file_url, self._file_extension(file_url), updater_id, info_id, type),
                )

                # 2. Sub table (file_url lives in the main table only).
                if type == 'basis':
                    sql = f"""
                        UPDATE {table_name}
                        SET chinese_name = %s, standard_number = %s, issuing_authority = %s, release_date = %s,
                            document_type = %s, professional_field = %s, validity = %s,
                            english_name = %s, implementation_date = %s, drafting_unit = %s,
                            approving_department = %s, participating_units = %s, engineering_phase = %s,
                            reference_basis = %s, source_url = %s,
                            updated_by = %s, updated_time = NOW()
                        WHERE id = %s
                    """
                    params = (
                        title, data.get('standard_no'), authority, release_date,
                        data.get('document_type'), data.get('professional_field'), data.get('validity'),
                        data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                        data.get('approving_department'), data.get('participating_units'), data.get('engineering_phase'),
                        data.get('reference_basis'), data.get('source_url'),
                        updater_id, info_id,
                    )
                elif type == 'work':
                    # compilation_basis_1 .. compilation_basis_9
                    basis_columns = [f"compilation_basis_{i}" for i in range(1, 10)]
                    basis_updates = ", ".join(f"{column} = %s" for column in basis_columns)
                    sql = f"""
                        UPDATE {table_name}
                        SET plan_name = %s, project_name = %s, project_section = %s, compiling_unit = %s, compiling_date = %s,
                            plan_summary = %s, {basis_updates},
                            updated_by = %s, updated_time = NOW()
                        WHERE id = %s
                    """
                    params = (
                        title, data.get('project_name'), data.get('project_section'), authority, release_date,
                        data.get('plan_summary'),
                        *(data.get(column) for column in basis_columns),
                        updater_id, info_id,
                    )
                elif type == 'job':
                    sql = f"""
                        UPDATE {table_name}
                        SET file_name = %s, issuing_department = %s, document_type = %s, publish_date = %s,
                            effective_start_date = %s, effective_end_date = %s,
                            updated_by = %s, updated_time = NOW()
                        WHERE id = %s
                    """
                    params = (
                        title, authority, data.get('document_type'), release_date,
                        self._to_date(data.get('effective_start_date')), self._to_date(data.get('effective_end_date')),
                        updater_id, info_id,
                    )
                else:
                    return False, "不支持的类型"

                cursor.execute(sql, params)

                conn.commit()
                return True, "编辑成功"
            except Exception as exc:
                logger.exception("编辑基本信息失败")
                conn.rollback()
                return False, str(exc)

    async def delete_basic_info(self, type: str, info_id: str) -> Tuple[bool, str]:
        """Delete a basic-info record and the main-table row that points at it.

        The main row is removed explicitly rather than left to a database
        trigger, mirroring how add/edit maintain both tables by hand. If a
        trigger has already removed it, the second DELETE matches nothing.

        Returns:
            ``(ok, message)``.
        """
        conn = get_db_connection()
        if not conn:
            return False, "数据库连接失败"

        with self._cursor_scope(conn) as cursor:
            try:
                table_name = TABLE_MAP.get(type)
                if not table_name:
                    return False, "无效的类型"

                # 1. Sub-table record.
                cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (info_id,))
                # 2. Main-table record linked to it.
                cursor.execute(
                    "DELETE FROM t_samp_document_main WHERE source_id = %s AND source_type = %s",
                    (info_id, type),
                )

                conn.commit()
                return True, "删除成功"
            except Exception as exc:
                logger.exception("删除基本信息失败")
                conn.rollback()
                return False, str(exc)
|