|
|
@@ -111,7 +111,7 @@ class SampleService:
|
|
|
Args:
|
|
|
doc_ids: 文档ID列表
|
|
|
username: 操作人
|
|
|
- kb_id: 知识库ID
|
|
|
+ kb_id: 知识库ID (可选,若不传则回退使用文档记录中已保存的 kb_id;两者都为空时该文档跳过入库)
|
|
|
kb_method: 切分方法
|
|
|
"""
|
|
|
conn = get_db_connection()
|
|
|
@@ -129,7 +129,7 @@ class SampleService:
|
|
|
# 1. 获取所有选中的文档详情
|
|
|
placeholders = ','.join(['%s']*len(doc_ids))
|
|
|
fetch_sql = f"""
|
|
|
- SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time
|
|
|
+ SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time, kb_id
|
|
|
FROM t_samp_document_main
|
|
|
WHERE id IN ({placeholders})
|
|
|
"""
|
|
|
@@ -146,6 +146,7 @@ class SampleService:
|
|
|
status = doc.get('conversion_status')
|
|
|
whether_to_enter = doc.get('whether_to_enter', 0)
|
|
|
md_url = doc.get('md_url')
|
|
|
+ source_type = doc.get('source_type')
|
|
|
|
|
|
# A. 检查是否已入库
|
|
|
if whether_to_enter == 1:
|
|
|
@@ -168,7 +169,38 @@ class SampleService:
|
|
|
error_details.append(f"· {title}: 转换结果地址丢失")
|
|
|
continue
|
|
|
|
|
|
- # B. 从 MinIO 获取 Markdown 内容
|
|
|
+ # C. 确定入库策略 (kb_method 严格使用弹窗传入的参数;kb_id 未传入时回退到文档记录中已保存的 kb_id)
|
|
|
+ # 不从数据库读取旧的 kb_method,保证入库逻辑由本次操作决定
|
|
|
+ current_kb_id = kb_id or doc.get('kb_id')
|
|
|
+ current_kb_method = kb_method # 直接使用前端传来的切分方式
|
|
|
+
|
|
|
+ if not current_kb_id:
|
|
|
+ logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
|
|
|
+ failed_count += 1
|
|
|
+ error_details.append(f"· {title}: 未指定目标知识库")
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not current_kb_method:
|
|
|
+ logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
|
|
|
+ failed_count += 1
|
|
|
+ error_details.append(f"· {title}: 未指定切分策略")
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 获取知识库信息 (collection_name_parent, collection_name_children)
|
|
|
+ kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
|
|
|
+ cursor.execute(kb_sql, (current_kb_id,))
|
|
|
+ kb_res = cursor.fetchone()
|
|
|
+
|
|
|
+ if not kb_res:
|
|
|
+ logger.warning(f"找不到指定的知识库: id={current_kb_id}")
|
|
|
+ failed_count += 1
|
|
|
+ error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
|
|
|
+ continue
|
|
|
+
|
|
|
+ collection_name_parent = kb_res['collection_name_parent']
|
|
|
+ collection_name_children = kb_res['collection_name_children']
|
|
|
+
|
|
|
+ # D. 从 MinIO 获取 Markdown 内容
|
|
|
try:
|
|
|
md_content = self.minio_manager.get_object_content(md_url)
|
|
|
if not md_content:
|
|
|
@@ -179,39 +211,32 @@ class SampleService:
|
|
|
error_details.append(f"· {title}: 读取云端文件失败")
|
|
|
continue
|
|
|
|
|
|
- # C. 调用 MilvusService 进行切分和入库
|
|
|
+ # E. 调用 MilvusService 进行切分和入库
|
|
|
try:
|
|
|
- # 如果有 kb_id,需要根据它获取 collection_name
|
|
|
- collection_name = None
|
|
|
- if kb_id:
|
|
|
- kb_sql = "SELECT collection_name FROM t_samp_knowledge_base WHERE id = %s"
|
|
|
- cursor.execute(kb_sql, (kb_id,))
|
|
|
- kb_res = cursor.fetchone()
|
|
|
- if kb_res:
|
|
|
- collection_name = kb_res['collection_name']
|
|
|
-
|
|
|
# 准备元数据
|
|
|
+ current_date = int(datetime.now().strftime('%Y%m%d'))
|
|
|
doc_info = {
|
|
|
"doc_id": doc_id,
|
|
|
"doc_name": title,
|
|
|
- "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else 20260127,
|
|
|
- "tags": doc.get('source_type') or 'unknown',
|
|
|
+ "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
|
|
|
+ "tags": source_type or 'unknown',
|
|
|
"user_id": username, # 传递操作人作为 created_by
|
|
|
- "kb_id": kb_id,
|
|
|
- "kb_method": kb_method,
|
|
|
- "collection_name": collection_name
|
|
|
+ "kb_id": current_kb_id,
|
|
|
+ "kb_method": current_kb_method,
|
|
|
+ "collection_name_parent": collection_name_parent,
|
|
|
+ "collection_name_children": collection_name_children
|
|
|
}
|
|
|
await self.milvus_service.insert_knowledge(md_content, doc_info)
|
|
|
|
|
|
- # D. 添加到任务管理中心 (类型为 data)
|
|
|
+ # F. 添加到任务管理中心 (类型为 data)
|
|
|
try:
|
|
|
await task_service.add_task(doc_id, 'data')
|
|
|
except Exception as task_err:
|
|
|
logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
|
|
|
|
|
|
- # E. 更新数据库状态
|
|
|
+ # G. 更新数据库状态
|
|
|
update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
|
|
|
- cursor.execute(update_sql, (kb_id, kb_method, username, doc_id))
|
|
|
+ cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
|
|
|
success_count += 1
|
|
|
|
|
|
except Exception as milvus_err:
|
|
|
@@ -385,8 +410,9 @@ class SampleService:
|
|
|
LEFT JOIN {sub_table} s ON m.id = s.id
|
|
|
LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
|
|
|
LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
|
|
|
+ LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
|
|
|
"""
|
|
|
- fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, m.id as id"
|
|
|
+ fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
|
|
|
where_clauses.append("m.source_type = %s")
|
|
|
params.append(table_type)
|
|
|
order_sql = "m.created_time DESC"
|
|
|
@@ -407,8 +433,8 @@ class SampleService:
|
|
|
where_clauses.append("s.level_4_classification = %s")
|
|
|
params.append(level_4_classification)
|
|
|
else:
|
|
|
- from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id"
|
|
|
- fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name"
|
|
|
+ from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
|
|
|
+ fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
|
|
|
order_sql = "m.created_time DESC"
|
|
|
title_field = "m.title"
|
|
|
|
|
|
@@ -431,7 +457,6 @@ class SampleService:
|
|
|
sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
|
|
|
params.extend([size, offset])
|
|
|
|
|
|
- logger.info(f"Executing SQL: {sql} with params: {params}")
|
|
|
cursor.execute(sql, tuple(params))
|
|
|
items = [self._format_document_row(row) for row in cursor.fetchall()]
|
|
|
|
|
|
@@ -546,12 +571,13 @@ class SampleService:
|
|
|
INSERT INTO t_samp_document_main (
|
|
|
id, title, source_type, file_url,
|
|
|
file_extension, created_by, updated_by, created_time, updated_time,
|
|
|
- conversion_status, whether_to_task
|
|
|
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0)
|
|
|
+ conversion_status, whether_to_task, kb_id
|
|
|
+ ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
|
|
|
""",
|
|
|
(
|
|
|
doc_id, doc_data.get('title'), table_type, file_url,
|
|
|
- doc_data.get('file_extension'), user_id, user_id
|
|
|
+ doc_data.get('file_extension'), user_id, user_id,
|
|
|
+ doc_data.get('kb_id')
|
|
|
)
|
|
|
)
|
|
|
|
|
|
@@ -648,14 +674,14 @@ class SampleService:
|
|
|
# 1. 更新主表
|
|
|
cursor.execute(
|
|
|
"""
|
|
|
- UPDATE t_samp_document_main
|
|
|
- SET title = %s, file_url = %s, file_extension = %s,
|
|
|
- updated_by = %s, updated_time = NOW()
|
|
|
+ UPDATE t_samp_document_main SET
|
|
|
+ title = %s, file_url = %s, file_extension = %s,
|
|
|
+ updated_by = %s, updated_time = NOW(), kb_id = %s
|
|
|
WHERE id = %s
|
|
|
""",
|
|
|
(
|
|
|
doc_data.get('title'), file_url, doc_data.get('file_extension'),
|
|
|
- updater_id, doc_id
|
|
|
+ updater_id, doc_data.get('kb_id'), doc_id
|
|
|
)
|
|
|
)
|
|
|
|
|
|
@@ -754,7 +780,7 @@ class SampleService:
|
|
|
s.participating_units, s.reference_basis,
|
|
|
s.created_by, u1.username as creator_name, s.created_time,
|
|
|
s.updated_by, u2.username as updater_name, s.updated_time,
|
|
|
- m.file_url, m.conversion_status, m.md_url, m.json_url
|
|
|
+ m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
|
|
|
"""
|
|
|
field_map = {
|
|
|
'title': 's.chinese_name',
|
|
|
@@ -778,7 +804,7 @@ class SampleService:
|
|
|
s.note,
|
|
|
s.created_by, u1.username as creator_name, s.created_time,
|
|
|
s.updated_by, u2.username as updater_name, s.updated_time,
|
|
|
- m.file_url, m.conversion_status, m.md_url, m.json_url
|
|
|
+ m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
|
|
|
"""
|
|
|
field_map = {
|
|
|
'title': 's.plan_name',
|
|
|
@@ -799,7 +825,7 @@ class SampleService:
|
|
|
s.note,
|
|
|
s.created_by, u1.username as creator_name, s.created_time,
|
|
|
s.updated_by, u2.username as updater_name, s.updated_time,
|
|
|
- m.file_url, m.conversion_status, m.md_url, m.json_url
|
|
|
+ m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
|
|
|
"""
|
|
|
field_map = {
|
|
|
'title': 's.file_name',
|
|
|
@@ -860,11 +886,12 @@ class SampleService:
|
|
|
|
|
|
# 使用 LEFT JOIN 关联主表和用户表获取姓名
|
|
|
sql = f"""
|
|
|
- SELECT {fields}
|
|
|
+ SELECT {fields}, kb.name as kb_name
|
|
|
FROM {table_name} s
|
|
|
LEFT JOIN t_samp_document_main m ON s.id = m.id
|
|
|
LEFT JOIN t_sys_user u1 ON s.created_by = u1.id
|
|
|
LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id
|
|
|
+ LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
|
|
|
{where_sql}
|
|
|
ORDER BY s.created_time DESC
|
|
|
LIMIT %s OFFSET %s
|
|
|
@@ -1008,12 +1035,12 @@ class SampleService:
|
|
|
INSERT INTO t_samp_document_main (
|
|
|
id, title, source_type, file_url,
|
|
|
file_extension, created_by, updated_by, created_time, updated_time,
|
|
|
- conversion_status, whether_to_task
|
|
|
- ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0)
|
|
|
+ conversion_status, whether_to_task, kb_id
|
|
|
+ ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
|
|
|
""",
|
|
|
(
|
|
|
doc_id, data.get('title'), type, file_url,
|
|
|
- file_extension, user_id, user_id
|
|
|
+ file_extension, user_id, user_id, data.get('kb_id')
|
|
|
)
|
|
|
)
|
|
|
|
|
|
@@ -1122,10 +1149,10 @@ class SampleService:
|
|
|
cursor.execute(
|
|
|
"""
|
|
|
UPDATE t_samp_document_main
|
|
|
- SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
|
|
|
+ SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s
|
|
|
WHERE id = %s
|
|
|
""",
|
|
|
- (data.get('title'), file_url, file_extension, updater_id, doc_id)
|
|
|
+ (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id)
|
|
|
)
|
|
|
|
|
|
# 2. 更新子表 (移除 file_url)
|
|
|
@@ -1200,6 +1227,10 @@ class SampleService:
|
|
|
|
|
|
async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]:
|
|
|
"""删除基本信息"""
|
|
|
+ if not doc_id:
|
|
|
+ return False, "缺少 ID 参数"
|
|
|
+
|
|
|
+ logger.info(f"Deleting basic info: type={type}, id={doc_id}")
|
|
|
conn = get_db_connection()
|
|
|
if not conn:
|
|
|
return False, "数据库连接失败"
|
|
|
@@ -1210,21 +1241,44 @@ class SampleService:
|
|
|
if not table_name:
|
|
|
return False, "无效的类型"
|
|
|
|
|
|
- # 1. 删除主表记录 (由于设置了 ON DELETE CASCADE,子表记录会自动删除)
|
|
|
+ # 1. 显式删除子表记录 (防止 CASCADE 未生效)
|
|
|
+ try:
|
|
|
+ cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,))
|
|
|
+ logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}")
|
|
|
+ except Exception as sub_e:
|
|
|
+ logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}")
|
|
|
+
|
|
|
+ # 2. 同步删除任务管理中心的数据 (优先删除关联数据)
|
|
|
+ try:
|
|
|
+ # 使用当前事务删除任务记录(如果 task_service 支持的话,目前它自建连接)
|
|
|
+ # 这里我们直接在当前 cursor 中也执行一次,确保事务一致性
|
|
|
+ cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
|
|
|
+ logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}")
|
|
|
+ except Exception as task_e:
|
|
|
+ logger.warning(f"在主事务中删除任务记录失败: {task_e}")
|
|
|
+
|
|
|
+ # 3. 删除主表记录
|
|
|
cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,))
|
|
|
+ affected_main = cursor.rowcount
|
|
|
+ logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}")
|
|
|
|
|
|
- # 同步删除任务管理中心的数据
|
|
|
+ if affected_main == 0:
|
|
|
+ logger.warning(f"未找到主表记录: {doc_id}")
|
|
|
+ # 即使主表没找到,我们也 commit 之前的操作并返回成功(幂等性)
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ # 4. 再次确保任务中心数据已删除 (调用原有服务)
|
|
|
try:
|
|
|
await task_service.delete_task(doc_id)
|
|
|
except Exception as task_err:
|
|
|
- logger.error(f"同步删除任务中心数据失败 (ID: {doc_id}): {task_err}")
|
|
|
+ logger.error(f"调用 task_service 删除任务失败: {task_err}")
|
|
|
|
|
|
- conn.commit()
|
|
|
return True, "删除成功"
|
|
|
except Exception as e:
|
|
|
- logger.exception("删除基本信息失败")
|
|
|
+ logger.exception(f"删除基本信息异常 (ID: {doc_id})")
|
|
|
conn.rollback()
|
|
|
- return False, str(e)
|
|
|
+ return False, f"删除失败: {str(e)}"
|
|
|
finally:
|
|
|
cursor.close()
|
|
|
conn.close()
|