2 Commits 108262d477 ... 2364a53777

Auteur SHA1 Message Date
  chenkun 2364a53777 Merge branch 'dev' of http://192.168.0.3:3000/CRBC-MaaS-Platform-Project/LQAdminPlatform into dev il y a 2 semaines
  chenkun 1446b324d0 优化界面2 il y a 2 semaines

+ 105 - 0
src/app/services/image_service.py

@@ -396,6 +396,111 @@ class ImageService:
             if conn:
                 conn.close()
 
+    async def category_batch_check(self, category_id: str) -> Tuple[bool, str, Dict[str, Any]]:
+        """按分类批量推送前的预检查 (获取图片总数和子分类列表)"""
+        conn = get_db_connection()
+        if not conn:
+            return False, "数据库连接失败", {}
+        
+        cursor = conn.cursor()
+        try:
+            # 1. 获取所有涉及的分类 ID
+            target_category_ids = []
+            if not category_id or category_id == '0':
+                cursor.execute("SELECT id FROM t_image_category")
+                target_category_ids = [str(row['id']) for row in cursor.fetchall()]
+            else:
+                target_category_ids = [category_id]
+                child_ids = await self._get_all_child_category_ids(category_id)
+                target_category_ids.extend(child_ids)
+            
+            if not target_category_ids:
+                return False, "分类不存在", {"image_count": 0}
+
+            # 2. 获取这些分类下的所有图片总数
+            placeholders = ', '.join(['%s'] * len(target_category_ids))
+            sql = f"SELECT COUNT(*) as count FROM t_image_info WHERE image_type IN ({placeholders})"
+            cursor.execute(sql, tuple(target_category_ids))
+            image_count = cursor.fetchone()['count']
+            
+            # 3. 获取涉及的子分类名称
+            sub_category_names = []
+            if len(target_category_ids) > 1:
+                sub_ids = target_category_ids[1:] if category_id and category_id != '0' else target_category_ids
+                sub_placeholders = ', '.join(['%s'] * len(sub_ids))
+                cursor.execute(f"SELECT type_name FROM t_image_category WHERE id IN ({sub_placeholders})", tuple(sub_ids))
+                sub_category_names = [row['type_name'] for row in cursor.fetchall()]
+
+            return True, "查询成功", {
+                "image_count": image_count,
+                "sub_categories": sub_category_names
+            }
+        except Exception as e:
+            logger.exception("按分类批量检查失败")
+            return False, f"操作失败: {str(e)}", {}
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
+    async def category_batch_add_to_task(self, category_id: str, username: str, project_name: str, tags: List[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
+        """按分类批量将图片加入任务中心 (支持递归)"""
+        conn = get_db_connection()
+        if not conn:
+            return False, "数据库连接失败", {}
+        
+        cursor = conn.cursor()
+        try:
+            # 1. 获取所有涉及的分类 ID
+            target_category_ids = []
+            if not category_id or category_id == '0':
+                # 如果没选分类,默认获取所有
+                cursor.execute("SELECT id FROM t_image_category")
+                target_category_ids = [str(row['id']) for row in cursor.fetchall()]
+            else:
+                target_category_ids = [category_id]
+                child_ids = await self._get_all_child_category_ids(category_id)
+                target_category_ids.extend(child_ids)
+            
+            if not target_category_ids:
+                return False, "分类不存在", {"image_count": 0}
+
+            # 2. 获取这些分类下的所有图片 ID
+            placeholders = ', '.join(['%s'] * len(target_category_ids))
+            sql = f"SELECT id FROM t_image_info WHERE image_type IN ({placeholders})"
+            cursor.execute(sql, tuple(target_category_ids))
+            image_rows = cursor.fetchall()
+            image_ids = [str(row['id']) for row in image_rows]
+            
+            if not image_ids:
+                return True, "该分类下无图片,无法推送", {"image_count": 0}
+
+            # 3. 获取涉及的子分类名称 (用于前端展示)
+            sub_category_names = []
+            if len(target_category_ids) > 1:
+                # 排除当前分类本身,只取子分类名称
+                sub_ids = target_category_ids[1:] if category_id and category_id != '0' else target_category_ids
+                sub_placeholders = ', '.join(['%s'] * len(sub_ids))
+                cursor.execute(f"SELECT type_name FROM t_image_category WHERE id IN ({sub_placeholders})", tuple(sub_ids))
+                sub_category_names = [row['type_name'] for row in cursor.fetchall()]
+
+            # 4. 调用现有的批量加入逻辑
+            success, message = await self.batch_add_to_task(image_ids, username, project_name, tags)
+            
+            return success, message, {
+                "image_count": len(image_ids),
+                "sub_categories": sub_category_names
+            }
+        except Exception as e:
+            logger.exception("按分类批量加入任务失败")
+            return False, f"操作失败: {str(e)}", {}
+        finally:
+            if cursor:
+                cursor.close()
+            if conn:
+                conn.close()
+
     async def get_upload_url(self, filename: str, content_type: str, prefix: str = None) -> Tuple[bool, str, Dict[str, Any]]:
         """获取 MinIO 预签名上传 URL"""
         try:

+ 1 - 1
src/app/services/milvus_service.py

@@ -186,7 +186,7 @@ class MilvusService:
                 "issuing_authority": business_meta.get("issuing_authority") or "无",
                 "document_type": business_meta.get("document_type") or "未分类",
                 "professional_field": business_meta.get("professional_field") or "未分类",
-                "validity": business_meta.get("validity") or "现行",
+                "validity": business_meta.get("validity") or "XH",
                 "file_url": business_meta.get("file_url") or ""
             })
         elif source_type == 'construction_plan':

+ 133 - 89
src/app/services/sample_service.py

@@ -75,9 +75,24 @@ PROFESSIONAL_FIELD_MAP = {
     "其他": "QT"
 }
 
-# 专业领域反向映射 (数据库存储 -> 前端显示)
+# 专业领域反向映射 (数据库存储 -> 前端显示)
 PROFESSIONAL_FIELD_REVERSE_MAP = {v: k for k, v in PROFESSIONAL_FIELD_MAP.items()}
 
+# 时效性映射 (前端显示 -> 数据库存储)
+VALIDITY_MAP = {
+    "现行": "XH",
+    "废止": "FZ",
+    "试行": "SX"
+}
+
+# 时效性反向映射 (数据库存储 -> 前端显示)
+VALIDITY_REVERSE_MAP = {v: k for k, v in VALIDITY_MAP.items()}
+VALIDITY_REVERSE_MAP.update({
+    "现行": "现行",
+    "已废止": "废止",
+    "被替代": "废止"
+})
+
 # 方案类别映射 (前端显示 -> 数据库存储)
 PLAN_CATEGORY_MAP = {
     "超危大方案": "CH",
@@ -219,6 +234,11 @@ class SampleService:
         if prof_field_code in PROFESSIONAL_FIELD_REVERSE_MAP:
             item['professional_field'] = PROFESSIONAL_FIELD_REVERSE_MAP[prof_field_code]
 
+        # 处理时效性显示 (简写 -> 中文)
+        validity_code = item.get('validity')
+        if validity_code in VALIDITY_REVERSE_MAP:
+            item['validity'] = VALIDITY_REVERSE_MAP[validity_code]
+
         # 处理一级分类显示 (简写 -> 中文)
         level_1_code = item.get('level_1_classification')
         if level_1_code in FIRST_LEVEL_REVERSE_MAP:
@@ -438,6 +458,11 @@ class SampleService:
                         prof_field = business_metadata['professional_field']
                         business_metadata['professional_field'] = PROFESSIONAL_FIELD_MAP.get(prof_field, prof_field)
 
+                    # 强制转换 validity 为简写
+                    if 'validity' in business_metadata and business_metadata['validity']:
+                        validity = business_metadata['validity']
+                        business_metadata['validity'] = VALIDITY_MAP.get(validity, validity)
+
                     # 强制转换 plan_category 为简写
                     if 'plan_category' in business_metadata and business_metadata['plan_category']:
                         plan_cat = business_metadata['plan_category']
@@ -486,13 +511,7 @@ class SampleService:
                     }
                     await self.milvus_service.insert_knowledge(md_content, doc_info)
                     
-                    # F. 添加到任务管理中心 (类型为 data)
-                    try:
-                        await task_service.add_task(doc_id, 'data')
-                    except Exception as task_err:
-                        logger.error(f"添加文档 {title} 到任务中心失败: {task_err}")
-
-                    # G. 更新数据库状态
+                    # F. 更新数据库状态
                     update_sql = "UPDATE t_samp_document_main SET whether_to_enter = 1, kb_id = %s, kb_method = %s, updated_by = %s, updated_time = NOW() WHERE id = %s"
                     cursor.execute(update_sql, (current_kb_id, current_kb_method, username, doc_id))
                     success_count += 1
@@ -570,60 +589,89 @@ class SampleService:
             add_placeholders = ', '.join(['%s'] * len(ids_to_add))
             sql = f"UPDATE t_samp_document_main SET whether_to_task = 1, updated_by = %s, updated_time = NOW() WHERE id IN ({add_placeholders})"
             cursor.execute(sql, (username, *ids_to_add))
-            
-            # 3. 写入任务管理表 (单表逻辑)
+
+            # 2.5 清理选定文档之前的空任务记录(处理存量僵尸数据)
+            cursor.execute(f"DELETE FROM t_task_management WHERE business_id IN ({add_placeholders}) AND project_id IS NULL", ids_to_add)
+            
+            # 3. 批量获取元数据并写入任务管理表
+            EXCLUDE_FIELDS = {
+                'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
+                'conversion_status', 'whether_to_enter', 'whether_to_task', 
+                'kb_method', 'whether_to_delete'
+            }
+            
+            # 3.1 批量查询主表信息
+            cursor.execute(f"SELECT * FROM t_samp_document_main WHERE id IN ({add_placeholders})", ids_to_add)
+            all_doc_main = cursor.fetchall()
+            doc_main_map = {doc['id']: doc for doc in all_doc_main}
+            
+            # 3.2 按来源类型分组查询子表信息
+            source_type_groups = {}
+            for doc in all_doc_main:
+                st = doc.get('source_type')
+                if st:
+                    if st not in source_type_groups: source_type_groups[st] = []
+                    source_type_groups[st].append(doc['id'])
+            
+            all_sub_data_map = {}
+            for st, ids in source_type_groups.items():
+                table_name = TABLE_MAP.get(st)
+                if table_name:
+                    placeholders_sub = ', '.join(['%s'] * len(ids))
+                    cursor.execute(f"SELECT * FROM {table_name} WHERE id IN ({placeholders_sub})", ids)
+                    sub_rows = cursor.fetchall()
+                    for row in sub_rows:
+                        all_sub_data_map[row['id']] = row
+            
+            # 3.3 批量准备数据
+            task_records = []
             for doc_id in ids_to_add:
-                try:
-                    # 获取业务元数据
-                    metadata_dict = {}
-                    try:
-                        # 定义需要过滤掉的非业务/内部状态字段
-                        EXCLUDE_FIELDS = {
-                            'id', 'created_time', 'updated_time', 'created_by', 'updated_by',
-                            'conversion_status', 'whether_to_enter', 'whether_to_task', 
-                            'kb_method', 'whether_to_delete'
-                        }
-                        
-                        # 查询主表和子表信息
-                        cursor.execute("SELECT * FROM t_samp_document_main WHERE id = %s", (doc_id,))
-                        doc_main = cursor.fetchone()
-                        if doc_main:
-                            # 基础元数据 (仅保留标题和来源类型等核心信息)
-                            for k, v in doc_main.items():
-                                if v is not None and v != '' and k not in EXCLUDE_FIELDS:
-                                    metadata_dict[k] = v
-                            
-                            # 子表元数据
-                            source_type = doc_main.get('source_type')
-                            table_name = TABLE_MAP.get(source_type)
-                            if table_name:
-                                cursor.execute(f"SELECT * FROM {table_name} WHERE id = %s", (doc_id,))
-                                sub_data = cursor.fetchone()
-                                if sub_data:
-                                    for k, v in sub_data.items():
-                                        if v is not None and v != '' and k not in EXCLUDE_FIELDS:
-                                            metadata_dict[k] = v
-                        
-                        # 递归格式化时间
-                        metadata_dict = task_service._serialize_datetime(metadata_dict)
-                    except Exception as meta_err:
-                        logger.warning(f"获取文档 {doc_id} 元数据失败: {meta_err}")
+                metadata_dict = {}
+                doc_main = doc_main_map.get(doc_id)
+                if doc_main:
+                    for k, v in doc_main.items():
+                        if v is not None and v != '' and k not in EXCLUDE_FIELDS:
+                            metadata_dict[k] = v
                     
-                    await task_service.add_task(
-                        business_id=doc_id, 
-                        task_type='data', 
-                        project_id=project_id,
-                        project_name=project_name,
-                        tag=tag_str,
-                        metadata=json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
-                    )
-                except Exception as e:
-                    logger.exception(f"添加文档 {doc_id} 到任务中心失败: {e}")
+                    sub_data = all_sub_data_map.get(doc_id)
+                    if sub_data:
+                        for k, v in sub_data.items():
+                            if v is not None and v != '' and k not in EXCLUDE_FIELDS:
+                                metadata_dict[k] = v
+                
+                metadata_dict = task_service._serialize_datetime(metadata_dict)
+                task_records.append((
+                    doc_id, None, project_id, project_name, 'data', 'pending', 
+                    tag_str, json.dumps(metadata_dict, ensure_ascii=False) if metadata_dict else None
+                ))
+
+            # 3.4 批量写入任务管理表
+            if task_records:
+                logger.info(f"正在批量写入 {len(task_records)} 条任务记录到数据库...")
+                sql_insert = """
+                    INSERT INTO t_task_management (business_id, task_id, project_id, project_name, type, annotation_status, tag, metadata)
+                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+                    ON DUPLICATE KEY UPDATE 
+                        task_id = IFNULL(VALUES(task_id), task_id),
+                        project_id = IFNULL(VALUES(project_id), project_id),
+                        project_name = IFNULL(VALUES(project_name), project_name),
+                        annotation_status = IFNULL(VALUES(annotation_status), annotation_status),
+                        tag = IFNULL(VALUES(tag), tag),
+                        metadata = IFNULL(VALUES(metadata), metadata)
+                """
+                cursor.executemany(sql_insert, task_records)
             
             conn.commit()
+            logger.info(f"数据库写入完成,已提交。正在释放连接...")
+            cursor.close()
+            conn.close()
+            conn = None  # 防止 finally 中重复关闭
+            cursor = None
 
-            # 4. 自动推送至外部标注平台
+            # 4. 自动推送至外部标注平台 (在连接关闭后执行,防止死锁和长事务)
+            logger.info(f"开始推送项目 {project_name} (ID: {project_id}) 至外部平台...")
             push_success, push_msg = await task_service.send_to_external_platform(project_id)
+            logger.info(f"外部平台推送结果: success={push_success}, msg={push_msg}")
             
             msg = f"成功将 {len(ids_to_add)} 份文档加入项目: {project_name}"
             if push_success:
@@ -1017,6 +1065,10 @@ class SampleService:
             prof_field_cn = doc_data.get('professional_field')
             prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
 
+            # 处理时效性 (中文 -> 简写)
+            validity_cn = doc_data.get('validity')
+            validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
+
             # 处理方案类别 (中文 -> 简写)
             plan_category_cn = doc_data.get('plan_category')
             plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
@@ -1078,7 +1130,7 @@ class SampleService:
                         doc_data.get('issuing_authority'), release_date, self._to_date(doc_data.get('implementation_date')),
                         doc_data.get('drafting_unit'), doc_data.get('approving_department'), doc_data.get('participating_units'),
                         doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
-                        doc_data.get('validity', '现行'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
+                        validity_code, doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
                         user_id, user_id
                     )
                 )
@@ -1116,12 +1168,6 @@ class SampleService:
                     )
                 )
             
-            # 3. 添加到任务管理中心 (类型为 data)
-            try:
-                await task_service.add_task(doc_id, 'data')
-            except Exception as task_err:
-                logger.error(f"添加文档 {doc_data.get('title')} 到任务中心失败: {task_err}")
-
             conn.commit()
             return True, "文档添加成功", doc_id
         except Exception as e:
@@ -1159,6 +1205,10 @@ class SampleService:
             prof_field_cn = doc_data.get('professional_field')
             prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
 
+            # 处理时效性 (中文 -> 简写)
+            validity_cn = doc_data.get('validity')
+            validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
+
             # 处理方案类别 (中文 -> 简写)
             plan_category_cn = doc_data.get('plan_category')
             plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
@@ -1209,7 +1259,7 @@ class SampleService:
                         doc_data.get('title'), doc_data.get('english_name'), doc_data.get('standard_no'), doc_data.get('issuing_authority'), 
                         release_date, self._to_date(doc_data.get('implementation_date')), doc_data.get('drafting_unit'), doc_data.get('approving_department'),
                         doc_data.get('participating_units'), doc_type_code, prof_field_code, doc_data.get('engineering_phase'),
-                        doc_data.get('validity'), doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
+                        validity_code, doc_data.get('reference_basis'), doc_data.get('source_url'), doc_data.get('note'),
                         updater_id, doc_id
                     )
                 )
@@ -1414,6 +1464,10 @@ class SampleService:
                     if filter_key == 'level_4_classification':
                         filter_value = FOURTH_LEVEL_MAP.get(filter_value, filter_value)
 
+                    # 处理时效性查询 (前端传中文 -> 数据库查简写)
+                    if filter_key == 'validity':
+                        filter_value = VALIDITY_MAP.get(filter_value, filter_value)
+
                     # 如果是 title, standard_no, issuing_authority,支持模糊查询
                     if filter_key in ['title', 'standard_no', 'issuing_authority']:
                         where_clauses.append(f"{db_field} LIKE %s")
@@ -1479,6 +1533,11 @@ class SampleService:
                 if level_4_code in FOURTH_LEVEL_REVERSE_MAP:
                     item['level_4_classification'] = FOURTH_LEVEL_REVERSE_MAP[level_4_code]
 
+                # 处理时效性显示 (简写 -> 中文)
+                validity_code = item.get('validity')
+                if validity_code in VALIDITY_REVERSE_MAP:
+                    item['validity'] = VALIDITY_REVERSE_MAP[validity_code]
+
                 for key in ['file_url', 'md_url', 'json_url']:
                     if item.get(key):
                         item[key] = self.minio_manager.get_full_url(item[key])
@@ -1613,6 +1672,10 @@ class SampleService:
             prof_field_cn = data.get('professional_field')
             prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
 
+            # 处理时效性 (中文 -> 简写)
+            validity_cn = data.get('validity')
+            validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
+
             # 处理方案类别 (中文 -> 简写)
             plan_category_cn = data.get('plan_category')
             plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
@@ -1633,10 +1696,6 @@ class SampleService:
             level_4_cn = data.get('level_4_classification')
             level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
 
-            # 处理四级分类 (中文 -> 简写)
-            level_4_cn = data.get('level_4_classification')
-            level_4_code = FOURTH_LEVEL_MAP.get(level_4_cn, level_4_cn)
-
             # 1. 插入主表 (解耦触发器,手动同步)
             cursor.execute(
                 """
@@ -1670,7 +1729,7 @@ class SampleService:
                     data.get('drafting_unit'), data.get('approving_department'),
                     data.get('participating_units'),
                     doc_type_code, prof_field_code, data.get('engineering_phase'),
-                    data.get('validity', '现行'), data.get('reference_basis'), data.get('source_url'), data.get('note'),
+                    validity_code or 'XH', data.get('reference_basis'), data.get('source_url'), data.get('note'),
                     user_id, user_id
                 )
             elif type == 'construction_plan':
@@ -1707,25 +1766,6 @@ class SampleService:
             
             cursor.execute(sql, params)
             
-            # 3. 添加到任务管理中心 (类型为 data)
-            try:
-                # 尝试调用异步方法,如果报错则记录日志
-                import asyncio
-                try:
-                    # 检查是否有正在运行的事件循环
-                    loop = asyncio.get_event_loop()
-                    if loop.is_running():
-                        # 在运行的循环中创建任务
-                        loop.create_task(task_service.add_task(doc_id, 'data'))
-                    else:
-                        # 否则使用 run 运行(不推荐在 web 环境下这样做,但这里是兜底)
-                        loop.run_until_complete(task_service.add_task(doc_id, 'data'))
-                except RuntimeError:
-                    # 没有事件循环时
-                    asyncio.run(task_service.add_task(doc_id, 'data'))
-            except Exception as task_err:
-                logger.error(f"添加基本信息 {data.get('title')} 到任务中心失败: {task_err}")
-
             conn.commit()
             return True, "新增成功", doc_id
         except Exception as e:
@@ -1761,6 +1801,10 @@ class SampleService:
             prof_field_cn = data.get('professional_field')
             prof_field_code = PROFESSIONAL_FIELD_MAP.get(prof_field_cn, prof_field_cn)
 
+            # 处理时效性 (中文 -> 简写)
+            validity_cn = data.get('validity')
+            validity_code = VALIDITY_MAP.get(validity_cn, validity_cn)
+
             # 处理方案类别 (中文 -> 简写)
             plan_category_cn = data.get('plan_category')
             plan_category_code = PLAN_CATEGORY_MAP.get(plan_category_cn, plan_category_cn)
@@ -1807,7 +1851,7 @@ class SampleService:
                 """
                 params = (
                     data.get('title'), data.get('standard_no'), data.get('issuing_authority'), self._to_date(data.get('release_date')), 
-                    doc_type_code, prof_field_code, data.get('validity'),
+                    doc_type_code, prof_field_code, validity_code,
                     data.get('english_name'), self._to_date(data.get('implementation_date')), data.get('drafting_unit'),
                     data.get('approving_department'), data.get('engineering_phase'),
                     data.get('participating_units'),

+ 96 - 66
src/app/services/task_service.py

@@ -66,6 +66,11 @@ class TaskService:
                 cursor.execute("CREATE UNIQUE INDEX uk_business_project ON t_task_management (business_id, project_id)")
                 logger.info("Created new composite unique index: uk_business_project")
             
+            cursor.execute("SHOW INDEX FROM t_task_management WHERE Key_name = 'idx_project_id'")
+            if not cursor.fetchone():
+                cursor.execute("CREATE INDEX idx_project_id ON t_task_management (project_id)")
+                logger.info("Created index: idx_project_id")
+            
             conn.commit()
             TaskService._schema_verified = True
         except Exception as e:
@@ -473,25 +478,45 @@ class TaskService:
             
             # 针对 'data' 类型的批量 Milvus 查询优化
             milvus_data_map = {}
+            missing_milvus_ids = []
             if internal_task_type == 'data':
                 all_task_ids = [r['id'] for r in rows]
                 sql_kb = """
                     SELECT kb.collection_name_parent, kb.collection_name_children
                     FROM t_samp_document_main d
                     LEFT JOIN t_samp_knowledge_base kb ON d.kb_id = kb.id
-                    WHERE d.id COLLATE utf8mb4_unicode_ci = %s COLLATE utf8mb4_unicode_ci
+                    WHERE d.id = %s
                 """
                 cursor.execute(sql_kb, (all_task_ids[0],))
                 kb_info = cursor.fetchone()
                 if kb_info:
                     milvus_data_map = self._get_milvus_content_batch(all_task_ids, kb_info)
+                
+                # 记录缺失 Milvus 数据的 ID
+                missing_milvus_ids = [tid for tid in all_task_ids if not milvus_data_map.get(tid)]
+            
+            # 针对缺失数据的批量标题查询
+            title_map = {}
+            if missing_milvus_ids:
+                placeholders = ', '.join(['%s'] * len(missing_milvus_ids))
+                cursor.execute(f"SELECT id, title FROM t_samp_document_main WHERE id IN ({placeholders})", missing_milvus_ids)
+                title_map = {r['id']: r['title'] for r in cursor.fetchall()}
+
+            # 针对 'image' 类型的批量 MinIO 查询优化
+            image_data_map = {}
+            if internal_task_type == 'image':
+                all_image_ids = [r['id'] for r in rows]
+                placeholders = ', '.join(['%s'] * len(all_image_ids))
+                cursor.execute(f"SELECT id, image_url FROM t_image_info WHERE id IN ({placeholders})", all_image_ids)
+                for img_row in cursor.fetchall():
+                    img_url = img_row['image_url']
+                    if img_url and not img_url.startswith('http'):
+                        img_url = self.minio_manager.get_full_url(img_url)
+                    image_data_map[img_row['id']] = [img_url] if img_url else []
 
             for item in rows:
                 task_id = item['id']
                 
-                # 记录原始数据状态
-                logger.debug(f"正在处理导出任务 {task_id}, tag: {item.get('tag')}, metadata_keys: {list(json.loads(item['metadata']).keys()) if item.get('metadata') else 'None'}")
-                
                 # 提取并处理标签
                 doc_tags = []
                 if item.get('tag'):
@@ -501,30 +526,24 @@ class TaskService:
                             for t in doc_tags: all_project_tags.add(t)
                     except: pass
                 
-                # 解析数据库元数据
+                # 解析数据库元数据 (提前序列化日期)
                 db_metadata = {}
                 if item.get('metadata'):
                     try:
                         db_metadata = json.loads(item['metadata']) if isinstance(item['metadata'], str) else item['metadata']
+                        if db_metadata:
+                            db_metadata = self._serialize_datetime(db_metadata)
                     except: pass
                 
                 # 获取任务内容
                 task_contents = []
-                annotation_results = []
                 if internal_task_type == 'data':
                     task_contents = milvus_data_map.get(task_id, [])
                     if not task_contents:
-                        cursor.execute("SELECT title FROM t_samp_document_main WHERE id = %s", (task_id,))
-                        res = cursor.fetchone()
-                        if res: task_contents = [res['title']]
+                        title = title_map.get(task_id)
+                        if title: task_contents = [title]
                 elif internal_task_type == 'image':
-                    cursor.execute("SELECT image_url FROM t_image_info WHERE id = %s", (task_id,))
-                    res = cursor.fetchone()
-                    if res:
-                        img_url = res['image_url']
-                        if img_url and not img_url.startswith('http'):
-                            img_url = self.minio_manager.get_full_url(img_url)
-                        task_contents = [img_url]
+                    task_contents = image_data_map.get(task_id, [])
 
                 # 构建最终任务列表
                 for idx, content in enumerate(task_contents):
@@ -553,15 +572,15 @@ class TaskService:
                     
                     final_tasks.append(task_item)
 
-            # 统一进行一次递归序列化处理
-            return self._serialize_datetime({
+            # 准备返回结果,不再进行全局递归序列化 (已在局部处理)
+            return {
                 "name": project_name,
                 "description": "",
                 "task_type": external_task_type,
                 "data": final_tasks,
                 "external_id": remote_project_id,
                 "tags": [{"tag": t} for t in sorted(list(all_project_tags))]
-            })
+            }
         except Exception as e:
             logger.exception(f"导出项目数据异常: {e}")
             return {}
@@ -604,25 +623,30 @@ class TaskService:
                 id_field = fields["id"]
                 content_field = fields["content"]
                 
-                # 使用 in 表达式进行批量查询
-                # 注意:如果 task_ids 非常多,可能需要分批(如每批 100 个)
-                id_list_str = ", ".join([f'"{tid}"' for tid in task_ids])
-                res = milvus_service.client.query(
-                    collection_name=coll_name,
-                    filter=f'{id_field} in [{id_list_str}]',
-                    output_fields=[id_field, content_field]
-                )
-                
-                if res:
-                    for s in res:
-                        tid = s.get(id_field)
-                        val = s.get(content_field)
-                        if tid in result_map and val:
-                            result_map[tid].append(val)
+                # 使用 in 表达式进行批量查询 (分批处理以防 ID 过多)
+                CHUNK_SIZE = 100
+                for i in range(0, len(task_ids), CHUNK_SIZE):
+                    chunk_ids = task_ids[i:i + CHUNK_SIZE]
+                    id_list_str = ", ".join([f'"{tid}"' for tid in chunk_ids])
                     
-                    # 如果当前集合已经查到了内容,就不再查兜底集合(逻辑同单条查询)
-                    if any(result_map.values()):
-                        return result_map
+                    logger.info(f"正在从 Milvus 集合 {coll_name} 查询分片内容 ({i}/{len(task_ids)})...")
+                    res = milvus_service.client.query(
+                        collection_name=coll_name,
+                        filter=f'{id_field} in [{id_list_str}]',
+                        output_fields=[id_field, content_field]
+                    )
+                    
+                    if res:
+                        for s in res:
+                            tid = s.get(id_field)
+                            val = s.get(content_field)
+                            if tid in result_map and val:
+                                result_map[tid].append(val)
+                
+                # 如果当前集合已经查到了内容,就不再查兜底集合 (除非结果仍为空)
+                if any(result_map.values()):
+                    logger.info(f"从 Milvus 集合 {coll_name} 查得 {sum(len(v) for v in result_map.values())} 条内容分片")
+                    return result_map
             except Exception as e:
                 logger.error(f"批量查询 Milvus 集合 {coll_name} 异常: {e}")
                 continue
@@ -854,12 +878,13 @@ class TaskService:
 
     async def send_to_external_platform(self, project_id: str) -> Tuple[bool, str]:
         """将项目数据推送至外部标注平台 (单表化)"""
+        # 1. 准备数据 (导出数据需要数据库连接)
         conn = get_db_connection()
         if not conn:
             return False, "数据库连接失败"
             
         try:
-            # 1. 准备数据 (复用数据库连接)
+            logger.info(f"开始导出项目 {project_id} 数据...")
             payload = await self.export_project_data(project_id=project_id, conn=conn)
             
             if not payload:
@@ -867,15 +892,22 @@ class TaskService:
                 
             if not payload.get('data'):
                 return False, f"项目数据为空 (查询到0条有效任务),无法推送"
+        except Exception as e:
+            logger.exception(f"导出项目数据异常: {e}")
+            return False, f"导出异常: {str(e)}"
+        finally:
+            # 及时释放连接,防止在 HTTP 请求期间占用
+            conn.close()
             
-            # 2. 获取配置
+        # 2. 获取配置
+        try:
             from app.core.config import config_handler
             api_base_url = config_handler.get('external_api', 'project_api_url', 'http://192.168.92.61:9003/api/external/projects').rstrip('/')
             api_url = f"{api_base_url}/init"
             token = config_handler.get('external_api', 'admin_token', '')
 
-            # 3. 发送请求
-            async with httpx.AsyncClient(timeout=60.0) as client:
+            # 3. 发送请求 (不持有数据库连接)
+            async with httpx.AsyncClient(timeout=120.0) as client: # 增加超时时间到 120s
                 headers = {
                     "Authorization": f"Bearer {token}",
                     "Content-Type": "application/json"
@@ -887,30 +919,30 @@ class TaskService:
                     res_data = response.json()
                     logger.info(f"外部平台推送成功响应: {res_data}")
                     remote_project_id = res_data.get('project_id')
-                    
-                    # 获取下载地址并回写 (兼容多种返回格式)
                     download_url = res_data.get('download_url') or res_data.get('file_url')
-                    if download_url:
-                        # 4. 回写外部项目 ID 和下载地址 (复用当前连接)
-                        cursor = conn.cursor()
-                        # 先检查受影响行数
-                        affected = cursor.execute(
-                            "UPDATE t_task_management SET task_id = %s, file_url = %s WHERE project_id = %s", 
-                            (remote_project_id, download_url, project_id)
-                        )
-                        conn.commit()
-                        cursor.close()
-                        logger.info(f"已回写 task_id: {remote_project_id} 和 file_url: {download_url}, 受影响行数: {affected}")
-                    elif remote_project_id:
-                        # 仅回写外部项目 ID
-                        cursor = conn.cursor()
-                        affected = cursor.execute(
-                            "UPDATE t_task_management SET task_id = %s WHERE project_id = %s", 
-                            (remote_project_id, project_id)
-                        )
-                        conn.commit()
-                        cursor.close()
-                        logger.info(f"仅回写 task_id: {remote_project_id}, 受影响行数: {affected}")
+                    
+                    # 4. 回写外部项目 ID 和下载地址 (重新获取连接)
+                    if remote_project_id:
+                        conn = get_db_connection()
+                        if conn:
+                            try:
+                                cursor = conn.cursor()
+                                if download_url:
+                                    affected = cursor.execute(
+                                        "UPDATE t_task_management SET task_id = %s, file_url = %s WHERE project_id = %s", 
+                                        (remote_project_id, download_url, project_id)
+                                    )
+                                    logger.info(f"已回写 task_id: {remote_project_id} 和 file_url: {download_url}, 受影响行数: {affected}")
+                                else:
+                                    affected = cursor.execute(
+                                        "UPDATE t_task_management SET task_id = %s WHERE project_id = %s", 
+                                        (remote_project_id, project_id)
+                                    )
+                                    logger.info(f"仅回写 task_id: {remote_project_id}, 受影响行数: {affected}")
+                                conn.commit()
+                            finally:
+                                cursor.close()
+                                conn.close()
                     
                     return True, f"推送成功!外部项目ID: {remote_project_id or '未知'}"
                 else:
@@ -921,7 +953,5 @@ class TaskService:
         except Exception as e:
             logger.exception(f"推送至外部平台异常: {e}")
             return False, f"推送异常: {str(e)}"
-        finally:
-            conn.close()
 
 task_service = TaskService()

+ 45 - 1
src/views/image_view.py

@@ -2,7 +2,7 @@ import logging
 from datetime import datetime, timezone
 from typing import Optional, List, Dict, Any
 
-from fastapi import APIRouter, Depends, HTTPException, Request, Response
+from fastapi import APIRouter, Depends, HTTPException, Request, Response, Query
 from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
 
 from app.services.image_service import ImageService
@@ -44,6 +44,11 @@ class BatchAddRequest(BaseModel):
     project_name: str
     tags: Optional[List[str]] = None
 
class CategoryBatchAddRequest(BaseModel):
    """Request body for POST /category-batch-add-to-task."""
    # ID of the image category to push; presumably child categories are
    # resolved by the service layer — confirm against ImageService.
    category_id: str
    # Name of the task-center project the images are added to.
    project_name: str
    # Optional list of tags forwarded to the service; defaults to None.
    tags: Optional[List[str]] = None
+
 # --- 分类管理 API ---
 
 @router.get("/categories")
@@ -186,3 +191,42 @@ async def batch_add_to_task(req: BatchAddRequest, current_user: dict = Depends(g
     except Exception as e:
         logger.exception("批量加入任务失败")
         return ApiResponse(code=500, message=f"批量加入任务失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+
@router.post("/category-batch-add-to-task")
async def category_batch_add_to_task(req: CategoryBatchAddRequest, current_user: dict = Depends(get_current_user_with_refresh)):
    """Add every image under a category to the task center in one batch.

    Delegates to ImageService.category_batch_add_to_task and wraps its
    (success, message, data) result into the project's ApiResponse envelope:
    code 0 on success, 500 on a reported failure or an unexpected exception.
    """
    try:
        # Prefer an explicit username claim; fall back to the token subject.
        user_id = current_user.get("sub", "admin")
        username = current_user.get("username", user_id)

        outcome = await ImageService().category_batch_add_to_task(
            req.category_id, username, req.project_name, tags=req.tags
        )
        success, message, data = outcome

        response = ApiResponse(
            code=0 if success else 500,
            message=message,
            data=data,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        return response.model_dump()
    except Exception as e:
        logger.exception("按分类批量加入任务失败")
        return ApiResponse(code=500, message=f"操作失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()
+
@router.get("/category-batch-check")
async def category_batch_check(category_id: str = Query("", description="分类ID"), current_user: dict = Depends(get_current_user_with_refresh)):
    """Pre-flight check before a category-wide batch push.

    Calls ImageService.category_batch_check for the given category and returns
    its (success, message, data) result in the ApiResponse envelope: code 0
    when the check passed, 500 on a reported failure or an exception.
    """
    try:
        ok, msg, payload = await ImageService().category_batch_check(category_id)
        result = ApiResponse(
            code=0 if ok else 500,
            message=msg,
            data=payload,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        return result.model_dump()
    except Exception as e:
        logger.exception("按分类批量检查失败")
        return ApiResponse(code=500, message=f"操作失败: {str(e)}", timestamp=datetime.now(timezone.utc).isoformat()).model_dump()