|
|
@@ -2,6 +2,7 @@
|
|
|
import logging
|
|
|
import json
|
|
|
import httpx
|
|
|
+from datetime import datetime
|
|
|
from typing import List, Dict, Any, Tuple, Optional
|
|
|
from app.base.async_mysql_connection import get_db_connection
|
|
|
from app.base.minio_connection import get_minio_manager
|
|
|
@@ -11,97 +12,76 @@ logger = logging.getLogger(__name__)
|
|
|
class TaskService:
|
|
|
"""任务管理服务类"""
|
|
|
|
|
|
+ _schema_verified = False # 类级别变量,确保 DDL 逻辑只运行一次
|
|
|
+
|
|
|
    def __init__(self):
        """Initialise the service with the MinIO manager from ``get_minio_manager()``."""
        # Kept on the instance so other methods can call
        # self.minio_manager.get_full_url(...) on stored object paths.
        self.minio_manager = get_minio_manager()
|
|
|
|
|
|
- async def get_task_list(self, task_type: str) -> List[Dict[str, Any]]:
|
|
|
- """获取任务列表
|
|
|
-
|
|
|
- Args:
|
|
|
- task_type: 任务类型, 'data' 或 'image'
|
|
|
- """
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- return []
|
|
|
+ async def _ensure_table_schema(self, cursor, conn):
|
|
|
+ """确保表结构和索引正确 (DDL 操作)"""
|
|
|
+ if TaskService._schema_verified:
|
|
|
+ return
|
|
|
|
|
|
- cursor = conn.cursor()
|
|
|
try:
|
|
|
- if task_type == 'data':
|
|
|
- # 类型为数据的,从 t_samp_document_main 拿名称
|
|
|
- sql = """
|
|
|
- SELECT
|
|
|
- t.id,
|
|
|
- t.business_id,
|
|
|
- t.task_id,
|
|
|
- t.project_id,
|
|
|
- t.type,
|
|
|
- t.annotation_status,
|
|
|
- t.project_id as project_name,
|
|
|
- d.title as name
|
|
|
- FROM t_task_management t
|
|
|
- JOIN t_samp_document_main d ON t.business_id COLLATE utf8mb4_unicode_ci = d.id COLLATE utf8mb4_unicode_ci
|
|
|
- WHERE t.type = 'data' AND d.whether_to_task = 1
|
|
|
- ORDER BY d.created_time DESC
|
|
|
- """
|
|
|
- elif task_type == 'image':
|
|
|
- # 类型为图片的,从 t_image_info 拿名称和 URL
|
|
|
- sql = """
|
|
|
- SELECT
|
|
|
- t.id,
|
|
|
- t.business_id,
|
|
|
- t.task_id,
|
|
|
- t.project_id,
|
|
|
- t.type,
|
|
|
- t.annotation_status,
|
|
|
- t.project_id as project_name,
|
|
|
- i.image_name as name,
|
|
|
- i.image_url
|
|
|
- FROM t_task_management t
|
|
|
- JOIN t_image_info i ON t.business_id COLLATE utf8mb4_unicode_ci = i.id COLLATE utf8mb4_unicode_ci
|
|
|
- WHERE t.type = 'image' AND i.whether_to_task = 1
|
|
|
- ORDER BY i.created_time DESC
|
|
|
- """
|
|
|
- else:
|
|
|
- return []
|
|
|
-
|
|
|
- cursor.execute(sql)
|
|
|
- tasks = cursor.fetchall()
|
|
|
+ # 1. 动态维护字段
|
|
|
+ cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'tag'")
|
|
|
+ if not cursor.fetchone():
|
|
|
+ cursor.execute("ALTER TABLE t_task_management ADD COLUMN tag json NULL COMMENT '标签' AFTER type")
|
|
|
|
|
|
- for item in tasks:
|
|
|
- # 统一返回结构,旧代码可能还在找 metadata 字典,这里给个空的
|
|
|
- item['metadata'] = {}
|
|
|
-
|
|
|
- # 如果是图片类型,处理 URL 转换以支持前端预览
|
|
|
- if task_type == 'image' and item.get('image_url'):
|
|
|
- image_url = item.get('image_url')
|
|
|
- if image_url and not image_url.startswith(('http://', 'https://')):
|
|
|
- item['image_url'] = self.minio_manager.get_full_url(image_url)
|
|
|
+ cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'metadata'")
|
|
|
+ if not cursor.fetchone():
|
|
|
+ cursor.execute("ALTER TABLE t_task_management ADD COLUMN metadata json NULL COMMENT '业务元数据' AFTER tag")
|
|
|
|
|
|
- return tasks
|
|
|
+ cursor.execute("SHOW COLUMNS FROM t_task_management LIKE 'project_name'")
|
|
|
+ if not cursor.fetchone():
|
|
|
+ cursor.execute("ALTER TABLE t_task_management ADD COLUMN project_name varchar(255) NULL COMMENT '项目显示名称' AFTER project_id")
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ # 2. 处理索引冲突
|
|
|
+ cursor.execute("SHOW INDEX FROM t_task_management WHERE Column_name = 'business_id'")
|
|
|
+ indexes = cursor.fetchall()
|
|
|
+ for idx in indexes:
|
|
|
+ if not idx['Non_unique'] and idx['Key_name'] != 'PRIMARY' and idx['Seq_in_index'] == 1:
|
|
|
+ cursor.execute(f"SHOW INDEX FROM t_task_management WHERE Key_name = '{idx['Key_name']}'")
|
|
|
+ if len(cursor.fetchall()) == 1:
|
|
|
+ cursor.execute(f"DROP INDEX {idx['Key_name']} ON t_task_management")
|
|
|
+ logger.info(f"Dropped old unique index: {idx['Key_name']}")
|
|
|
+
|
|
|
+ cursor.execute("SHOW INDEX FROM t_task_management WHERE Key_name = 'uk_business_project'")
|
|
|
+ if not cursor.fetchone():
|
|
|
+ cursor.execute("CREATE UNIQUE INDEX uk_business_project ON t_task_management (business_id, project_id)")
|
|
|
+ logger.info("Created new composite unique index: uk_business_project")
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+ TaskService._schema_verified = True
|
|
|
except Exception as e:
|
|
|
- logger.exception(f"获取任务列表失败 ({task_type}): {e}")
|
|
|
- return []
|
|
|
- finally:
|
|
|
- cursor.close()
|
|
|
- conn.close()
|
|
|
+ logger.warning(f"表结构维护失败: {e}")
|
|
|
+ conn.rollback()
|
|
|
|
|
|
- async def add_task(self, business_id: str, task_type: str, task_id: str = None, project_id: str = None) -> Tuple[bool, str, Optional[int]]:
|
|
|
- """添加或更新任务记录 (适配单表结构,直接使用 project_id 存项目名)"""
|
|
|
+ async def add_task(self, business_id: str, task_type: str, task_id: str = None, project_id: str = None, project_name: str = None, tag: str = None, metadata: str = None) -> Tuple[bool, str, Optional[int]]:
|
|
|
+ """添加或更新任务记录 (适配单表结构)"""
|
|
|
conn = get_db_connection()
|
|
|
if not conn:
|
|
|
return False, "数据库连接失败", None
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
try:
|
|
|
+ # 确保表结构(仅在第一次调用时执行)
|
|
|
+ await self._ensure_table_schema(cursor, conn)
|
|
|
+
|
|
|
+ # 执行插入/更新
|
|
|
sql = """
|
|
|
- INSERT INTO t_task_management (business_id, task_id, project_id, type, annotation_status)
|
|
|
- VALUES (%s, %s, %s, %s, %s)
|
|
|
+ INSERT INTO t_task_management (business_id, task_id, project_id, project_name, type, annotation_status, tag, metadata)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
ON DUPLICATE KEY UPDATE
|
|
|
task_id = IFNULL(VALUES(task_id), task_id),
|
|
|
- project_id = IFNULL(VALUES(project_id), project_id),
|
|
|
- annotation_status = IFNULL(VALUES(annotation_status), annotation_status)
|
|
|
+ project_name = IFNULL(VALUES(project_name), project_name),
|
|
|
+ annotation_status = IFNULL(VALUES(annotation_status), annotation_status),
|
|
|
+ tag = IFNULL(VALUES(tag), tag),
|
|
|
+ metadata = IFNULL(VALUES(metadata), metadata)
|
|
|
"""
|
|
|
- cursor.execute(sql, (business_id, task_id, project_id, task_type, 'pending'))
|
|
|
+ cursor.execute(sql, (business_id, task_id, project_id, project_name, task_type, 'pending', tag, metadata))
|
|
|
record_id = cursor.lastrowid
|
|
|
|
|
|
conn.commit()
|
|
|
@@ -125,11 +105,112 @@ class TaskService:
|
|
|
sql = "DELETE FROM t_task_management WHERE id = %s"
|
|
|
cursor.execute(sql, (id,))
|
|
|
conn.commit()
|
|
|
- return True, "删除成功"
|
|
|
+ return True, "成功"
|
|
|
except Exception as e:
|
|
|
- logger.exception(f"根据id删除任务失败: {e}")
|
|
|
conn.rollback()
|
|
|
- return False, f"删除失败: {str(e)}"
|
|
|
+ logger.exception(f"删除任务失败: {e}")
|
|
|
+ return False, str(e)
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ def _serialize_datetime(self, obj: Any) -> Any:
|
|
|
+ """递归遍历对象,将 datetime 转换为字符串"""
|
|
|
+ if isinstance(obj, dict):
|
|
|
+ return {k: self._serialize_datetime(v) for k, v in obj.items()}
|
|
|
+ elif isinstance(obj, list):
|
|
|
+ return [self._serialize_datetime(i) for i in obj]
|
|
|
+ elif isinstance(obj, datetime):
|
|
|
+ return obj.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ return obj
|
|
|
+
|
|
|
+ async def get_task_list(self, task_type: str) -> List[Dict[str, Any]]:
|
|
|
+ """获取项目列表 (按 project_id 聚合)"""
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn or task_type not in ['data', 'image']:
|
|
|
+ return []
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # 确保表结构
|
|
|
+ await self._ensure_table_schema(cursor, conn)
|
|
|
+
|
|
|
+ # 修改聚合逻辑:返回 project_id (UUID) 和 project_name (文字)
|
|
|
+ sql = """
|
|
|
+ SELECT
|
|
|
+ project_id,
|
|
|
+ MAX(project_name) as project_name,
|
|
|
+ MAX(tag) as tag,
|
|
|
+ MAX(id) as sort_id,
|
|
|
+ COUNT(*) as file_count
|
|
|
+ FROM t_task_management
|
|
|
+ WHERE type = %s
|
|
|
+ GROUP BY project_id
|
|
|
+ ORDER BY sort_id DESC
|
|
|
+ """
|
|
|
+ cursor.execute(sql, (task_type,))
|
|
|
+ rows = cursor.fetchall()
|
|
|
+
|
|
|
+ # 兼容旧数据:如果 project_name 为空,则用 project_id 代替
|
|
|
+ for row in rows:
|
|
|
+ if not row['project_name']:
|
|
|
+ row['project_name'] = row['project_id']
|
|
|
+
|
|
|
+ return self._serialize_datetime(rows)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"获取任务列表失败: {e}")
|
|
|
+ return []
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
+
|
|
|
+ async def get_project_details(self, project_id: str, task_type: str) -> List[Dict[str, Any]]:
|
|
|
+ """获取项目详情 (按 project_id 查询)"""
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return []
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ # 确保表结构
|
|
|
+ await self._ensure_table_schema(cursor, conn)
|
|
|
+
|
|
|
+ # 修改查询逻辑:获取 project_name 并在结果中包含它
|
|
|
+ sql = """
|
|
|
+ SELECT
|
|
|
+ id, business_id, task_id, project_id, project_name,
|
|
|
+ type, annotation_status, tag, metadata
|
|
|
+ FROM t_task_management
|
|
|
+ WHERE project_id = %s AND type = %s
|
|
|
+ """
|
|
|
+ cursor.execute(sql, (project_id, task_type))
|
|
|
+ rows = cursor.fetchall()
|
|
|
+
|
|
|
+ # 处理 tag 和 metadata 的 JSON 解析
|
|
|
+ for row in rows:
|
|
|
+ if row.get('tag'):
|
|
|
+ try: row['tag'] = json.loads(row['tag']) if isinstance(row['tag'], str) else row['tag']
|
|
|
+ except: pass
|
|
|
+ if row.get('metadata'):
|
|
|
+ try:
|
|
|
+ meta = json.loads(row['metadata']) if isinstance(row['metadata'], str) else row['metadata']
|
|
|
+ row['metadata'] = meta
|
|
|
+ # 提取名称供前端显示
|
|
|
+ if row['type'] == 'data':
|
|
|
+ row['name'] = meta.get('title') or meta.get('filename') or row['business_id']
|
|
|
+ elif row['type'] == 'image':
|
|
|
+ row['name'] = meta.get('image_name') or row['business_id']
|
|
|
+ else:
|
|
|
+ row['name'] = row['business_id']
|
|
|
+ except:
|
|
|
+ row['name'] = row['business_id']
|
|
|
+ else:
|
|
|
+ row['name'] = row['business_id']
|
|
|
+
|
|
|
+ return self._serialize_datetime(rows)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"获取项目详情失败: {e}")
|
|
|
+ return []
|
|
|
finally:
|
|
|
cursor.close()
|
|
|
conn.close()
|
|
|
@@ -154,7 +235,86 @@ class TaskService:
|
|
|
cursor.close()
|
|
|
conn.close()
|
|
|
|
|
|
- # create_anno_project 已被废弃,改为单表 batch_add 逻辑
|
|
|
+ async def create_anno_project(self, data: Dict[str, Any]) -> Tuple[bool, str]:
|
|
|
+ """创建标注项目并同步任务数据"""
|
|
|
+ project_name = data.get('name')
|
|
|
+ if not project_name:
|
|
|
+ return False, "项目名称不能为空"
|
|
|
+
|
|
|
+ # 0. 统一使用 UUID 方案获取或生成 project_id
|
|
|
+ conn = get_db_connection()
|
|
|
+ if not conn:
|
|
|
+ return False, "数据库连接失败"
|
|
|
+
|
|
|
+ cursor = conn.cursor()
|
|
|
+ try:
|
|
|
+ project_id = None
|
|
|
+ cursor.execute("SELECT project_id FROM t_task_management WHERE project_name = %s LIMIT 1", (project_name,))
|
|
|
+ existing_project = cursor.fetchone()
|
|
|
+ if existing_project:
|
|
|
+ project_id = existing_project['project_id']
|
|
|
+ else:
|
|
|
+ import uuid
|
|
|
+ project_id = str(uuid.uuid4())
|
|
|
+
|
|
|
+ task_type = data.get('task_type', 'data')
|
|
|
+ # 映射回内部类型
|
|
|
+ internal_type_map = {
|
|
|
+ 'text_classification': 'data',
|
|
|
+ 'image_classification': 'image'
|
|
|
+ }
|
|
|
+ internal_task_type = internal_type_map.get(task_type, task_type)
|
|
|
+
|
|
|
+ tasks_data = data.get('data', [])
|
|
|
+ if not tasks_data:
|
|
|
+ return False, "任务数据不能为空"
|
|
|
+
|
|
|
+ # 提取全局标签名列表
|
|
|
+ global_tags = []
|
|
|
+ if data.get('tags'):
|
|
|
+ global_tags = [t['tag'] for t in data['tags'] if 'tag' in t]
|
|
|
+
|
|
|
+ # 批量写入任务
|
|
|
+ import json
|
|
|
+ tag_str = json.dumps(global_tags, ensure_ascii=False) if global_tags else None
|
|
|
+
|
|
|
+ for item in tasks_data:
|
|
|
+ business_id = item.get('id')
|
|
|
+ if not business_id: continue
|
|
|
+
|
|
|
+ # 检查是否已存在 (使用联合主键逻辑)
|
|
|
+ cursor.execute(
|
|
|
+ "SELECT id FROM t_task_management WHERE business_id = %s AND project_id = %s",
|
|
|
+ (business_id, project_id)
|
|
|
+ )
|
|
|
+ if cursor.fetchone():
|
|
|
+ continue
|
|
|
+
|
|
|
+ metadata_str = json.dumps(item.get('metadata', {}), ensure_ascii=False)
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ INSERT INTO t_task_management
|
|
|
+ (business_id, type, project_id, project_name, tag, metadata, annotation_status)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, 'pending')
|
|
|
+ """
|
|
|
+ cursor.execute(sql, (business_id, internal_task_type, project_id, project_name, tag_str, metadata_str))
|
|
|
+
|
|
|
+ conn.commit()
|
|
|
+
|
|
|
+ # 自动推送至外部平台
|
|
|
+ success, msg = await self.send_to_external_platform(project_id)
|
|
|
+ if success:
|
|
|
+ return True, project_id
|
|
|
+ else:
|
|
|
+ return False, f"任务已保存但推送失败: {msg}"
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.exception(f"创建项目失败: {e}")
|
|
|
+ conn.rollback()
|
|
|
+ return False, str(e)
|
|
|
+ finally:
|
|
|
+ cursor.close()
|
|
|
+ conn.close()
|
|
|
|
|
|
async def get_project_progress(self, project_id: str) -> Dict[str, Any]:
|
|
|
"""获取项目进度统计 (单表化)"""
|
|
|
@@ -267,7 +427,7 @@ class TaskService:
|
|
|
try:
|
|
|
# 1. 获取任务记录
|
|
|
sql_tasks = """
|
|
|
- SELECT business_id as id, type, task_id, metadata as raw_metadata
|
|
|
+ SELECT business_id as id, type, task_id, tag, metadata
|
|
|
FROM t_task_management
|
|
|
WHERE project_id = %s
|
|
|
"""
|
|
|
@@ -279,7 +439,6 @@ class TaskService:
|
|
|
|
|
|
# 2. 解析基本信息
|
|
|
first_row = rows[0]
|
|
|
- project_name = project_id # 直接使用传入的项目名
|
|
|
internal_task_type = first_row['type']
|
|
|
remote_project_id = first_row.get('task_id') or project_id
|
|
|
|
|
|
@@ -289,91 +448,91 @@ class TaskService:
|
|
|
|
|
|
# 3. 处理数据
|
|
|
final_tasks = []
|
|
|
+ all_project_tags = set()
|
|
|
|
|
|
- # 批量获取 Milvus 内容的优化逻辑
|
|
|
+ # 针对 'data' 类型的批量 Milvus 查询优化
|
|
|
+ milvus_data_map = {}
|
|
|
if internal_task_type == 'data':
|
|
|
- # 先收集所有需要查询的 task_id
|
|
|
all_task_ids = [r['id'] for r in rows]
|
|
|
-
|
|
|
- # 尝试通过第一个任务获取知识库信息(假设同一个项目下的任务属于同一个知识库)
|
|
|
- # 如果项目跨知识库,这里可以进一步优化为按 kb_id 分组批量查
|
|
|
sql_kb = """
|
|
|
- SELECT kb.collection_name_parent, kb.collection_name_children, d.title
|
|
|
+ SELECT kb.collection_name_parent, kb.collection_name_children
|
|
|
FROM t_samp_document_main d
|
|
|
LEFT JOIN t_samp_knowledge_base kb ON d.kb_id = kb.id
|
|
|
WHERE d.id COLLATE utf8mb4_unicode_ci = %s COLLATE utf8mb4_unicode_ci
|
|
|
"""
|
|
|
cursor.execute(sql_kb, (all_task_ids[0],))
|
|
|
kb_info = cursor.fetchone()
|
|
|
-
|
|
|
- # 批量抓取 Milvus 内容
|
|
|
- milvus_data_map = {}
|
|
|
if kb_info:
|
|
|
milvus_data_map = self._get_milvus_content_batch(all_task_ids, kb_info)
|
|
|
|
|
|
- for item in rows:
|
|
|
- task_id = item['id']
|
|
|
- metadata = {}
|
|
|
- if item.get('raw_metadata'):
|
|
|
- try:
|
|
|
- metadata = json.loads(item['raw_metadata'])
|
|
|
- except: pass
|
|
|
-
|
|
|
+ for item in rows:
|
|
|
+ task_id = item['id']
|
|
|
+
|
|
|
+ # 提取并处理标签
|
|
|
+ doc_tags = []
|
|
|
+ if item.get('tag'):
|
|
|
+ try:
|
|
|
+ doc_tags = json.loads(item['tag']) if isinstance(item['tag'], str) else item['tag']
|
|
|
+ if doc_tags:
|
|
|
+ for t in doc_tags: all_project_tags.add(t)
|
|
|
+ except: pass
|
|
|
+
|
|
|
+ # 解析数据库元数据
|
|
|
+ db_metadata = {}
|
|
|
+ if item.get('metadata'):
|
|
|
+ try:
|
|
|
+ db_metadata = json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata']
|
|
|
+ except: pass
|
|
|
+
|
|
|
+ # 获取任务内容
|
|
|
+ task_contents = []
|
|
|
+ if internal_task_type == 'data':
|
|
|
task_contents = milvus_data_map.get(task_id, [])
|
|
|
if not task_contents:
|
|
|
- # 兜底:如果批量没查到,或者不是 Milvus 任务,尝试查 title
|
|
|
cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (task_id,))
|
|
|
res = cursor.fetchone()
|
|
|
if res: task_contents = [res['title']]
|
|
|
-
|
|
|
- # 拆分推送
|
|
|
- for idx, content in enumerate(task_contents):
|
|
|
- if not content: continue
|
|
|
- task_metadata = metadata.copy()
|
|
|
- task_metadata.update({"original_id": task_id, "chunk_index": idx})
|
|
|
- final_tasks.append({
|
|
|
- "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
|
|
|
- "content": content,
|
|
|
- "metadata": task_metadata
|
|
|
- })
|
|
|
-
|
|
|
- elif internal_task_type == 'image':
|
|
|
- for item in rows:
|
|
|
- task_id = item['id']
|
|
|
- metadata = {}
|
|
|
- if item.get('raw_metadata'):
|
|
|
- try:
|
|
|
- metadata = json.loads(item['raw_metadata'])
|
|
|
- except: pass
|
|
|
-
|
|
|
+ elif internal_task_type == 'image':
|
|
|
cursor.execute("SELECT image_url FROM t_image_info WHERE id = %s", (task_id,))
|
|
|
res = cursor.fetchone()
|
|
|
- task_contents = []
|
|
|
if res:
|
|
|
img_url = res['image_url']
|
|
|
if img_url and not img_url.startswith('http'):
|
|
|
img_url = self.minio_manager.get_full_url(img_url)
|
|
|
task_contents = [img_url]
|
|
|
|
|
|
- for idx, content in enumerate(task_contents):
|
|
|
- if not content: continue
|
|
|
- task_metadata = metadata.copy()
|
|
|
- task_metadata.update({"original_id": task_id, "chunk_index": idx})
|
|
|
- final_tasks.append({
|
|
|
- "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
|
|
|
- "content": content,
|
|
|
- "metadata": task_metadata
|
|
|
- })
|
|
|
+ # 构建最终任务列表
|
|
|
+ for idx, content in enumerate(task_contents):
|
|
|
+ if not content: continue
|
|
|
+
|
|
|
+ # 合并元数据:数据库数据 + 动态 ID
|
|
|
+ task_metadata = {
|
|
|
+ "original_id": task_id,
|
|
|
+ "chunk_index": idx
|
|
|
+ }
|
|
|
+ if db_metadata:
|
|
|
+ task_metadata.update(db_metadata)
|
|
|
+
|
|
|
+ if doc_tags:
|
|
|
+ task_metadata['tags'] = [{"tag": tag} for tag in doc_tags]
|
|
|
+
|
|
|
+ final_tasks.append({
|
|
|
+ "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
|
|
|
+ "content": content,
|
|
|
+ "metadata": task_metadata
|
|
|
+ })
|
|
|
|
|
|
- return {
|
|
|
- "name": project_name,
|
|
|
+ # 统一进行一次递归序列化处理
|
|
|
+ return self._serialize_datetime({
|
|
|
+ "name": project_id,
|
|
|
"description": "",
|
|
|
"task_type": external_task_type,
|
|
|
"data": final_tasks,
|
|
|
- "external_id": remote_project_id
|
|
|
- }
|
|
|
+ "external_id": remote_project_id,
|
|
|
+ "tags": [{"tag": t} for t in sorted(list(all_project_tags))]
|
|
|
+ })
|
|
|
except Exception as e:
|
|
|
- logger.exception(f"导出数据异常: {e}")
|
|
|
+ logger.exception(f"导出项目数据异常: {e}")
|
|
|
return {}
|
|
|
finally:
|
|
|
cursor.close()
|
|
|
@@ -503,7 +662,13 @@ class TaskService:
|
|
|
from app.core.config import config_handler
|
|
|
api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
|
|
|
# 转换 init URL 为 export URL
|
|
|
- export_url = api_url.replace('/init', f'/{remote_project_id}/export')
|
|
|
+ # 如果 URL 包含 /init,则替换为 /export,否则直接追加
|
|
|
+ if '/init' in api_url:
|
|
|
+ export_url = api_url.replace('/init', f'/{remote_project_id}/export')
|
|
|
+ else:
|
|
|
+ # 兼容不包含 /init 的情况,直接拼接
|
|
|
+ base_url = api_url.rstrip('/')
|
|
|
+ export_url = f"{base_url}/{remote_project_id}/export"
|
|
|
token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
|
|
|
|
|
|
# 3. 发送请求
|