chenkun 3 tuần trước
mục cha
commit
c57f4e345d

Những thay đổi đã bị hủy bỏ vì nó quá lớn
+ 1074 - 884
logs/lq-admin-app.log.1


Những thay đổi đã bị hủy bỏ vì nó quá lớn
+ 0 - 1239
logs/lq-admin-app.log.5


+ 12 - 0
scripts/fix_t_task_management_20260205.sql

@@ -0,0 +1,12 @@
+-- 2026-02-05 修复 t_task_management 表缺失字段 (优化版:统一使用 task_id)
+-- 如果你的数据库提示 "Unknown column 'ext_info'",请运行以下脚本
+
+ALTER TABLE `t_task_management` 
+ADD COLUMN `project_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL AFTER `task_id`,
+ADD COLUMN `annotation_status` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '标注状态' AFTER `type`,
+ADD COLUMN `annotation_progress` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '进度' AFTER `annotation_status`,
+ADD COLUMN `callback_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '回调地址' AFTER `annotation_progress`,
+ADD COLUMN `ext_info` json NULL COMMENT '扩展信息' AFTER `callback_url`,
+ADD COLUMN `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间' AFTER `ext_info`,
+ADD COLUMN `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间' AFTER `create_time`,
+ADD UNIQUE INDEX `idx_unique_biz_type_proj`(`business_id`, `type`, `project_id`) USING BTREE;

+ 10 - 2
scripts/lq_db_dev_20260202.sql

@@ -637,7 +637,7 @@ INSERT INTO `t_sys_menu` VALUES ('basic-info-regulation', 'basic-info-main', 'ba
 INSERT INTO `t_sys_menu` VALUES ('basic-info-standard', 'basic-info-main', 'basic-info-standard', '施工标准规范', '/admin/basic-info/basis', 'basic-info/Index', 'Document', 1, 'menu', 0, 1, '编制依据管理', 'system', '2026-01-12 16:50:02', 'ed6a79d3-0083-4d81-8b48-fc522f686f74', '2026-01-30 19:25:45');
 INSERT INTO `t_sys_menu` VALUES ('chunk-management-btn', 'e30e6e95-d084-4365-a48b-0989662f7eb6', 'chunk-management', '知识片段管理', '/admin/documents/snippet', 'documents/KnowledgeSnippet', 'Star', 4, 'menu', 0, 1, NULL, 'system', '2026-01-15 14:28:16', 'ed6a79d3-0083-4d81-8b48-fc522f686f74', '2026-01-30 19:44:34');
 INSERT INTO `t_sys_menu` VALUES ('dashboard-main', NULL, 'dashboard', '仪表盘', '/dashboard', 'dashboard/Index', 'House', 1, 'menu', 0, 1, '系统概览和统计信息', 'system', '2026-01-06 17:46:30', 'system', '2026-01-06 17:46:30');
-INSERT INTO `t_sys_menu` VALUES ('dfccba5b-d715-4cc4-a421-e596fcab2ffb', 'e30e6e95-d084-4365-a48b-0989662f7eb6', 'admin-tasks', '任务管理中心', '/admin/tasks', 'admin/TaskManagement', 'List', 10, 'menu', 0, 1, NULL, NULL, '2026-01-27 13:57:46', 'ed6a79d3-0083-4d81-8b48-fc522f686f74', '2026-01-30 17:52:16');
+INSERT INTO `t_sys_menu` VALUES ('dfccba5b-d715-4cc4-a421-e596fcab2ffb', 'e30e6e95-d084-4365-a48b-0989662f7eb6', 'admin-tasks', '标注任务中心', '/admin/tasks', 'admin/TaskManagement', 'List', 10, 'menu', 0, 1, NULL, NULL, '2026-01-27 13:57:46', 'ed6a79d3-0083-4d81-8b48-fc522f686f74', '2026-01-30 17:52:16');
 INSERT INTO `t_sys_menu` VALUES ('document-change-btn', 'document-management', 'document-change', '文档转换', NULL, NULL, 'change', 2, 'button', 0, 1, NULL, 'system', '2026-01-09 15:51:43', 'system', '2026-01-30 19:04:20');
 INSERT INTO `t_sys_menu` VALUES ('document-delete-btn', 'document-management', 'document-delete', '删除文档', NULL, NULL, 'Delete', 4, 'button', 0, 1, '删除文档', 'system', '2026-01-06 17:46:30', 'system', '2026-01-30 19:04:20');
 INSERT INTO `t_sys_menu` VALUES ('document-edit-btn', 'document-management', 'document-edit', '编辑文档', NULL, NULL, NULL, 5, 'button', 0, 1, NULL, 'system', '2026-01-15 11:17:52', 'system', '2026-01-30 19:04:20');
@@ -1323,9 +1323,17 @@ CREATE TABLE `t_task_management`  (
   `id` int(0) NOT NULL AUTO_INCREMENT,
   `business_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
   `task_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
+  `project_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL,
   `type` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
+  `annotation_status` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '标注状态',
+  `annotation_progress` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '单个条目或项目的进度百分比',
+  `external_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '外部系统ID',
+  `callback_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '回调地址',
+  `ext_info` json NULL COMMENT '扩展信息',
+  `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',
+  `update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新时间',
   PRIMARY KEY (`id`) USING BTREE,
-  UNIQUE INDEX `idx_unique_business_type`(`business_id`, `type`) USING BTREE,
+  UNIQUE INDEX `idx_unique_biz_type_proj`(`business_id`, `type`, `project_id`) USING BTREE,
   INDEX `idx_business_id`(`business_id`) USING BTREE,
   INDEX `idx_type`(`type`) USING BTREE
 ) ENGINE = InnoDB AUTO_INCREMENT = 20 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '任务管理表' ROW_FORMAT = Dynamic;

+ 2 - 1
src/app/base/milvus_connection.py

@@ -71,7 +71,8 @@ def get_milvus_vectorstore(collection_name: str, consistency_level: str = "Stron
             connection_args=connection_args,
             consistency_level=consistency_level,
             builtin_function=BM25BuiltInFunction(),
-            vector_field=vector_field_name
+            vector_field=vector_field_name,
+            auto_id=True
         )
         return vectorstore
     except Exception as e:

+ 2 - 2
src/app/base/minio_connection.py

@@ -58,12 +58,12 @@ class MinioManager:
         unique_id = str(uuid.uuid4())
         ext = os.path.splitext(filename)[1]
         
-        # 如果提供了 prefix,则使用 prefix,否则默认使用 uploads/YYYYMMDD
+        # 如果提供了 prefix,则使用 prefix,否则默认使用 image
         if prefix:
             # 确保 prefix 不以 / 开头或结尾,方便统一拼接
             folder_path = prefix.strip("/")
         else:
-            folder_path = f"uploads/{datetime.now().strftime('%Y%m%d')}"
+            folder_path = "image"
             
         object_name = f"{self.base_path}/{folder_path}/{unique_id}{ext}"
         

+ 6 - 1
src/app/config/config.ini

@@ -103,4 +103,9 @@ MINERU_API_BATCH_RESULT=https://mineru.net/api/v4/extract-results/batch/{}
 # embedding模型配置
 EMBEDDING_BASE_URL=http://192.168.91.253:9003/v1
 EMBEDDING_MODEL=Qwen3-Embedding-8B
-EMBEDDING_API_KEY=dummy
+EMBEDDING_API_KEY=dummy
+
+# 外部标注平台联动配置
+[external_platform]
+API_URL=http://192.168.92.61:9003/api/external/projects/init
+ADMIN_TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzIwMjYwMTI5MTUxMTM4XzkzYjIyMjZkIiwidXNlcm5hbWUiOiJhZG1pbiIsImVtYWlsIjoiYWRtaW5AZXhhbXBsZS5jb20iLCJyb2xlIjoiYWRtaW4iLCJleHAiOjEwNDEwMTg5NjYxLCJpYXQiOjE3NzAyNzYwNjEsInR5cGUiOiJhY2Nlc3MifQ.QW5qwNXop3Id4fVE9rpelTgdyGZUzMralQAFOVj4Mtw

+ 5 - 1
src/app/sample/schemas/sample_schemas.py

@@ -82,4 +82,8 @@ class UploadUrlRequest(BaseModel):
 class UploadUrlResponse(BaseModel):
     upload_url: str
     file_url: str
-    object_name: str
+    object_name: str
+
+class ExportRequest(BaseModel):
+    project_id: str
+    format: Optional[str] = "json"

+ 2 - 2
src/app/schemas/base.py

@@ -3,7 +3,7 @@
 """
 from pydantic import BaseModel, Field
 from typing import Optional, Any, Dict
-from datetime import datetime
+from datetime import datetime, timezone
 
 
 class BaseSchema(BaseModel):
@@ -64,4 +64,4 @@ class ApiResponse(BaseModel):
     code: int
     message: str
     data: Optional[Any] = None
-    timestamp: str
+    timestamp: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())

+ 22 - 21
src/app/services/image_service.py

@@ -312,8 +312,8 @@ class ImageService:
             cursor.close()
             conn.close()
 
-    async def batch_add_to_task(self, image_ids: List[str], username: str) -> Tuple[bool, str]:
-        """批量将图片加入任务中心 (设置 whether_to_task = 1)"""
+    async def batch_add_to_task(self, image_ids: List[str], username: str, project_name: str) -> Tuple[bool, str]:
+        """批量将图片加入任务中心 (单表化)"""
         conn = get_db_connection()
         if not conn:
             return False, "数据库连接失败"
@@ -323,39 +323,40 @@ class ImageService:
             if not image_ids:
                 return False, "未指定要加入任务的图片 ID"
             
-            # 1. 查询哪些图片已经是 whether_to_task = 1
-            placeholders = ', '.join(['%s'] * len(image_ids))
-            check_sql = f"SELECT id FROM t_image_info WHERE id IN ({placeholders}) AND whether_to_task = 1"
-            cursor.execute(check_sql, image_ids)
-            already_added_ids = [row['id'] for row in cursor.fetchall()]
+            # 0. 直接使用项目名称作为项目 ID
+            project_id = project_name
             
-            # 2. 找出需要更新的图片 ID
-            ids_to_add = [img_id for img_id in image_ids if img_id not in already_added_ids]
+            # 1. 待处理图片 ID
+            ids_to_add = image_ids
             
-            if not ids_to_add:
-                return True, f"所选的 {len(image_ids)} 张图片均已在任务中心,无需重复操作"
-            
-            # 3. 更新 whether_to_task 状态
+            # 2. 更新 whether_to_task 状态
             add_placeholders = ', '.join(['%s'] * len(ids_to_add))
             sql = f"UPDATE t_image_info SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
             cursor.execute(sql, (username, *ids_to_add))
             
-            # 4. 确保这些图片都在 t_task_management 表中
+            # 3. 确保这些图片都在 t_task_management 表中 (单表逻辑)
             for img_id in ids_to_add:
                 try:
-                    await task_service.add_task(img_id, 'image')
+                    await task_service.add_task(
+                        business_id=img_id, 
+                        task_type='image', 
+                        project_id=project_id
+                    )
                 except Exception as e:
                     logger.error(f"确保图片 {img_id} 在任务中心记录失败: {e}")
 
             conn.commit()
+
+            # 4. 自动推送至外部标注平台
+            push_success, push_msg = await task_service.send_to_external_platform(project_id)
             
-            skipped_count = len(already_added_ids)
-            if skipped_count > 0:
-                message = f"成功将 {len(ids_to_add)} 张图片加入任务中心,另有 {skipped_count} 张图片已在任务中心,已自动跳过"
+            msg = f"成功将 {len(ids_to_add)} 张图片加入项目: {project_name}"
+            if push_success:
+                msg += f" (已推送: {push_msg})"
             else:
-                message = f"成功将 {len(ids_to_add)} 张图片加入任务中心"
-                
-            return True, message
+                msg += f" (推送失败: {push_msg})"
+
+            return True, msg
         except Exception as e:
             logger.exception("批量加入任务失败")
             if conn:

+ 151 - 54
src/app/services/milvus_service.py

@@ -39,56 +39,64 @@ class MilvusService:
             self.ensure_collection_exists(name)
 
     async def insert_knowledge(self, content: str, doc_info: Dict[str, Any]):
-        """将 Markdown 内容切分并入库 (支持父子段分表)"""
+        """
+        将转换后的 Markdown 内容切分并入库到 Milvus
+        """
         try:
             doc_id = doc_info.get("doc_id")
             file_name = doc_info.get("file_name")
-            doc_version = doc_info.get("doc_version", int(time.time()))
-            tags = doc_info.get("tags", "")
-            user_id = doc_info.get("user_id", "system")
-            
             kb_method = doc_info.get("kb_method")
             
+            # --- 提取层级信息 (Hierarchy) ---
+            from langchain_text_splitters import MarkdownHeaderTextSplitter
+            headers_to_split_on = [
+                ("#", "H1"),
+                ("##", "H2"),
+                ("###", "H3"),
+                ("####", "H4"),
+            ]
+            md_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
+            md_header_splits = md_splitter.split_text(content)
+
             if kb_method == "parent_child":
                 # --- 方案 A: 父子段分表入库 ---
                 parent_col = doc_info.get("collection_name_parent") or f"{doc_info.get('collection_name', PARENT_COLLECTION_NAME)}_parent"
                 child_col = doc_info.get("collection_name_children") or f"{doc_info.get('collection_name', CHILD_COLLECTION_NAME)}_child"
                 
                 from langchain_text_splitters import RecursiveCharacterTextSplitter
-                parent_splitter = RecursiveCharacterTextSplitter(
-                    chunk_size=1000,
-                    chunk_overlap=100
-                )
-                parent_chunks = parent_splitter.split_text(content)
+                parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
                 
                 parent_docs = []
                 child_docs = []
+                global_idx = 0
                 
-                for p_idx, p_content in enumerate(parent_chunks):
-                    # 生成唯一的 parent_id
-                    p_id = hashlib.sha1(f"{doc_id}_p_{p_idx}".encode()).hexdigest()
+                for split in md_header_splits:
+                    # 获取当前片段的层级路径
+                    hierarchy = " -> ".join([v for k, v in split.metadata.items()])
                     
-                    # 准备父段文档 (Metadata 不包含向量,仅用于检索回显)
-                    p_metadata = self._prepare_metadata(doc_info, p_id, p_idx, p_id)
-                    parent_docs.append(Document(page_content=p_content, metadata=p_metadata))
+                    # 对每个标题块进行父段切分
+                    split_parent_chunks = parent_splitter.split_text(split.page_content)
                     
-                    # 2. 在每个父段内部切分子段 (较小块)
-                    child_splitter = RecursiveCharacterTextSplitter(
-                        chunk_size=300,
-                        chunk_overlap=30
-                    )
-                    child_chunks = child_splitter.split_text(p_content)
-                    
-                    for c_idx, c_content in enumerate(child_chunks):
-                        # 子段的 parent_id 指向父段的 p_id
-                        c_metadata = self._prepare_metadata(doc_info, p_id, c_idx, p_id)
-                        child_docs.append(Document(page_content=c_content, metadata=c_metadata))
+                    for p_idx, p_content in enumerate(split_parent_chunks):
+                        p_id = hashlib.sha1(f"{doc_id}_p_{global_idx}_{p_idx}".encode()).hexdigest()
+                        p_metadata = self._prepare_metadata(doc_info, p_id, global_idx, p_id, hierarchy)
+                        parent_docs.append(Document(page_content=p_content, metadata=p_metadata))
+                        
+                        # 2. 在每个父段内部切分子段 (较小块)
+                        child_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=30)
+                        child_chunks = child_splitter.split_text(p_content)
+                        
+                        for c_idx, c_content in enumerate(child_chunks):
+                            c_id = hashlib.sha1(f"{doc_id}_c_{p_id}_{c_idx}".encode()).hexdigest()
+                            c_metadata = self._prepare_metadata(doc_info, c_id, c_idx, p_id, hierarchy)
+                            child_docs.append(Document(page_content=c_content, metadata=c_metadata))
+                        
+                        global_idx += 1
 
                 # 确保两个集合都存在
                 self.ensure_collection_exists(parent_col)
                 self.ensure_collection_exists(child_col)
                 
-                # 分别入库
                 if parent_docs:
                     get_milvus_vectorstore(parent_col).add_documents(parent_docs)
                 if child_docs:
@@ -98,55 +106,98 @@ class MilvusService:
             
             else:
                 # --- 常规单表入库逻辑 ---
-                target_collection = doc_info.get("collection_name_parent") or doc_info.get("collection_name") or PARENT_COLLECTION_NAME
+                # 修改逻辑:符号和长度的片段只进入子表,其他进入父表
+                if kb_method in ["length", "symbol"]:
+                    target_collection = doc_info.get("collection_name_children") or f"{doc_info.get('collection_name', CHILD_COLLECTION_NAME)}_child"
+                else:
+                    target_collection = doc_info.get("collection_name_parent") or doc_info.get("collection_name") or PARENT_COLLECTION_NAME
                 
-                chunks = []
-                # 获取切分参数
                 chunk_size = doc_info.get("chunk_size", 500)
                 separator = doc_info.get("separator", "。")
                 
                 from langchain_text_splitters import RecursiveCharacterTextSplitter
-                
                 if kb_method == "length":
-                    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=int(chunk_size * 0.1))
-                    chunks = splitter.split_text(content)
+                    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=int(chunk_size * 0.1))
                 elif kb_method == "symbol":
-                    # 符号切分优先使用传入的符号
-                    splitter = RecursiveCharacterTextSplitter(
+                    text_splitter = RecursiveCharacterTextSplitter(
                         separators=["\n\n", "\n", separator, ";", "!", "?", "!", "?", ";"],
                         chunk_size=chunk_size,
                         chunk_overlap=0
                     )
-                    chunks = splitter.split_text(content)
                 else:
-                    chunks = [p.strip() for p in re.split(r"\n\s*\n+", content) if p.strip()]
-                
-                if not chunks:
-                    logger.warning(f"Document {file_name} has no content chunks.")
-                    return
+                    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=0)
 
                 documents = []
-                for idx, chunk in enumerate(chunks):
-                    p_id = hashlib.sha1(f"{doc_id}_{idx}".encode()).hexdigest()
-                    metadata = self._prepare_metadata(doc_info, p_id, idx, p_id)
-                    documents.append(Document(page_content=chunk, metadata=metadata))
+                global_idx = 0
+                for split in md_header_splits:
+                    hierarchy = " -> ".join([v for k, v in split.metadata.items()])
+                    split_chunks = text_splitter.split_text(split.page_content)
+                    
+                    for chunk in split_chunks:
+                        p_id = hashlib.sha1(f"{doc_id}_{global_idx}".encode()).hexdigest()
+                        metadata = self._prepare_metadata(doc_info, p_id, global_idx, p_id, hierarchy)
+                        documents.append(Document(page_content=chunk, metadata=metadata))
+                        global_idx += 1
+
+                if not documents:
+                    logger.warning(f"Document {file_name} has no content chunks.")
+                    return
 
                 self.ensure_collection_exists(target_collection)
                 get_milvus_vectorstore(target_collection).add_documents(documents)
-                
                 logger.info(f"Successfully inserted {len(documents)} chunks for {file_name} into {target_collection}")
 
         except Exception as e:
             logger.error(f"Error inserting knowledge into Milvus: {e}")
             raise
 
-    def _prepare_metadata(self, doc_info: Dict[str, Any], p_id: str, index: int, parent_ref_id: str) -> Dict[str, Any]:
+    def _prepare_metadata(self, doc_info: Dict[str, Any], p_id: str, index: int, parent_ref_id: str, hierarchy: str = "") -> Dict[str, Any]:
         """统一准备元数据"""
         doc_id = doc_info.get("doc_id")
         file_name = doc_info.get("file_name")
         doc_version = doc_info.get("doc_version", int(time.time()))
         tags = doc_info.get("tags", "")
         user_id = doc_info.get("user_id", "system")
+        source_type = doc_info.get("source_type")
+        business_meta = doc_info.get("business_metadata", {})
+        
+        # 基础元数据
+        metadata_dict = {
+            "file_name": file_name,
+            "doc_version": doc_version,
+            "hierarchy": hierarchy or "正文",
+            "chunk_id": p_id
+        }
+        
+        # 注入业务元数据
+        if source_type == 'standard':
+            metadata_dict.update({
+                "chinese_name": business_meta.get("chinese_name") or file_name,
+                "standard_number": business_meta.get("standard_number") or "无",
+                "issuing_authority": business_meta.get("issuing_authority") or "无",
+                "document_type": business_meta.get("document_type") or "未分类",
+                "professional_field": business_meta.get("professional_field") or "未分类",
+                "validity": business_meta.get("validity") or "现行",
+                "file_url": business_meta.get("file_url") or ""
+            })
+        elif source_type == 'construction_plan':
+            metadata_dict.update({
+                "plan_name": business_meta.get("plan_name") or file_name,
+                "project_name": business_meta.get("project_name") or "无",
+                "project_section": business_meta.get("project_section") or "无",
+                "compiling_unit": business_meta.get("compiling_unit") or "无",
+                "compiling_date": str(business_meta.get("compiling_date")) if business_meta.get("compiling_date") else "无",
+                "plan_summary": business_meta.get("plan_summary") or "无",
+                "plan_category": business_meta.get("plan_category") or "未分类",
+                "level_1_classification": business_meta.get("level_1_classification") or "施工方案",
+                "level_2_classification": business_meta.get("level_2_classification") or "未分类",
+                "level_3_classification": business_meta.get("level_3_classification") or "未分类",
+                "level_4_classification": business_meta.get("level_4_classification") or "未分类",
+                "file_url": business_meta.get("file_url") or ""
+            })
+        else:
+            # 默认兼容
+            metadata_dict.update(business_meta)
         
         return {
             "document_id": doc_id,
@@ -159,11 +210,7 @@ class MilvusService:
             "created_time": int(time.time() * 1000),
             "updated_by": user_id,
             "updated_time": int(time.time() * 1000),
-            "metadata": {
-                "file_name": file_name,
-                "doc_version": doc_version,
-                "outline_path": ""
-            }
+            "metadata": metadata_dict
         }
 
     def ensure_collection_exists(self, name: str):
@@ -252,7 +299,11 @@ class MilvusService:
             try:
                 self.client.create_index(collection_name=name, index_params=index_params)
             except Exception as e:
-                logger.error(f"Failed to create index for {name}: {e}")
+                # 忽略 "已经存在索引" 的错误
+                if "creating multiple indexes on same field is not supported" in str(e):
+                    logger.warning(f"Indexes already exist for {name}, skipping.")
+                else:
+                    logger.error(f"Failed to create index for {name}: {e}")
         
         self.client.load_collection(collection_name=name)
 
@@ -667,6 +718,52 @@ class MilvusService:
             "updated_time": updated_time,
         }
 
+    def get_chunks_by_document_id(self, collection_name: str, document_id: str) -> List[Dict[str, Any]]:
+        """
+        根据 document_id 获取所有片段
+        """
+        if not self.client.has_collection(collection_name):
+            logger.warning(f"Collection {collection_name} does not exist.")
+            return []
+
+        try:
+            # 确保集合已加载
+            self.client.load_collection(collection_name)
+            
+            # 构建过滤表达式
+            # 注意:某些集合可能 document_id 在 metadata JSON 中,某些可能在顶层
+            # 我们的 ensure_collection_exists 默认在顶层定义了 document_id
+            filter_expr = f'document_id == "{document_id}"'
+            
+            # 查询
+            res = self.client.query(
+                collection_name=collection_name,
+                filter=filter_expr,
+                output_fields=["*"],
+                limit=1000 # 假设一个文档不会超过1000个片段
+            )
+            
+            # 排序(按 index)
+            res.sort(key=lambda x: x.get("index", 0))
+            
+            return res
+        except Exception as e:
+            logger.error(f"Error getting chunks for document_id {document_id} from {collection_name}: {e}")
+            # 如果顶层查询失败,尝试查询 metadata JSON 内部
+            try:
+                filter_expr_json = f'metadata["document_id"] == "{document_id}"'
+                res = self.client.query(
+                    collection_name=collection_name,
+                    filter=filter_expr_json,
+                    output_fields=["*"],
+                    limit=1000
+                )
+                res.sort(key=lambda x: x.get("index", 0))
+                return res
+            except Exception as e2:
+                logger.error(f"Error querying metadata.document_id: {e2}")
+                return []
+
     def hybrid_search(self, collection_name: str, query_text: str,
                      top_k: int = 3, ranker_type: str = "weighted",
                      dense_weight: float = 0.7, sparse_weight: float = 0.3,

+ 173 - 81
src/app/services/sample_service.py

@@ -105,14 +105,15 @@ class SampleService:
 
     # ==================== 文档管理 ====================
     
-    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_id: str = None, kb_method: str = None) -> Tuple[int, str]:
+    async def batch_enter_knowledge_base(self, doc_ids: List[str], username: str, kb_method: str = "general", chunk_size: int = 500, separator: str = "。") -> Tuple[int, str]:
         """批量将文档入库到知识库
         
         Args:
             doc_ids: 文档ID列表
             username: 操作人
-            kb_id: 知识库ID
             kb_method: 切分方法
+            chunk_size: 切分长度
+            separator: 切分符号
         """
         conn = get_db_connection()
         if not conn:
@@ -120,7 +121,6 @@ class SampleService:
         
         cursor = conn.cursor()
         success_count = 0
-        skipped_count = 0
         already_entered_count = 0
         failed_count = 0
         error_details = []
@@ -129,7 +129,7 @@ class SampleService:
             # 1. 获取所有选中的文档详情
             placeholders = ','.join(['%s']*len(doc_ids))
             fetch_sql = f"""
-                SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time 
+                SELECT id, title, source_type, md_url, conversion_status, whether_to_enter, created_time, kb_id 
                 FROM t_samp_document_main 
                 WHERE id IN ({placeholders})
             """
@@ -146,6 +146,7 @@ class SampleService:
                 status = doc.get('conversion_status')
                 whether_to_enter = doc.get('whether_to_enter', 0)
                 md_url = doc.get('md_url')
+                source_type = doc.get('source_type')
                 
                 # A. 检查是否已入库
                 if whether_to_enter == 1:
@@ -157,18 +158,48 @@ class SampleService:
                 # B. 检查转换状态
                 if status != 2:
                     reason = "尚未转换成功" if status == 0 else "正在转换中" if status == 1 else "转换失败"
-                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},跳过入库: {reason}")
-                    skipped_count += 1
+                    logger.warning(f"文档 {title}({doc_id}) 状态为 {status},入库失败: {reason}")
+                    failed_count += 1
                     error_details.append(f"· {title}: {reason}")
                     continue
                 
                 if not md_url:
-                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,跳过入库")
-                    skipped_count += 1
+                    logger.warning(f"文档 {title}({doc_id}) 缺少 md_url,入库失败")
+                    failed_count += 1
                     error_details.append(f"· {title}: 转换结果地址丢失")
                     continue
                 
-                # B. 从 MinIO 获取 Markdown 内容
+                # C. 确定入库策略 (从数据库读取已绑定的知识库)
+                current_kb_id = doc.get('kb_id')
+                current_kb_method = kb_method  # 直接使用前端传来的切分方式
+
+                if not current_kb_id:
+                    logger.warning(f"文档 {title}({doc_id}) 未指定知识库,跳过入库")
+                    failed_count += 1
+                    error_details.append(f"· {title}: 未指定目标知识库")
+                    continue
+
+                if not current_kb_method:
+                    logger.warning(f"文档 {title}({doc_id}) 未指定切分方式,跳过入库")
+                    failed_count += 1
+                    error_details.append(f"· {title}: 未指定切分策略")
+                    continue
+
+                # 获取知识库信息 (collection_name_parent, collection_name_children)
+                kb_sql = "SELECT collection_name_parent, collection_name_children FROM t_samp_knowledge_base WHERE id = %s AND is_deleted = 0"
+                cursor.execute(kb_sql, (current_kb_id,))
+                kb_res = cursor.fetchone()
+                
+                if not kb_res:
+                    logger.warning(f"找不到指定的知识库: id={current_kb_id}")
+                    failed_count += 1
+                    error_details.append(f"· {title}: 指定的知识库不存在或已被删除")
+                    continue
+                
+                collection_name_parent = kb_res['collection_name_parent']
+                collection_name_children = kb_res['collection_name_children']
+                
+                # D. 从 MinIO 获取 Markdown 内容
                 try:
                     md_content = self.minio_manager.get_object_content(md_url)
                     if not md_content:
@@ -179,39 +210,63 @@ class SampleService:
                     error_details.append(f"· {title}: 读取云端文件失败")
                     continue
                 
-                # C. 调用 MilvusService 进行切分和入库
+                # E. 调用 MilvusService 进行切分和入库
                 try:
-                    # 如果有 kb_id,需要根据它获取 collection_name
-                    collection_name = None
-                    if kb_id:
-                        kb_sql = "SELECT collection_name FROM t_samp_knowledge_base WHERE id = %s"
-                        cursor.execute(kb_sql, (kb_id,))
-                        kb_res = cursor.fetchone()
-                        if kb_res:
-                            collection_name = kb_res['collection_name']
-                    
+                    # 获取业务元数据
+                    business_metadata = {}
+                    if source_type == 'standard':
+                        std_sql = """
+                            SELECT chinese_name, standard_number, issuing_authority, 
+                                   document_type, professional_field, validity, source_url as file_url
+                            FROM t_samp_standard_base_info 
+                            WHERE id = %s
+                        """
+                        cursor.execute(std_sql, (doc_id,))
+                        business_metadata = cursor.fetchone() or {}
+                    elif source_type == 'construction_plan':
+                        plan_sql = """
+                            SELECT plan_name, project_name, project_section, compiling_unit, 
+                                   compiling_date, plan_summary, plan_category, 
+                                   level_1_classification, level_2_classification, 
+                                   level_3_classification, level_4_classification,
+                                   '' as file_url
+                            FROM t_samp_construction_plan_base_info 
+                            WHERE id = %s
+                        """
+                        cursor.execute(plan_sql, (doc_id,))
+                        business_metadata = cursor.fetchone() or {}
+                        # 方案库的 file_url 也可以从主表取,或者这里补一个
+                        if not business_metadata.get('file_url'):
+                            business_metadata['file_url'] = doc.get('file_url', '')
+
                     # 准备元数据
+                    current_date = int(datetime.now().strftime('%Y%m%d'))
                     doc_info = {
                         "doc_id": doc_id,
-                        "doc_name": title,
-                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else 20260127,
-                        "tags": doc.get('source_type') or 'unknown',
+                        "file_name": title,
+                        "doc_version": int(doc['created_time'].strftime('%Y%m%d')) if doc.get('created_time') else current_date,
+                        "tags": "",
                         "user_id": username,  # 传递操作人作为 created_by
-                        "kb_id": kb_id,
-                        "kb_method": kb_method,
-                        "collection_name": collection_name
+                        "kb_id": current_kb_id,
+                        "kb_method": current_kb_method,
+                        "collection_name_parent": collection_name_parent,
+                        "collection_name_children": collection_name_children,
+                        "chunk_size": chunk_size,
+                        "separator": separator,
+                        "source_type": source_type,
+                        "business_metadata": business_metadata  # 注入业务元数据
                     }
                     await self.milvus_service.insert_knowledge(md_content, doc_info)
                     
-                    # D. 添加到任务管理中心 (类型为 data)
+                    # F. 添加到任务管理中心 (类型为 data)
                     try:
                         await task_service.add_task(doc_id, 'data')
                     except Exception as task_err:
                         logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
 
-                    # E. 更新数据库状态
+                    # G. 更新数据库状态
                     update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
-                    cursor.execute(update_sql, (kb_id, kb_method, username, doc_id))
+                    cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
                     success_count += 1
                     
                 except Exception as milvus_err:
@@ -223,14 +278,12 @@ class SampleService:
             conn.commit()
             
             # 构造详细的消息
-            if success_count == len(doc_ids) and failed_count == 0 and skipped_count == 0 and already_entered_count == 0:
+            if success_count == len(doc_ids) and failed_count == 0 and already_entered_count == 0:
                 msg = f"✅ 入库成功!共处理 {success_count} 份文档。"
             else:
                 msg = f"📊 入库处理完成:\n· 成功:{success_count} 份\n"
                 if already_entered_count > 0:
                     msg += f"· 跳过:{already_entered_count} 份 (已入库)\n"
-                if skipped_count > 0:
-                    msg += f"· 跳过:{skipped_count} 份 (转换中或失败)\n"
                 if failed_count > 0:
                     msg += f"· 失败:{failed_count} 份\n"
             
@@ -248,8 +301,8 @@ class SampleService:
             cursor.close()
             conn.close()
     
-    async def batch_add_to_task(self, doc_ids: List[str], username: str) -> Tuple[bool, str]:
-        """批量将文档加入任务中心 (设置 whether_to_task = 1)"""
+    async def batch_add_to_task(self, doc_ids: List[str], username: str, project_name: str) -> Tuple[bool, str]:
+        """批量将文档加入标注任务中心 (单表化)"""
         conn = get_db_connection()
         if not conn:
             return False, "数据库连接失败"
@@ -259,40 +312,50 @@ class SampleService:
             if not doc_ids:
                 return False, "未指定要加入任务的文档 ID"
             
-            # 1. 查询哪些文档已经是 whether_to_task = 1
-            placeholders = ', '.join(['%s'] * len(doc_ids))
-            check_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_task = 1"
-            cursor.execute(check_sql, doc_ids)
-            already_added_ids = [row['id'] for row in cursor.fetchall()]
-            
-            # 2. 找出需要更新的文档 ID
-            ids_to_add = [doc_id for doc_id in doc_ids if doc_id not in already_added_ids]
+            # 0. 直接使用项目名称作为项目 ID (根据用户要求,project_id 字段即 project_name)
+            project_id = project_name
             
-            if not ids_to_add:
-                return True, f"所选的 {len(doc_ids)} 份文档均已在任务中心,无需重复操作"
+            # 1. 过滤掉未入库的文档
+            placeholders = ', '.join(['%s'] * len(doc_ids))
+            check_entered_sql = f"SELECT id FROM t_samp_document_main WHERE id IN ({placeholders}) AND whether_to_enter = 1"
+            cursor.execute(check_entered_sql, doc_ids)
+            entered_ids = [row['id'] for row in cursor.fetchall()]
             
-            # 3. 更新 whether_to_task 状态
+            unentered_count = len(doc_ids) - len(entered_ids)
+            if not entered_ids:
+                return False, "所选文档均未入库,无法加入标注任务中心"
+
+            # 2. 更新 whether_to_task 状态
+            ids_to_add = entered_ids
             add_placeholders = ', '.join(['%s'] * len(ids_to_add))
             sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
             cursor.execute(sql, (username, *ids_to_add))
-            affected_rows = cursor.rowcount
             
-            # 4. 确保这些文档都在 t_task_management 表中
+            # 3. 写入任务管理表 (单表逻辑)
             for doc_id in ids_to_add:
                 try:
-                    await task_service.add_task(doc_id, 'data')
+                    await task_service.add_task(
+                        business_id=doc_id, 
+                        task_type='data', 
+                        project_id=project_id
+                    )
                 except Exception as e:
-                    logger.error(f"确保文档 {doc_id} 在任务中心记录失败: {e}")
-
+                    logger.error(f"添加文档 {doc_id} 到任务中心失败: {e}")
+            
             conn.commit()
+
+            # 4. 自动推送至外部标注平台
+            push_success, push_msg = await task_service.send_to_external_platform(project_id)
             
-            skipped_count = len(already_added_ids)
-            if skipped_count > 0:
-                message = f"成功将 {len(ids_to_add)} 份文档加入任务中心,另有 {skipped_count} 份文档已在任务中心,已自动跳过"
+            msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
+            if push_success:
+                msg += f" (已推送: {push_msg})"
             else:
-                message = f"成功将 {len(ids_to_add)} 份文档加入任务中心"
-                
-            return True, message
+                msg += f" (推送失败: {push_msg})"
+
+            if unentered_count > 0:
+                msg += f",{unentered_count} 份文档因未入库被跳过"
+            return True, msg
         except Exception as e:
             logger.exception("批量加入任务失败")
             if conn:
@@ -385,8 +448,9 @@ class SampleService:
                     LEFT JOIN {sub_table} s ON m.id = s.id
                     LEFT JOIN t_sys_user u1 ON m.created_by = u1.id
                     LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id
+                    LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                 """
-                fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, m.id as id"
+                fields_sql = "m.*, s.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name, m.id as id"
                 where_clauses.append("m.source_type = %s")
                 params.append(table_type)
                 order_sql = "m.created_time DESC"
@@ -407,8 +471,8 @@ class SampleService:
                         where_clauses.append("s.level_4_classification = %s")
                         params.append(level_4_classification)
             else:
-                from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id"
-                fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name"
+                from_sql = "t_samp_document_main m LEFT JOIN t_sys_user u1 ON m.created_by = u1.id LEFT JOIN t_sys_user u2 ON m.updated_by = u2.id LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id"
+                fields_sql = "m.*, u1.username as creator_name, u2.username as updater_name, kb.name as kb_name"
                 order_sql = "m.created_time DESC"
                 title_field = "m.title"
             
@@ -431,7 +495,6 @@ class SampleService:
             sql = f"SELECT {fields_sql} FROM {from_sql} {where_sql} ORDER BY {order_sql} LIMIT %s OFFSET %s"
             params.extend([size, offset])
             
-            logger.info(f"Executing SQL: {sql} with params: {params}")
             cursor.execute(sql, tuple(params))
             items = [self._format_document_row(row) for row in cursor.fetchall()]
             
@@ -546,12 +609,13 @@ class SampleService:
                 INSERT INTO t_samp_document_main (
                     id, title, source_type, file_url, 
                     file_extension, created_by, updated_by, created_time, updated_time,
-                    conversion_status, whether_to_task
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0)
+                    conversion_status, whether_to_task, kb_id
+                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
                 """,
                 (
                     doc_id, doc_data.get('title'), table_type, file_url,
-                    doc_data.get('file_extension'), user_id, user_id
+                    doc_data.get('file_extension'), user_id, user_id,
+                    doc_data.get('kb_id')
                 )
             )
 
@@ -648,14 +712,14 @@ class SampleService:
             # 1. 更新主表
             cursor.execute(
                 """
-                UPDATE t_samp_document_main 
-                SET title = %s, file_url = %s, file_extension = %s,
-                    updated_by = %s, updated_time = NOW()
+                UPDATE t_samp_document_main SET 
+                    title = %s, file_url = %s, file_extension = %s, 
+                    updated_by = %s, updated_time = NOW(), kb_id = %s
                 WHERE id = %s
                 """,
                 (
                     doc_data.get('title'), file_url, doc_data.get('file_extension'),
-                    updater_id, doc_id
+                    updater_id, doc_data.get('kb_id'), doc_id
                 )
             )
 
@@ -754,7 +818,7 @@ class SampleService:
                     s.participating_units, s.reference_basis,
                     s.created_by, u1.username as creator_name, s.created_time,
                     s.updated_by, u2.username as updater_name, s.updated_time,
-                    m.file_url, m.conversion_status, m.md_url, m.json_url
+                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                 """
                 field_map = {
                     'title': 's.chinese_name',
@@ -778,7 +842,7 @@ class SampleService:
                     s.note, 
                     s.created_by, u1.username as creator_name, s.created_time,
                     s.updated_by, u2.username as updater_name, s.updated_time,
-                    m.file_url, m.conversion_status, m.md_url, m.json_url
+                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                 """
                 field_map = {
                     'title': 's.plan_name',
@@ -799,7 +863,7 @@ class SampleService:
                     s.note, 
                     s.created_by, u1.username as creator_name, s.created_time,
                     s.updated_by, u2.username as updater_name, s.updated_time,
-                    m.file_url, m.conversion_status, m.md_url, m.json_url
+                    m.file_url, m.conversion_status, m.md_url, m.json_url, m.kb_id, m.whether_to_enter
                 """
                 field_map = {
                     'title': 's.file_name',
@@ -860,11 +924,12 @@ class SampleService:
             
             # 使用 LEFT JOIN 关联主表和用户表获取姓名
             sql = f"""
-                SELECT {fields} 
+                SELECT {fields}, kb.name as kb_name
                 FROM {table_name} s
                 LEFT JOIN t_samp_document_main m ON s.id = m.id
                 LEFT JOIN t_sys_user u1 ON s.created_by = u1.id
                 LEFT JOIN t_sys_user u2 ON s.updated_by = u2.id
+                LEFT JOIN t_samp_knowledge_base kb ON m.kb_id = kb.id
                 {where_sql} 
                 ORDER BY s.created_time DESC 
                 LIMIT %s OFFSET %s
@@ -1008,12 +1073,12 @@ class SampleService:
                 INSERT INTO t_samp_document_main (
                     id, title, source_type, file_url, 
                     file_extension, created_by, updated_by, created_time, updated_time,
-                    conversion_status, whether_to_task
-                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0)
+                    conversion_status, whether_to_task, kb_id
+                ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW(), 0, 0, %s)
                 """,
                 (
                     doc_id, data.get('title'), type, file_url,
-                    file_extension, user_id, user_id
+                    file_extension, user_id, user_id, data.get('kb_id')
                 )
             )
             
@@ -1122,10 +1187,10 @@ class SampleService:
             cursor.execute(
                 """
                 UPDATE t_samp_document_main 
-                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW()
+                SET title = %s, file_url = %s, file_extension = %s, updated_by = %s, updated_time = NOW(), kb_id = %s
                 WHERE id = %s
                 """,
-                (data.get('title'), file_url, file_extension, updater_id, doc_id)
+                (data.get('title'), file_url, file_extension, updater_id, data.get('kb_id'), doc_id)
             )
 
             # 2. 更新子表 (移除 file_url)
@@ -1200,6 +1265,10 @@ class SampleService:
 
     async def delete_basic_info(self, type: str, doc_id: str) -> Tuple[bool, str]:
         """删除基本信息"""
+        if not doc_id:
+            return False, "缺少 ID 参数"
+            
+        logger.info(f"Deleting basic info: type={type}, id={doc_id}")
         conn = get_db_connection()
         if not conn:
             return False, "数据库连接失败"
@@ -1210,21 +1279,44 @@ class SampleService:
             if not table_name:
                 return False, "无效的类型"
             
-            # 1. 删除主表记录 (由于设置了 ON DELETE CASCADE,子表记录会自动删除)
+            # 1. 显式删除子表记录 (防止 CASCADE 未生效)
+            try:
+                cursor.execute(f"DELETE FROM {table_name} WHERE id = %s", (doc_id,))
+                logger.info(f"Deleted from sub-table {table_name}, affected: {cursor.rowcount}")
+            except Exception as sub_e:
+                logger.warning(f"删除子表 {table_name} 记录失败 (可能不存在): {sub_e}")
+
+            # 2. 同步删除任务管理中心的数据 (优先删除关联数据)
+            try:
+                # 使用当前事务删除任务记录(如果 task_service 支持的话,目前它自建连接)
+                # 这里我们直接在当前 cursor 中也执行一次,确保事务一致性
+                cursor.execute("DELETE FROM t_task_management WHERE business_id = %s", (doc_id,))
+                logger.info(f"Deleted from t_task_management, affected: {cursor.rowcount}")
+            except Exception as task_e:
+                logger.warning(f"在主事务中删除任务记录失败: {task_e}")
+
+            # 3. 删除主表记录
             cursor.execute("DELETE FROM t_samp_document_main WHERE id = %s", (doc_id,))
+            affected_main = cursor.rowcount
+            logger.info(f"Deleted from t_samp_document_main, affected: {affected_main}")
             
-            # 同步删除任务管理中心的数据
+            if affected_main == 0:
+                logger.warning(f"未找到主表记录: {doc_id}")
+                # 即使主表没找到,我们也 commit 之前的操作并返回成功(幂等性)
+            
+            conn.commit()
+            
+            # 4. 再次确保任务中心数据已删除 (调用原有服务)
             try:
                 await task_service.delete_task(doc_id)
             except Exception as task_err:
-                logger.error(f"同步删除任务中心数据失败 (ID: {doc_id}): {task_err}")
+                logger.error(f"调用 task_service 删除任务失败: {task_err}")
 
-            conn.commit()
             return True, "删除成功"
         except Exception as e:
-            logger.exception("删除基本信息失败")
+            logger.exception(f"删除基本信息异常 (ID: {doc_id})")
             conn.rollback()
-            return False, str(e)
+            return False, f"删除失败: {str(e)}"
         finally:
             cursor.close()
             conn.close()

+ 58 - 0
src/app/services/snippet_service.py

@@ -287,6 +287,64 @@ class SnippetService:
 
         return items, meta
 
+    async def get_chunks_by_document(self, doc_id: str) -> List[Dict[str, Any]]:
+        """
+        提取单文档的所有知识片段
+        """
+        # 1. 从数据库中获取文档所属的知识库集合名称
+        async with get_db_connection() as db:
+            from app.sample.models.base_info import DocumentMain
+            from app.sample.models.knowledge_base import KnowledgeBase
+            from sqlalchemy import select
+            
+            # 联表查询获取 collection_name_parent
+            stmt = select(KnowledgeBase.collection_name_parent).join(
+                DocumentMain, DocumentMain.kb_id == KnowledgeBase.id
+            ).where(DocumentMain.id == doc_id)
+            
+            result = await db.execute(stmt)
+            collection_name = result.scalar()
+            
+            if not collection_name:
+                # 尝试查 collection_name_children
+                stmt_child = select(KnowledgeBase.collection_name_children).join(
+                    DocumentMain, DocumentMain.kb_id == KnowledgeBase.id
+                ).where(DocumentMain.id == doc_id)
+                result_child = await db.execute(stmt_child)
+                collection_name = result_child.scalar()
+                
+            if not collection_name:
+                # 如果还是没有,尝试从文档详情获取 kb_id 并查询
+                stmt_doc = select(DocumentMain.kb_id).where(DocumentMain.id == doc_id)
+                res_doc = await db.execute(stmt_doc)
+                kb_id = res_doc.scalar()
+                if not kb_id:
+                    return []
+                
+                stmt_kb = select(KnowledgeBase.collection_name_parent).where(KnowledgeBase.id == kb_id)
+                res_kb = await db.execute(stmt_kb)
+                collection_name = res_kb.scalar()
+
+        if not collection_name:
+            return []
+
+        # 2. 调用 MilvusService 获取片段
+        chunks = milvus_service.get_chunks_by_document_id(collection_name, doc_id)
+        
+        # 3. 格式化输出
+        formatted_chunks = []
+        for chunk in chunks:
+            formatted_chunks.append({
+                "id": chunk.get("pk") or chunk.get("id"),
+                "content": chunk.get("text") or chunk.get("content") or chunk.get("page_content"),
+                "index": chunk.get("index"),
+                "document_id": chunk.get("document_id"),
+                "metadata": chunk.get("metadata") or {},
+                "created_time": chunk.get("created_time")
+            })
+            
+        return formatted_chunks
+
     async def create(self, db: AsyncSession, payload: Any) -> Dict:
         """创建知识片段"""
         # 使用统一算法生成向量

+ 461 - 29
src/app/services/task_service.py

@@ -1,5 +1,7 @@
 
 import logging
+import json
+import httpx
 from typing import List, Dict, Any, Tuple, Optional
 from app.base.async_mysql_connection import get_db_connection
 from app.base.minio_connection import get_minio_manager
@@ -31,7 +33,10 @@ class TaskService:
                         t.id, 
                         t.business_id,
                         t.task_id, 
+                        t.project_id,
                         t.type,
+                        t.annotation_status,
+                        t.project_id as project_name,
                         d.title as name
                     FROM t_task_management t
                     JOIN t_samp_document_main d ON t.business_id COLLATE utf8mb4_unicode_ci = d.id COLLATE utf8mb4_unicode_ci
@@ -45,7 +50,10 @@ class TaskService:
                         t.id, 
                         t.business_id,
                         t.task_id, 
+                        t.project_id,
                         t.type,
+                        t.annotation_status,
+                        t.project_id as project_name,
                         i.image_name as name,
                         i.image_url
                     FROM t_task_management t
@@ -59,11 +67,15 @@ class TaskService:
             cursor.execute(sql)
             tasks = cursor.fetchall()
             
-            # 如果是图片类型,处理 URL 转换以支持前端预览
-            if task_type == 'image':
-                for item in tasks:
-                    if item.get('image_url'):
-                        item['image_url'] = self.minio_manager.get_full_url(item['image_url'])
+            for item in tasks:
+                # 统一返回结构,旧代码可能还在找 metadata 字典,这里给个空的
+                item['metadata'] = {}
+                
+                # 如果是图片类型,处理 URL 转换以支持前端预览
+                if task_type == 'image' and item.get('image_url'):
+                    image_url = item.get('image_url')
+                    if image_url and not image_url.startswith(('http://', 'https://')):
+                        item['image_url'] = self.minio_manager.get_full_url(image_url)
             
             return tasks
         except Exception as e:
@@ -73,43 +85,31 @@ class TaskService:
             cursor.close()
             conn.close()
 
-    async def add_task(self, business_id: str, task_type: str, task_id: str = None) -> Tuple[bool, str, Optional[int]]:
-        """添加或更新任务记录
-        
-        Returns:
-            Tuple[bool, str, Optional[int]]: (是否成功, 消息, 记录的自增id)
-        """
+    async def add_task(self, business_id: str, task_type: str, task_id: str = None, project_id: str = None) -> Tuple[bool, str, Optional[int]]:
+        """添加或更新任务记录 (适配单表结构,直接使用 project_id 存项目名)"""
         conn = get_db_connection()
         if not conn:
             return False, "数据库连接失败", None
         
         cursor = conn.cursor()
         try:
-            # 1. 插入或更新记录
-            # 使用 business_id 作为唯一标识(业务主键),id 为数据库自增主键
             sql = """
-                INSERT INTO t_task_management (business_id, task_id, type)
-                VALUES (%s, %s, %s)
-                ON DUPLICATE KEY UPDATE task_id = VALUES(task_id)
+                INSERT INTO t_task_management (business_id, task_id, project_id, type, annotation_status)
+                VALUES (%s, %s, %s, %s, %s)
+                ON DUPLICATE KEY UPDATE 
+                    task_id = IFNULL(VALUES(task_id), task_id),
+                    project_id = IFNULL(VALUES(project_id), project_id),
+                    annotation_status = IFNULL(VALUES(annotation_status), annotation_status)
             """
-            cursor.execute(sql, (business_id, task_id, task_type))
-            
-            # 2. 获取当前记录的 id (如果是更新,lastrowid 也是有效的)
+            cursor.execute(sql, (business_id, task_id, project_id, task_type, 'pending'))
             record_id = cursor.lastrowid
             
-            # 如果是更新且 lastrowid 没拿到,通过 business_id 查一下
-            if not record_id:
-                cursor.execute("SELECT id FROM t_task_management WHERE business_id = %s", (business_id,))
-                res = cursor.fetchone()
-                if res:
-                    record_id = res['id']
-
             conn.commit()
-            return True, "添加成功", record_id
+            return True, "成功", record_id
         except Exception as e:
-            logger.exception(f"添加任务记录失败: {e}")
             conn.rollback()
-            return False, f"添加失败: {str(e)}", None
+            logger.exception(f"添加任务失败: {e}")
+            return False, str(e), None
         finally:
             cursor.close()
             conn.close()
@@ -154,4 +154,436 @@ class TaskService:
             cursor.close()
             conn.close()
 
+    # create_anno_project 已被废弃,改为单表 batch_add 逻辑
+
+    async def get_project_progress(self, project_id: str) -> Dict[str, Any]:
+        """获取项目进度统计 (单表化)"""
+        conn = get_db_connection()
+        if not conn:
+            return {}
+        
+        cursor = conn.cursor()
+        try:
+            # 统计各状态数量
+            sql = """
+                SELECT 
+                    annotation_status, 
+                    COUNT(*) as count 
+                FROM t_task_management 
+                WHERE project_id = %s 
+                GROUP BY annotation_status
+            """
+            cursor.execute(sql, (project_id,))
+            stats = cursor.fetchall()
+            
+            if not stats:
+                return {}
+
+            total = sum(s['count'] for s in stats)
+            completed = sum(s['count'] for s in stats if s['annotation_status'] == 'completed')
+            
+            return {
+                "project_id": project_id,
+                "total": total,
+                "completed": completed,
+                "progress": f"{round(completed/total*100, 2)}%" if total > 0 else "0%",
+                "details": stats
+            }
+        except Exception as e:
+            logger.error(f"查询进度失败: {e}")
+            return {}
+        finally:
+            cursor.close()
+            conn.close()
+
+    def _get_milvus_content(self, task_id: str, kb_info: Dict[str, Any]) -> List[str]:
+        """
+        从 Milvus 获取文档分片内容
+        """
+        if not kb_info:
+            return []
+
+        # 优先使用数据库存储的子表名,父表名作为兜底
+        collections = [c for c in [kb_info.get('collection_name_children'), kb_info.get('collection_name_parent')] if c]
+        
+        if not collections:
+            return []
+
+        from app.services.milvus_service import milvus_service
+        contents = []
+        
+        # 简单的全局缓存,用于存储集合的字段探测结果,减少 describe_collection 调用
+        if not hasattr(self, '_collection_schema_cache'):
+            self._collection_schema_cache = {}
+
+        for coll_name in collections:
+            try:
+                if not milvus_service.client.has_collection(coll_name):
+                    continue
+                
+                # 获取或更新缓存的字段名
+                if coll_name not in self._collection_schema_cache:
+                    schema = milvus_service.client.describe_collection(coll_name)
+                    field_names = [f['name'] for f in schema.get('fields', [])]
+                    self._collection_schema_cache[coll_name] = {
+                        "id": "document_id", # 统一使用 document_id
+                        "content": "text" if "text" in field_names else "content"
+                    }
+                
+                fields = self._collection_schema_cache[coll_name]
+                id_field = fields["id"]
+                content_field = fields["content"]
+                
+                res = milvus_service.client.query(
+                    collection_name=coll_name,
+                    filter=f'{id_field} == "{task_id}"',
+                    output_fields=[content_field]
+                )
+                
+                if res:
+                    for s in res:
+                        val = s.get(content_field)
+                        if val: contents.append(val)
+                    
+                    if contents:
+                        return contents
+            except Exception as e:
+                logger.error(f"查询 Milvus 集合 {coll_name} 异常: {e}")
+                continue
+        
+        return []
+
+    async def export_project_data(self, project_id: str, conn=None) -> Dict[str, Any]:
+        """导出项目数据为标注平台要求的格式 (单表化)"""
+        should_close = False
+        if not conn:
+            conn = get_db_connection()
+            should_close = True
+            
+        if not conn:
+            return {}
+        
+        cursor = conn.cursor()
+        try:
+            # 1. 获取任务记录
+            sql_tasks = """
+                SELECT business_id as id, type, task_id, metadata as raw_metadata
+                FROM t_task_management
+                WHERE project_id = %s
+            """
+            cursor.execute(sql_tasks, (project_id,))
+            rows = cursor.fetchall()
+            
+            if not rows:
+                return {}
+            
+            # 2. 解析基本信息
+            first_row = rows[0]
+            project_name = project_id  # 直接使用传入的项目名
+            internal_task_type = first_row['type']
+            remote_project_id = first_row.get('task_id') or project_id
+            
+            # 映射任务类型
+            type_map = {'data': 'text_classification', 'image': 'image_classification'}
+            external_task_type = type_map.get(internal_task_type, internal_task_type)
+
+            # 3. 处理数据
+            final_tasks = []
+            
+            # 批量获取 Milvus 内容的优化逻辑
+            if internal_task_type == 'data':
+                # 先收集所有需要查询的 task_id
+                all_task_ids = [r['id'] for r in rows]
+                
+                # 尝试通过第一个任务获取知识库信息(假设同一个项目下的任务属于同一个知识库)
+                # 如果项目跨知识库,这里可以进一步优化为按 kb_id 分组批量查
+                sql_kb = """
+                    SELECT kb.collection_name_parent, kb.collection_name_children, d.title
+                    FROM t_samp_document_main d
+                    LEFT JOIN t_samp_knowledge_base kb ON d.kb_id = kb.id
+                    WHERE d.id COLLATE utf8mb4_unicode_ci = %s COLLATE utf8mb4_unicode_ci
+                """
+                cursor.execute(sql_kb, (all_task_ids[0],))
+                kb_info = cursor.fetchone()
+                
+                # 批量抓取 Milvus 内容
+                milvus_data_map = {}
+                if kb_info:
+                    milvus_data_map = self._get_milvus_content_batch(all_task_ids, kb_info)
+
+                for item in rows:
+                    task_id = item['id']
+                    metadata = {}
+                    if item.get('raw_metadata'):
+                        try:
+                            metadata = json.loads(item['raw_metadata'])
+                        except: pass
+                    
+                    task_contents = milvus_data_map.get(task_id, [])
+                    if not task_contents:
+                        # 兜底:如果批量没查到,或者不是 Milvus 任务,尝试查 title
+                        cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (task_id,))
+                        res = cursor.fetchone()
+                        if res: task_contents = [res['title']]
+                    
+                    # 拆分推送
+                    for idx, content in enumerate(task_contents):
+                        if not content: continue
+                        task_metadata = metadata.copy()
+                        task_metadata.update({"original_id": task_id, "chunk_index": idx})
+                        final_tasks.append({
+                            "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
+                            "content": content,
+                            "metadata": task_metadata
+                        })
+            
+            elif internal_task_type == 'image':
+                for item in rows:
+                    task_id = item['id']
+                    metadata = {}
+                    if item.get('raw_metadata'):
+                        try:
+                            metadata = json.loads(item['raw_metadata'])
+                        except: pass
+                        
+                    cursor.execute("SELECT image_url FROM t_image_info WHERE id = %s", (task_id,))
+                    res = cursor.fetchone()
+                    task_contents = []
+                    if res:
+                        img_url = res['image_url']
+                        if img_url and not img_url.startswith('http'):
+                            img_url = self.minio_manager.get_full_url(img_url)
+                        task_contents = [img_url]
+
+                    for idx, content in enumerate(task_contents):
+                        if not content: continue
+                        task_metadata = metadata.copy()
+                        task_metadata.update({"original_id": task_id, "chunk_index": idx})
+                        final_tasks.append({
+                            "id": f"{task_id}_{idx}" if len(task_contents) > 1 else task_id,
+                            "content": content,
+                            "metadata": task_metadata
+                        })
+
+            return {
+                "name": project_name,
+                "description": "",
+                "task_type": external_task_type,
+                "data": final_tasks,
+                "external_id": remote_project_id
+            }
+        except Exception as e:
+            logger.exception(f"导出数据异常: {e}")
+            return {}
+        finally:
+            cursor.close()
+            if should_close:
+                conn.close()
+
+    def _get_milvus_content_batch(self, task_ids: List[str], kb_info: Dict[str, Any]) -> Dict[str, List[str]]:
+        """
+        批量从 Milvus 获取文档分片内容
+        """
+        if not kb_info or not task_ids:
+            return {}
+
+        collections = [c for c in [kb_info.get('collection_name_children'), kb_info.get('collection_name_parent')] if c]
+        if not collections:
+            return {}
+
+        from app.services.milvus_service import milvus_service
+        result_map = {tid: [] for tid in task_ids}
+        
+        if not hasattr(self, '_collection_schema_cache'):
+            self._collection_schema_cache = {}
+
+        for coll_name in collections:
+            try:
+                if not milvus_service.client.has_collection(coll_name):
+                    continue
+                
+                if coll_name not in self._collection_schema_cache:
+                    schema = milvus_service.client.describe_collection(coll_name)
+                    field_names = [f['name'] for f in schema.get('fields', [])]
+                    self._collection_schema_cache[coll_name] = {
+                        "id": "document_id",
+                        "content": "text" if "text" in field_names else "content"
+                    }
+                
+                fields = self._collection_schema_cache[coll_name]
+                id_field = fields["id"]
+                content_field = fields["content"]
+                
+                # 使用 in 表达式进行批量查询
+                # 注意:如果 task_ids 非常多,可能需要分批(如每批 100 个)
+                id_list_str = ", ".join([f'"{tid}"' for tid in task_ids])
+                res = milvus_service.client.query(
+                    collection_name=coll_name,
+                    filter=f'{id_field} in [{id_list_str}]',
+                    output_fields=[id_field, content_field]
+                )
+                
+                if res:
+                    for s in res:
+                        tid = s.get(id_field)
+                        val = s.get(content_field)
+                        if tid in result_map and val:
+                            result_map[tid].append(val)
+                    
+                    # 如果当前集合已经查到了内容,就不再查兜底集合(逻辑同单条查询)
+                    if any(result_map.values()):
+                        return result_map
+            except Exception as e:
+                logger.error(f"批量查询 Milvus 集合 {coll_name} 异常: {e}")
+                continue
+        
+        return result_map
+
+    async def get_project_progress(self, project_id: str) -> Dict[str, Any]:
+        """获取外部标注项目的进度"""
+        conn = get_db_connection()
+        if not conn:
+            return {"error": "数据库连接失败"}
+            
+        try:
+            # 1. 查询 remote_project_id (task_id)
+            cursor = conn.cursor()
+            cursor.execute("SELECT task_id FROM t_task_management WHERE project_id = %s AND task_id IS NOT NULL LIMIT 1", (project_id,))
+            row = cursor.fetchone()
+            cursor.close()
+            
+            if not row or not row['task_id']:
+                return {"error": "未找到已推送的外部项目ID"}
+                
+            remote_project_id = row['task_id']
+            
+            # 2. 获取配置
+            from app.core.config import config_handler
+            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
+            # 转换 init URL 为 progress URL
+            progress_url = api_url.replace('/init', f'/{remote_project_id}/progress')
+            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+
+            # 3. 发送请求
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                headers = {"Authorization": f"Bearer {token}"}
+                response = await client.get(progress_url, headers=headers)
+                
+                if response.status_code == 200:
+                    return response.json()
+                else:
+                    logger.error(f"查询进度失败: {response.status_code} - {response.text}")
+                    return {"error": f"外部平台返回错误 ({response.status_code})"}
+                    
+        except Exception as e:
+            logger.exception(f"查询进度异常: {e}")
+            return {"error": str(e)}
+        finally:
+            conn.close()
+
+    async def export_labeled_data(self, project_id: str, export_format: str = 'json') -> Dict[str, Any]:
+        """触发外部标注项目的数据导出"""
+        conn = get_db_connection()
+        if not conn:
+            return {"error": "数据库连接失败"}
+            
+        try:
+            # 1. 查询 remote_project_id (task_id)
+            cursor = conn.cursor()
+            cursor.execute("SELECT task_id FROM t_task_management WHERE project_id = %s AND task_id IS NOT NULL LIMIT 1", (project_id,))
+            row = cursor.fetchone()
+            cursor.close()
+            
+            if not row or not row['task_id']:
+                return {"error": "未找到已推送的外部项目ID"}
+                
+            remote_project_id = row['task_id']
+            
+            # 2. 获取配置
+            from app.core.config import config_handler
+            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
+            # 转换 init URL 为 export URL
+            export_url = api_url.replace('/init', f'/{remote_project_id}/export')
+            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+
+            # 3. 发送请求
+            async with httpx.AsyncClient(timeout=30.0) as client:
+                headers = {
+                    "Authorization": f"Bearer {token}",
+                    "Content-Type": "application/json"
+                }
+                payload = {
+                    "format": export_format,
+                    "completed_only": True
+                }
+                response = await client.post(export_url, json=payload, headers=headers)
+                
+                if response.status_code in (200, 201):
+                    return response.json()
+                else:
+                    logger.error(f"导出数据失败: {response.status_code} - {response.text}")
+                    return {"error": f"外部平台返回错误 ({response.status_code})"}
+                    
+        except Exception as e:
+            logger.exception(f"导出数据异常: {e}")
+            return {"error": str(e)}
+        finally:
+            conn.close()
+
+    async def send_to_external_platform(self, project_id: str) -> Tuple[bool, str]:
+        """将项目数据推送至外部标注平台 (单表化)"""
+        conn = get_db_connection()
+        if not conn:
+            return False, "数据库连接失败"
+            
+        try:
+            # 1. 准备数据 (复用数据库连接)
+            payload = await self.export_project_data(project_id=project_id, conn=conn)
+            
+            if not payload:
+                return False, "项目导出失败,请检查项目ID是否正确"
+                
+            if not payload.get('data'):
+                return False, f"项目数据为空 (查询到0条有效任务),无法推送"
+            
+            # 2. 获取配置
+            from app.core.config import config_handler
+            api_url = config_handler.get('external_platform', 'API_URL', 'http://192.168.92.61:9003/api/external/projects/init')
+            token = config_handler.get('external_platform', 'ADMIN_TOKEN', '')
+
+            # 3. 发送请求
+            async with httpx.AsyncClient(timeout=60.0) as client:
+                headers = {
+                    "Authorization": f"Bearer {token}",
+                    "Content-Type": "application/json"
+                }
+                logger.info(f"正在推送项目 {project_id} 至外部平台: {api_url}, 数据条数: {len(payload['data'])}")
+                response = await client.post(api_url, json=payload, headers=headers)
+                
+                if response.status_code in (200, 201):
+                    res_data = response.json()
+                    remote_project_id = res_data.get('project_id')
+                    
+                    if remote_project_id:
+                        # 4. 回写外部项目 ID (复用当前连接)
+                        cursor = conn.cursor()
+                        cursor.execute(
+                            "UPDATE t_task_management SET task_id = %s WHERE project_id = %s", 
+                            (remote_project_id, project_id)
+                        )
+                        conn.commit()
+                        cursor.close()
+                    
+                    return True, f"推送成功!外部项目ID: {remote_project_id or '未知'}"
+                else:
+                    error_msg = response.text
+                    logger.error(f"推送失败: {response.status_code} - {error_msg}")
+                    return False, f"外部平台返回错误 ({response.status_code})"
+                    
+        except Exception as e:
+            logger.exception(f"推送至外部平台异常: {e}")
+            return False, f"推送异常: {str(e)}"
+        finally:
+            conn.close()
+
# Module-level singleton shared by the view layer (imported by routers).
task_service = TaskService()

+ 2 - 1
src/views/image_view.py

@@ -40,6 +40,7 @@ class UploadUrlRequest(BaseModel):
 
 class BatchAddRequest(BaseModel):
     ids: List[str]
+    project_name: str
 
 # --- 分类管理 API ---
 
@@ -209,7 +210,7 @@ async def batch_add_to_task(req: BatchAddRequest, credentials: HTTPAuthorization
         username = payload.get("username", user_id)
         
         service = ImageService()
-        success, message = await service.batch_add_to_task(req.ids, username)
+        success, message = await service.batch_add_to_task(req.ids, username, req.project_name)
         
         return ApiResponse(
             code=0 if success else 500, 

+ 77 - 8
src/views/sample_view.py

@@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request, Response, Backgr
 from fastapi.responses import HTMLResponse
 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
 
-from app.sample.schemas.sample_schemas import BatchEnterRequest, BatchDeleteRequest, ConvertRequest, DocumentAdd, UploadUrlRequest
+from app.sample.schemas.sample_schemas import BatchEnterRequest, BatchDeleteRequest, ConvertRequest, DocumentAdd, UploadUrlRequest, ExportRequest
 from app.services.sample_service import SampleService
 from app.services.jwt_token import verify_token
 from app.schemas.base import ApiResponse
@@ -34,16 +34,79 @@ async def get_tasks(type: str, credentials: HTTPAuthorizationCredentials = Depen
     try:
         payload = verify_token(credentials.credentials)
         if not payload:
-            return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+            return ApiResponse(code=401, message="无效的访问令牌").model_dump()
         
         tasks = await task_service.get_task_list(type)
-        return ApiResponse(code=0, message="成功", data=tasks, timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+        return ApiResponse(code=0, message="成功", data=tasks).model_dump()
     except Exception as e:
         logger.exception("获取任务列表失败")
-        return ApiResponse(code=500, message=str(e), timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+        return ApiResponse(code=500, message=str(e)).model_dump()
 
 
-# --- 文档管理中心 API ---
+# --- 外部联动接口 API ---
+
@router.post("/external/projects/init")
async def init_external_project(request: Request, credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Project init endpoint: called by the annotation platform to sync data.

    Requires a valid bearer token belonging to a superuser or admin.
    The raw JSON body is handed to ``task_service.create_anno_project``;
    on success the new project id is returned in ``data.project_id``.
    """
    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return ApiResponse(code=401, message="无效的访问令牌").model_dump()
        
        # Simple admin check (adjust per business requirements).
        if not payload.get("is_superuser") and payload.get("role") != "admin":
            return ApiResponse(code=403, message="权限不足").model_dump()

        data = await request.json()
        success, result = await task_service.create_anno_project(data)
        
        if success:
            return ApiResponse(code=0, message="项目初始化成功", data={"project_id": result}).model_dump()
        else:
            # On failure `result` carries the error detail from the service.
            return ApiResponse(code=500, message=f"项目初始化失败: {result}").model_dump()
    except Exception as e:
        logger.exception("项目初始化接口异常")
        return ApiResponse(code=500, message=str(e)).model_dump()
+
@router.get("/external/projects/progress")
async def get_external_project_progress(
    project_id: str, 
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Query annotation progress of a pushed project.

    Delegates to ``task_service.get_project_progress``; a dict containing
    an ``error`` key is surfaced as a code-500 response.
    """
    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return ApiResponse(code=401, message="无效的访问令牌").model_dump()
            
        progress = await task_service.get_project_progress(project_id=project_id)
        if "error" in progress:
            return ApiResponse(code=500, message=progress["error"]).model_dump()
            
        return ApiResponse(code=0, message="成功", data=progress).model_dump()
    except Exception as e:
        logger.exception("查询进度接口异常")
        return ApiResponse(code=500, message=str(e)).model_dump()
+
@router.post("/external/projects/export")
async def export_external_project(
    req: ExportRequest, 
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Export a project's completed annotation data.

    Delegates to ``task_service.export_labeled_data`` with the requested
    project id and output format; an ``error`` key in the result is
    surfaced as a code-500 response.
    """
    try:
        payload = verify_token(credentials.credentials)
        if not payload:
            return ApiResponse(code=401, message="无效的访问令牌").model_dump()
            
        result = await task_service.export_labeled_data(project_id=req.project_id, export_format=req.format)
        if "error" in result:
            return ApiResponse(code=500, message=result["error"]).model_dump()
            
        return ApiResponse(code=0, message="成功", data=result).model_dump()
    except Exception as e:
        logger.exception("导出数据接口异常")
        return ApiResponse(code=500, message=str(e)).model_dump()
 
 @router.post("/documents/upload-url")
 async def get_upload_url(req: UploadUrlRequest, credentials: HTTPAuthorizationCredentials = Depends(security)):
@@ -51,7 +114,7 @@ async def get_upload_url(req: UploadUrlRequest, credentials: HTTPAuthorizationCr
     try:
         payload = verify_token(credentials.credentials)
         if not payload:
-            return ApiResponse(code=401, message="无效的访问令牌", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+            return ApiResponse(code=401, message="无效的访问令牌").model_dump()
         
         sample_service = SampleService()
         success, message, data = await sample_service.get_upload_url(req.filename, req.content_type, prefix=req.prefix)
@@ -269,8 +332,14 @@ async def batch_delete_documents(req: BatchDeleteRequest, credentials: HTTPAutho
         logger.exception("批量删除失败")
         return ApiResponse(code=500, message=f"批量删除失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
 
from pydantic import BaseModel

# Request body for batch-adding documents to the task center.
class BatchAddTaskRequest(BaseModel):
    doc_ids: List[str]   # document ids to add
    project_name: str    # target annotation project name
+
 @router.post("/documents/batch-add-to-task")
-async def batch_add_to_task(req: BatchDeleteRequest, credentials: HTTPAuthorizationCredentials = Depends(security)):
+async def batch_add_to_task(req: BatchAddTaskRequest, credentials: HTTPAuthorizationCredentials = Depends(security)):
     """批量加入任务中心 (设置 whether_to_task = 1)"""
     try:
         payload = verify_token(credentials.credentials)
@@ -284,7 +353,7 @@ async def batch_add_to_task(req: BatchDeleteRequest, credentials: HTTPAuthorizat
         username = payload.get("username", user_id)
         
         sample_service = SampleService()
-        success, message = await sample_service.batch_add_to_task(req.ids, username)
+        success, message = await sample_service.batch_add_to_task(req.doc_ids, username, req.project_name)
         
         return ApiResponse(
             code=0 if success else 500, 

+ 18 - 0
src/views/snippet_view.py

@@ -52,6 +52,24 @@ async def get_snippets(
         meta=meta
     )
 
@router.get("/document/{doc_id}", response_model=ResponseSchema)
async def get_document_chunks(
    doc_id: str = Path(..., description="文档ID"),
    credentials: HTTPAuthorizationCredentials = Depends(security)
):
    """Fetch all knowledge chunks belonging to a single document."""
    payload_token = verify_token(credentials.credentials)
    if not payload_token:
        return ResponseSchema(code=401, message="无效的访问令牌")
        
    chunks = await snippet_service.get_chunks_by_document(doc_id)
    
    return ResponseSchema(
        code=0, 
        message="获取成功", 
        data=chunks
    )
+
 @router.get("/detail", response_model=ResponseSchema)
 async def get_snippet_detail(
     kb: str = Query(..., description="知识库名称"),

Một số tệp đã không được hiển thị bởi vì quá nhiều tập tin thay đổi trong này khác