Przeglądaj źródła

v0.0.4-更新RAG链路多查询对

WangXuMing 1 tydzień temu
rodzic
commit
9dd24515f7

+ 36 - 0
cleanup_logs.bat

@@ -0,0 +1,36 @@
+@echo off
+chcp 65001 >nul
+echo ========================================
+echo 日志文件清理工具
+echo ========================================
+echo.
+echo 此脚本将重命名已满的日志文件,解决轮转失败问题
+echo.
+pause
+
+echo.
+echo 正在停止可能占用日志文件的进程...
+echo 注意:如果有Python服务正在运行,请先手动停止(Ctrl+C)
+echo.
+
+cd /d "%~dp0"
+
+echo 正在重命名日志文件...
+echo.
+
+if exist "logs\agent_debug.log" (
+    ren "logs\agent_debug.log" "agent_debug_log_%date:~0,4%%date:~5,2%%date:~8,2%_%time:~0,2%%time:~3,2%%time:~6,2%.old"
+    echo ✓ agent_debug.log 已重命名
+)
+
+if exist "logs\agent_info.log" (
+    ren "logs\agent_info.log" "agent_info_log_%date:~0,4%%date:~5,2%%date:~8,2%_%time:~0,2%%time:~3,2%%time:~6,2%.old"
+    echo ✓ agent_info.log 已重命名
+)
+
+echo.
+echo ========================================
+echo 清理完成!现在可以重新启动服务了
+echo ========================================
+echo.
+pause

+ 13 - 1
core/base/task_models.py

@@ -35,7 +35,19 @@ class TaskFileInfo:
         self.file_content = file_info.get('file_content', b'')
 
         # 审查配置信息
-        self.review_config = file_info.get('review_config', [])
+        review_config_raw = file_info.get('review_config', [])
+
+        # 类型校验:确保review_config是列表
+        if isinstance(review_config_raw, list):
+            self.review_config = review_config_raw
+        else:
+            # 如果不是列表,记录警告并使用默认空列表
+            logger.warning(
+                f"review_config类型错误,期望list,实际{type(review_config_raw).__name__},"
+                f"值: {review_config_raw},将使用空列表"
+            )
+            self.review_config = []
+
         self.project_plan_type = file_info.get('project_plan_type', '')
         self.tendency_review_role = file_info.get('tendency_review_role', '')
         self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')

+ 202 - 3
core/base/workflow_manager.py

@@ -1,16 +1,22 @@
 """
 基于LangGraph的工作流管理器
 负责任务的创建、编排和执行,使用LangGraph进行状态管理
+
+新增功能:
+- 任务终止管理
+- 终止信号设置和检测
 """
 
 import asyncio
+import time
 from typing import Dict, Optional
 from datetime import datetime
 from foundation.observability.logger.loggering import server_logger as logger
 from foundation.observability.monitoring.time_statistics import track_execution_time
+from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from .progress_manager import ProgressManager
 from .redis_duplicate_checker import RedisDuplicateChecker
-from .task_models import TaskFileInfo, TaskChain  
+from .task_models import TaskFileInfo, TaskChain
 from ..construction_review.workflows import DocumentWorkflow,AIReviewWorkflow
 
 class ProgressManagerRegistry:
@@ -47,13 +53,17 @@ class WorkflowManager:
         self.review_semaphore = asyncio.Semaphore(max_concurrent_reviews)
 
         # 服务组件
-        self.progress_manager = ProgressManager()  
+        self.progress_manager = ProgressManager()
         self.redis_duplicate_checker = RedisDuplicateChecker()
 
         # 活跃任务跟踪
         self.active_chains: Dict[str, TaskChain] = {}
         self._cleanup_task_started = False
 
+        # 任务终止管理
+        self._terminate_signal_prefix = "ai_review:terminate_signal:"
+        self._task_expire_time = 7200  # 2小时
+
     async def submit_task_processing(self, file_info: dict) -> str:
         """异步提交任务处理(用于file_upload层)"""
         from foundation.infrastructure.messaging.tasks import submit_task_processing_task
@@ -241,4 +251,193 @@ class WorkflowManager:
         finally:
             # 清理活跃任务
             if task_chain.callback_task_id in self.active_chains:
-                del self.active_chains[task_chain.callback_task_id]
+                del self.active_chains[task_chain.callback_task_id]
+
+    # ==================== 任务终止管理方法 ====================
+
+    async def set_terminate_signal(self, callback_task_id: str, operator: str = "unknown") -> Dict[str, any]:
+        """
+        设置任务终止信号
+
+        Args:
+            callback_task_id: 任务回调ID
+            operator: 操作人(用户ID或系统标识)
+
+        Returns:
+            Dict: 操作结果 {"success": bool, "message": str, "task_info": dict}
+
+        Note:
+            将终止信号写入 Redis,支持跨进程检测
+            AI审查节点在执行前会检查此信号
+        """
+        try:
+            # 检查任务是否在活跃列表中
+            if callback_task_id not in self.active_chains:
+                return {
+                    "success": False,
+                    "message": f"任务不存在或已完成: {callback_task_id}",
+                    "task_info": None
+                }
+
+            task_chain = self.active_chains[callback_task_id]
+
+            # 检查任务状态
+            if task_chain.status != "processing":
+                return {
+                    "success": False,
+                    "message": f"任务状态不是 processing,无需终止: {callback_task_id} (当前状态: {task_chain.status})",
+                    "task_info": {
+                        "callback_task_id": callback_task_id,
+                        "status": task_chain.status,
+                        "file_name": task_chain.file_name
+                    }
+                }
+
+            # 设置 Redis 终止信号
+            redis_client = await RedisConnectionFactory.get_connection()
+            terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
+
+            # 存储终止信号和操作人、时间
+            terminate_data = {
+                "operator": operator,
+                "terminate_time": str(time.time()),
+                "task_id": callback_task_id
+            }
+
+            # 使用 hash 存储更多信息
+            await redis_client.hset(terminate_key, mapping=terminate_data)
+            # 设置过期时间(2小时)
+            await redis_client.expire(terminate_key, self._task_expire_time)
+
+            logger.info(f"已设置终止信号: {callback_task_id} (操作人: {operator}, 文件: {task_chain.file_name})")
+
+            return {
+                "success": True,
+                "message": f"终止信号已设置,任务将在当前节点完成后终止",
+                "task_info": {
+                    "callback_task_id": callback_task_id,
+                    "file_id": task_chain.file_id,
+                    "file_name": task_chain.file_name,
+                    "user_id": task_chain.user_id,
+                    "status": task_chain.status,
+                    "current_stage": task_chain.current_stage
+                }
+            }
+
+        except Exception as e:
+            logger.error(f"设置终止信号失败: {str(e)}", exc_info=True)
+            return {
+                "success": False,
+                "message": f"设置终止信号失败: {str(e)}",
+                "task_info": None
+            }
+
+    async def check_terminate_signal(self, callback_task_id: str) -> bool:
+        """
+        检查是否有终止信号
+
+        Args:
+            callback_task_id: 任务回调ID
+
+        Returns:
+            bool: 有终止信号返回 True
+
+        Note:
+            从 Redis 读取终止信号
+            工作流节点在执行前调用此方法检查是否需要终止
+        """
+        try:
+            redis_client = await RedisConnectionFactory.get_connection()
+            terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
+
+            # 检查键是否存在
+            exists = await redis_client.exists(terminate_key)
+
+            if exists:
+                # 读取终止信息
+                terminate_info = await redis_client.hgetall(terminate_key)
+                logger.warning(f"检测到终止信号: {callback_task_id}, 操作人: {terminate_info.get(b'operator', b'unknown').decode()}")
+                return True
+
+            return False
+
+        except Exception as e:
+            logger.error(f"检查终止信号失败: {str(e)}", exc_info=True)
+            return False
+
+    async def clear_terminate_signal(self, callback_task_id: str):
+        """
+        清理 Redis 中的终止信号
+
+        Args:
+            callback_task_id: 任务回调ID
+        """
+        try:
+            redis_client = await RedisConnectionFactory.get_connection()
+            terminate_key = f"{self._terminate_signal_prefix}{callback_task_id}"
+            await redis_client.delete(terminate_key)
+            logger.debug(f"清理终止信号: {callback_task_id}")
+        except Exception as e:
+            logger.warning(f"清理终止信号失败: {str(e)}")
+
+    async def get_active_tasks(self) -> list:
+        """
+        获取活跃任务列表
+
+        Returns:
+            list: 活跃任务信息列表
+        """
+        try:
+            active_tasks = []
+            current_time = time.time()
+
+            for task_id, task_chain in self.active_chains.items():
+                if task_chain.status == "processing":
+                    task_info = {
+                        "callback_task_id": task_id,
+                        "file_id": task_chain.file_id,
+                        "file_name": task_chain.file_name,
+                        "user_id": task_chain.user_id,
+                        "status": task_chain.status,
+                        "current_stage": task_chain.current_stage,
+                        "start_time": task_chain.start_time,
+                        "running_duration": int(current_time - task_chain.start_time) if task_chain.start_time else 0
+                    }
+                    active_tasks.append(task_info)
+
+            return active_tasks
+
+        except Exception as e:
+            logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
+            return []
+
+    async def get_task_info(self, callback_task_id: str) -> Optional[Dict]:
+        """
+        获取任务信息
+
+        Args:
+            callback_task_id: 任务回调ID
+
+        Returns:
+            Optional[Dict]: 任务信息字典,不存在返回 None
+        """
+        try:
+            task_chain = self.active_chains.get(callback_task_id)
+            if task_chain:
+                current_time = time.time()
+                return {
+                    "callback_task_id": callback_task_id,
+                    "file_id": task_chain.file_id,
+                    "file_name": task_chain.file_name,
+                    "user_id": task_chain.user_id,
+                    "status": task_chain.status,
+                    "current_stage": task_chain.current_stage,
+                    "start_time": task_chain.start_time,
+                    "running_duration": int(current_time - task_chain.start_time) if task_chain.start_time else 0,
+                    "results": task_chain.results
+                }
+            return None
+
+        except Exception as e:
+            logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
+            return None

+ 238 - 60
core/construction_review/component/ai_review_engine.py

@@ -49,6 +49,7 @@
 import asyncio
 import concurrent.futures
 import json
+import os
 import time
 from dataclasses import dataclass
 from enum import Enum
@@ -58,7 +59,7 @@ from core.base.task_models import TaskFileInfo
 from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
 from core.construction_review.component.infrastructure.parent_tool import (
     enhance_with_parent_docs,
-    extract_first_result
+    extract_query_pairs_results
 )
 from core.construction_review.component.infrastructure.relevance import is_relevant_async
 from core.construction_review.component.reviewers.base_reviewer import BaseReviewer
@@ -189,7 +190,7 @@ class AIReviewEngine(BaseReviewer):
     
 
     async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
-                                   review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
+                                   review_location_label: str,state:Dict[str, Any],stage_name:str) -> Dict[str, Any]:
         """
         基础合规性检查
 
@@ -321,10 +322,14 @@ class AIReviewEngine(BaseReviewer):
             'completeness_check': completeness_result,
         }
     async def technical_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
-                                      review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
+                                      review_location_label: str,state:Dict[str, Any],stage_name:str) -> Dict[str, Any]:
         """
         技术性合规检查(包含RAG增强审查)
 
+        支持基于 entity_results 列表的动态审查任务创建:
+        - entity_results 列表中每个查询对都会创建独立的审查任务
+        - 支持参数性和非参数性审查的动态任务创建
+
         Args:
             trace_id_idx: 追踪ID索引
             unit_content: 待审查单元内容
@@ -338,53 +343,121 @@ class AIReviewEngine(BaseReviewer):
         logger.info(f"开始技术性合规检查,内容长度: {len(review_content)}")
 
         # 先执行RAG增强检索,获取相关标准文档作为参考
-        logger.info(f"检查审查项列表:{self.task_info.get_review_config_list():}")  
+        logger.info(f"检查审查项列表:{self.task_info.get_review_config_list():}")
+        entity_results = []  # 初始化 entity_results 列表
+
         if 'non_parameter_compliance_check' in self.task_info.get_review_config_list() or 'parameter_compliance_check' in self.task_info.get_review_config_list():
             logger.info("执行专业性审查,开始RAG增强检索")
             rag_result = self.rag_enhanced_check(unit_content)
-            review_references = rag_result.get('text_content', '')
-            reference_source = rag_result.get('file_name', '')
-        else:
-            logger.info("未执行专业性审查,跳过RAG增强检索")
-            review_references = None
-            reference_source = None
+            entity_results = rag_result.get('entity_results', [])
+            logger.info(f"[RAG增强] 获取到 {len(entity_results)} 个查询对结果")
+
+            # 如果有 entity_results,记录详细信息
+            if entity_results:
+                for idx, entity_item in enumerate(entity_results):
+                    logger.info(f"[RAG增强] 查询对 {idx}: entity={entity_item.get('entity')}, combined_query={entity_item.get('combined_query')[:50]}...")
+
         async def check_with_semaphore(check_func, **kwargs):
             async with self.semaphore:
                 return await check_func(**kwargs)
 
         # 根据配置动态创建技术性检查任务
         technical_tasks = []
-        task_mapping = []  
+        task_mapping = []
 
         TASK_TIMEOUT = 150
 
-        if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
-            task_mapping.append('non_parameter_compliance')
-            technical_tasks.append(
-                asyncio.create_task(
-                    asyncio.wait_for(
-                        check_with_semaphore(self.check_non_parameter_compliance, trace_id_idx=trace_id_idx,
-                                           review_content=review_content, review_references=review_references,
-                                           reference_source=reference_source, review_location_label=review_location_label,
-                                           state=state, stage_name=stage_name),
-                        timeout=TASK_TIMEOUT
+        # 判断是否需要基于 entity_results 创建动态任务
+        if entity_results and len(entity_results) > 0:
+            logger.info(f"[技术审查] 基于 entity_results 创建动态审查任务,共 {len(entity_results)} 个查询对")
+
+            # 为每个查询对创建独立的审查任务
+            for idx, entity_item in enumerate(entity_results):
+                combined_query = entity_item.get('combined_query', '')
+                entity = entity_item.get('entity', f'entity_{idx}')
+                text_content = entity_item.get('text_content', '')
+                file_name = entity_item.get('file_name', '')
+
+                logger.info(f"[技术审查] 为查询对 {idx} ({entity}) 创建审查任务")
+
+                # 根据配置创建参数性或非参数性审查任务
+                if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+                    # 参数性合规检查
+                    task_mapping.append(f'parameter_compliance_{idx}')
+                    technical_tasks.append(
+                        asyncio.create_task(
+                            asyncio.wait_for(
+                                check_with_semaphore(
+                                    self.check_parameter_compliance,
+                                    trace_id_idx=trace_id_idx,
+                                    review_content=review_content,
+                                    review_references=text_content,  # 使用查询对的检索结果作为参考
+                                    reference_source=file_name,
+                                    review_location_label=review_location_label,
+                                    state=state,
+                                    stage_name=stage_name,
+                                    entity_query=combined_query  # 传入组合查询条文
+                                ),
+                                timeout=TASK_TIMEOUT
+                            )
+                        )
+                    )
+
+                if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+                    # 非参数性合规检查
+                    task_mapping.append(f'non_parameter_compliance_{idx}')
+                    technical_tasks.append(
+                        asyncio.create_task(
+                            asyncio.wait_for(
+                                check_with_semaphore(
+                                    self.check_non_parameter_compliance,
+                                    trace_id_idx=trace_id_idx,
+                                    review_content=review_content,
+                                    review_references=text_content,  # 使用查询对的检索结果作为参考
+                                    reference_source=file_name,
+                                    review_location_label=review_location_label,
+                                    state=state,
+                                    stage_name=stage_name,
+                                    entity_query=combined_query  # 传入组合查询条文
+                                ),
+                                timeout=TASK_TIMEOUT
+                            )
+                        )
+                    )
+
+            logger.info(f"[技术审查] 总共创建了 {len(technical_tasks)} 个动态审查任务")
+
+        else:
+            # 没有entity_results或未配置专业性审查,使用原有逻辑
+            logger.info("[技术审查] 使用通用审查模式(未使用 entity_results)")
+
+            if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+                task_mapping.append('non_parameter_compliance')
+                technical_tasks.append(
+                    asyncio.create_task(
+                        asyncio.wait_for(
+                            check_with_semaphore(self.check_non_parameter_compliance, trace_id_idx=trace_id_idx,
+                                               review_content=review_content, review_references=None,
+                                               reference_source=None, review_location_label=review_location_label,
+                                               state=state, stage_name=stage_name),
+                            timeout=TASK_TIMEOUT
+                        )
                     )
                 )
-            )
 
-        if 'parameter_compliance_check' in self.task_info.get_review_config_list():
-            task_mapping.append('parameter_compliance')
-            technical_tasks.append(
-                asyncio.create_task(
-                    asyncio.wait_for(
-                        check_with_semaphore(self.check_parameter_compliance, trace_id_idx=trace_id_idx,
-                                           review_content=review_content, review_references=review_references,
-                                           reference_source=reference_source, review_location_label=review_location_label,
-                                           state=state, stage_name=stage_name),
-                        timeout=TASK_TIMEOUT
+            if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+                task_mapping.append('parameter_compliance')
+                technical_tasks.append(
+                    asyncio.create_task(
+                        asyncio.wait_for(
+                            check_with_semaphore(self.check_parameter_compliance, trace_id_idx=trace_id_idx,
+                                               review_content=review_content, review_references=None,
+                                               reference_source=None, review_location_label=review_location_label,
+                                               state=state, stage_name=stage_name),
+                            timeout=TASK_TIMEOUT
+                        )
                     )
                 )
-            )
 
         # 一次性执行所有任务,避免重复协程调用
         if not technical_tasks:
@@ -394,7 +467,7 @@ class AIReviewEngine(BaseReviewer):
             }
 
         # 使用 asyncio.wait 替代 gather,提供更好的超时控制
-        # 整体超时时间 = 单个任务超时 + 缓冲时间
+        # 整体超时时间 = 单个任务超时 × 任务数量 + 缓冲时间
         total_timeout = TASK_TIMEOUT * len(technical_tasks) + 10
 
         done, pending = await asyncio.wait(technical_tasks, timeout=total_timeout)
@@ -417,25 +490,83 @@ class AIReviewEngine(BaseReviewer):
                 results.append(e)
 
         # 根据配置项分配结果
-        non_parameter_result = self._process_review_result(None)
-        parameter_result = self._process_review_result(None)
+        # 判断是否使用 entity_results 模式(多查询对模式)
+        if entity_results and len(entity_results) > 0:
+            # 多查询对模式:按查询对分组处理所有结果
+            logger.info(f"[技术审查] 处理多查询对审查结果,共 {len(entity_results)} 个查询对")
+
+            entity_review_results = []
+            result_index = 0
+
+            # 遍历每个查询对,提取对应的审查结果
+            for idx, entity_item in enumerate(entity_results):
+                entity_review_item = {
+                    'entity': entity_item.get('entity', f'entity_{idx}'),
+                    'combined_query': entity_item.get('combined_query', ''),
+                    'file_name': entity_item.get('file_name', ''),
+                    'text_content': entity_item.get('text_content', ''),
+                    'bfp_rerank_score': entity_item.get('bfp_rerank_score', 0.0)
+                }
 
-        result_index = 0
+                # 提取非参数性审查结果(如果配置了)
+                if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+                    if result_index < len(results):
+                        entity_review_item['non_parameter_compliance'] = self._process_review_result(
+                            results[result_index]
+                        )
+                        logger.info(f"[技术审查] 查询对 {idx} 非参数性审查结果已处理")
+                        result_index += 1
+                    else:
+                        logger.warning(f"[技术审查] 查询对 {idx} 缺少非参数性审查结果")
+                        entity_review_item['non_parameter_compliance'] = self._process_review_result(None)
+
+                # 提取参数性审查结果(如果配置了)
+                if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+                    if result_index < len(results):
+                        entity_review_item['parameter_compliance'] = self._process_review_result(
+                            results[result_index]
+                        )
+                        logger.info(f"[技术审查] 查询对 {idx} 参数性审查结果已处理")
+                        result_index += 1
+                    else:
+                        logger.warning(f"[技术审查] 查询对 {idx} 缺少参数性审查结果")
+                        entity_review_item['parameter_compliance'] = self._process_review_result(None)
 
-        if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
-            if result_index < len(results):
-                non_parameter_result = self._process_review_result(results[result_index])
-            result_index += 1
+                entity_review_results.append(entity_review_item)
 
-        if 'parameter_compliance_check' in self.task_info.get_review_config_list():
-            if result_index < len(results):
-                parameter_result = self._process_review_result(results[result_index])
-            result_index += 1
+            logger.info(f"[技术审查] 成功处理 {len(entity_review_results)} 个查询对的审查结果")
 
-        return {
-            'non_parameter_compliance': non_parameter_result,
-            'parameter_compliance': parameter_result
-        }
+            return {
+                'review_mode': 'entity_based',  # 标记为基于查询对的审查模式
+                'entity_review_results': entity_review_results,
+                'total_entities': len(entity_results),
+                'total_results_processed': result_index
+            }
+
+        else:
+            # 通用审查模式(未使用 entity_results)
+            logger.info("[技术审查] 处理通用审查模式结果")
+
+            non_parameter_result = self._process_review_result(None)
+            parameter_result = self._process_review_result(None)
+
+            result_index = 0
+
+            if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+                if result_index < len(results):
+                    non_parameter_result = self._process_review_result(results[result_index])
+                result_index += 1
+
+            if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+                if result_index < len(results):
+                    parameter_result = self._process_review_result(results[result_index])
+                result_index += 1
+
+            return {
+                'review_mode': 'general',  # 标记为通用审查模式
+                'non_parameter_compliance': non_parameter_result,
+                'parameter_compliance': parameter_result
+            }
 
     def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
         """
@@ -462,6 +593,12 @@ class AIReviewEngine(BaseReviewer):
 
         bfp_result_lists = entity_enhance.entities_enhance_retrieval(query_pairs)
 
+        # 🔍 保存关键节点结果(用于对比分析)
+        os.makedirs("temp/ai_review_engine", exist_ok=True)
+        with open("temp/ai_review_engine/bfp_result_lists.json", "w", encoding='utf-8') as f:
+            json.dump(bfp_result_lists, f, ensure_ascii=False, indent=4)
+        logger.info("[RAG增强] ✅ 已保存 bfp_result_lists 到 temp/ai_review_engine/bfp_result_lists.json")
+
         # Step 3: 检查检索结果
         if not bfp_result_lists:
             logger.warning("[RAG增强] 实体检索未返回结果")
@@ -479,9 +616,10 @@ class AIReviewEngine(BaseReviewer):
             enhanced_results = enhancement_result['enhanced_results']
             enhanced_count = enhancement_result['enhanced_count']
 
-            # 保存增强后的结果
-            with open(rf"temp\entity_bfp_recall\enhance_with_parent_docs.json", "w", encoding='utf-8') as f:
+            # 🔍 保存关键节点结果(用于对比分析)
+            with open("temp/ai_review_engine/enhance_with_parent_docs.json", "w", encoding='utf-8') as f:
                 json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
+            logger.info(f"[RAG增强] ✅ 已保存 enhance_with_parent_docs 到 temp/ai_review_engine/enhance_with_parent_docs.json (共{enhanced_count}个)")
 
             logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果")
             logger.info(f"[RAG增强] 使用了 {len(enhancement_result['parent_docs'])} 个父文档")
@@ -490,14 +628,32 @@ class AIReviewEngine(BaseReviewer):
             # 失败时使用原始结果
             enhanced_results = bfp_result_lists
 
-        # Step 5: 提取第一个结果返回 (使用增强后的结果)
-        final_result = extract_first_result(enhanced_results)
+        # Step 5: 提取查询对结果(只保留得分>0.8的结果)
+        entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.8)
 
         # 保存最终结果用于调试
-        with open(rf"temp\entity_bfp_recall\extract_first_result.json", "w", encoding='utf-8') as f:
-            json.dump(final_result, f, ensure_ascii=False, indent=4)
+        # with open(rf"temp\ai_review_engine\extract_query_pairs_results.json", "w", encoding='utf-8') as f:
+        #     json.dump(entity_results, f, ensure_ascii=False, indent=4)
 
-        return final_result
+        # 如果没有结果通过阈值过滤,返回空结果
+        if not entity_results:
+            logger.warning("[RAG增强] 没有结果通过阈值过滤(得分>0.8),返回空结果")
+            return {
+                'vector_search': [],
+                'retrieval_status': 'no_results',
+                'file_name': '',
+                'text_content': '',
+                'metadata': {},
+                'entity_results': [],
+                'total_entities': 0
+            }
+
+        # 返回格式:返回列表形式的 entity_results
+        return {
+            'retrieval_status': 'success',
+            'entity_results': entity_results,
+            'total_entities': len(entity_results)
+        }
 
 
     async def check_grammar(self, trace_id_idx: str, review_content: str, review_references: str,
@@ -817,7 +973,8 @@ class AIReviewEngine(BaseReviewer):
             return result
 
     async def check_non_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,
-                                         reference_source: str, review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
+                                         reference_source: str, review_location_label: str, state: str, stage_name: str,
+                                         entity_query: str = None) -> Dict[str, Any]:
         """
         非参数合规性检查 - 安全相关/强制性条文知识库
 
@@ -829,6 +986,7 @@ class AIReviewEngine(BaseReviewer):
             review_location_label: 审查位置标签
             state: 状态字典
             stage_name: 阶段名称
+            entity_query: 实体组合查询条文(可选,来自 entity_results)
 
         Returns:
             Dict[str, Any]: 非参数合规性检查结果
@@ -836,11 +994,21 @@ class AIReviewEngine(BaseReviewer):
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['non_parameter']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("non_parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
+
+        # 如果有 entity_query,拼接到 review_content 前面
+        if entity_query:
+            logger.info(f"[非参数审查] 使用实体查询条文: {entity_query[:100]}...")
+            combined_content = f"【实体查询条文】\n{entity_query}\n\n【待审查内容】\n{review_content}"
+            logger.debug(f"[非参数审查] 组合后内容长度: {len(combined_content)} (原内容: {len(review_content)}, 查询条文: {len(entity_query)})")
+        else:
+            combined_content = review_content
+
+        return await self.review("non_parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
                                reference_source, review_location_label, state, stage_name, timeout=45)
 
     async def check_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,
-                                        reference_source: str, review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
+                                        reference_source: str, review_location_label: str, state: str, stage_name: str,
+                                        entity_query: str = None) -> Dict[str, Any]:
         """
         参数合规性检查 - 实体概念/工程术语知识库
 
@@ -852,6 +1020,7 @@ class AIReviewEngine(BaseReviewer):
             review_location_label: 审查位置标签
             state: 状态字典
             stage_name: 阶段名称
+            entity_query: 实体组合查询条文(可选,来自 entity_results)
 
         Returns:
             Dict[str, Any]: 参数合规性检查结果
@@ -859,7 +1028,16 @@ class AIReviewEngine(BaseReviewer):
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['parameter']
         trace_id = prompt_name+trace_id_idx
-        return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
+
+        # 如果有 entity_query,拼接到 review_content 前面
+        if entity_query:
+            logger.info(f"[参数审查] 使用实体查询条文: {entity_query[:100]}...")
+            combined_content = f"【实体查询条文】\n{entity_query}\n\n【待审查内容】\n{review_content}"
+            logger.debug(f"[参数审查] 组合后内容长度: {len(combined_content)} (原内容: {len(review_content)}, 查询条文: {len(entity_query)})")
+        else:
+            combined_content = review_content
+
+        return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
                                reference_source, review_location_label, state, stage_name, timeout=45)
 
     async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],

+ 109 - 21
core/construction_review/component/infrastructure/parent_tool.py

@@ -201,31 +201,119 @@ def enhance_with_parent_docs(
     }
 
 
-def extract_first_result(bfp_result_lists: List) -> Dict[str, Any]:
+def extract_query_pairs_results(bfp_result_lists: List, query_pairs: List[Dict] = None,
+                                score_threshold: float = 0.8) -> List[Dict[str, Any]]:
     """
-    从检索结果中提取第一个有效结果
+    从检索结果中提取每个查询对的最高得分结果(得分必须大于阈值),返回列表格式
+
+    得分优先级:
+    1. bfp_rerank_score (二次重排序得分,最高优先级)
+    2. rerank_score (第一次重排序得分,回退选项)
 
     Args:
-        bfp_result_lists: 检索结果列表
+        bfp_result_lists: 检索结果列表(二维列表,每个子列表对应一个查询对)
+        query_pairs: 查询对列表(用于映射实体名称、背景、参数)
+        score_threshold: 得分阈值,只返回得分大于该阈值的结果(默认0.8)
 
     Returns:
-        第一个结果的格式化字典
-    """
-    if not bfp_result_lists or not bfp_result_lists[0]:
-        logger.warning("[父文档工具] 第一个查询对无检索结果")
-        return {
-            'vector_search': [],
-            'retrieval_status': 'no_results',
-            'file_name': '',
-            'text_content': '',
-            'metadata': {}
+        List[Dict]: 结果列表,每个元素包含:
+        {
+            'entity': str,              # 实体名称
+            'background': str,          # 背景信息
+            'parameter': str,           # 技术参数
+            'combined_query': str,      # 组合查询条文 (entity + background + parameter)
+            'file_name': str,           # 文件名
+            'text_content': str,        # 检索到的文本内容
+            'metadata': dict,           # 元数据
+            'bfp_rerank_score': float,  # BFP重排序得分
+            'rerank_score': float,      # 重排序得分
+            'final_score': float,       # 最终得分
+            'score_type': str,          # 得分类型
+            'query_index': int          # 查询索引
         }
+        - 如果所有结果得分都低于阈值,返回空列表
+    """
+    if not bfp_result_lists:
+        logger.warning("[父文档工具] 检索结果为空")
+        return []
 
-    first_result = bfp_result_lists[0][0]
-    return {
-        'file_name': first_result['metadata'].get('file_name', 'unknown'),
-        'text_content': first_result['text_content'],
-        'metadata': first_result['metadata'],
-        'retrieval_status': 'success',
-        #'vector_search': bfp_result_lists
-    }
+    logger.info(f"[父文档工具] 开始提取查询对结果,得分阈值: {score_threshold}")
+
+    entity_results = []
+    total_count = 0
+    filtered_count = 0
+
+    for query_idx, results in enumerate(bfp_result_lists):
+        if not results:
+            logger.warning(f"[父文档工具] 查询对 {query_idx} 无检索结果")
+            continue
+
+        total_count += 1
+
+        # 获取查询对信息
+        query_pair = query_pairs[query_idx] if query_pairs and query_idx < len(query_pairs) else {}
+        entity = query_pair.get('entity', '')
+        background = query_pair.get('background', '')
+        parameter = query_pair.get('parameter', '')
+
+        # 组合完整查询条文: 实体 + 背景 + 参数
+        combined_parts = []
+        if entity:
+            combined_parts.append(entity)
+        if background:
+            combined_parts.append(background)
+        if parameter:
+            combined_parts.append(parameter)
+        combined_query = ' '.join(combined_parts)
+
+        # 优先使用结果中的 source_entity,回退到 query_pairs
+        first_result = results[0]
+        entity_name = first_result.get('source_entity', entity or f'query_{query_idx}')
+
+        # 在该查询对的结果中找得分最高的:按 (bfp_rerank_score, rerank_score) 元组字典序取最大,
+        # 即优先比较 bfp_rerank_score,持平时再比较 rerank_score
+        # NOTE(review): 若得分字段存在但值为 None,max 的元组比较会抛 TypeError;建议与下方取值逻辑一致,改用 (x.get(...) or 0.0)
+        best_result = max(results, key=lambda x: (
+            x.get('bfp_rerank_score', 0.0),
+            x.get('rerank_score', 0.0)
+        ))
+
+        # 确定最终使用的得分和类型
+        bfp_score = best_result.get('bfp_rerank_score') or 0.0
+        rerank_score = best_result.get('rerank_score') or 0.0
+
+        # 如果 bfp_rerank_score 有效(>0),使用它;否则使用 rerank_score
+        if bfp_score and bfp_score > 0:
+            final_score = bfp_score
+            score_type = 'bfp_rerank_score'
+        else:
+            final_score = rerank_score
+            score_type = 'rerank_score'
+
+        logger.info(f"[父文档工具] 实体 '{entity_name}' 最优结果: {score_type}={final_score:.6f} (bfp={bfp_score:.6f}, rerank={rerank_score:.6f})")
+
+        # 只保留得分超过阈值的结果
+        if final_score > score_threshold:
+            result_item = {
+                'entity': entity,
+                'background': background,
+                'parameter': parameter,
+                'combined_query': combined_query,
+                'file_name': best_result['metadata'].get('file_name', 'unknown'),
+                'text_content': best_result['text_content'],
+                'metadata': best_result['metadata'],
+                'bfp_rerank_score': bfp_score,
+                'rerank_score': rerank_score,
+                'final_score': final_score,
+                'score_type': score_type,
+                'query_index': query_idx
+            }
+            entity_results.append(result_item)
+            filtered_count += 1
+            logger.info(f"[父文档工具] ✅ 实体 '{entity_name}' 得分 {final_score:.6f} ({score_type}) > {score_threshold},保留结果")
+            logger.info(f"[父文档工具] 组合查询条文: {combined_query}")
+        else:
+            logger.warning(f"[父文档工具] ❌ 实体 '{entity_name}' 得分 {final_score:.6f} ({score_type}) <= {score_threshold},过滤结果")
+
+    logger.info(f"[父文档工具] 提取完成: 总计 {total_count} 个查询对,{filtered_count} 个结果通过阈值过滤")
+
+    return entity_results

+ 52 - 2
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -164,11 +164,50 @@ class InterTool:
         # 合并所有审查结果
         all_results = {}
         if basic_result:
+            logger.info(f"🔍 [DEBUG] basic_result 类型: {type(basic_result)}, 键: {list(basic_result.keys()) if isinstance(basic_result, dict) else 'N/A'}")
             all_results.update(basic_result)
-            
+
         logger.info(f"basic_result:{basic_result}")
         if technical_result:
-            all_results.update(technical_result)
+            logger.info(f"🔍 [DEBUG] technical_result 类型: {type(technical_result)}, 键: {list(technical_result.keys()) if isinstance(technical_result, dict) else 'N/A'}")
+
+            # 检查是否是 entity_based 模式
+            if technical_result.get('review_mode') == 'entity_based' and 'entity_review_results' in technical_result:
+                # entity_based 模式:从 entity_review_results 中提取实际审查结果
+                logger.info(f"🔍 [DEBUG] 检测到 entity_based 模式,从 entity_review_results 提取审查结果")
+                entity_review_results = technical_result.get('entity_review_results', [])
+                total_entities = technical_result.get('total_entities', 0)
+
+                for idx, entity_item in enumerate(entity_review_results):
+                    entity = entity_item.get('entity', f'entity_{idx}')
+                    entity_info = f"{entity}_{idx}"  # 使用 entity+索引 避免重复
+
+                    # 提取非参数性审查结果
+                    if 'non_parameter_compliance' in entity_item:
+                        result_key = f'non_parameter_compliance_{entity_info}'
+                        all_results[result_key] = entity_item['non_parameter_compliance']
+                        logger.info(f"🔍 [DEBUG] 提取审查结果: {result_key}")
+
+                    # 提取参数性审查结果
+                    if 'parameter_compliance' in entity_item:
+                        result_key = f'parameter_compliance_{entity_info}'
+                        all_results[result_key] = entity_item['parameter_compliance']
+                        logger.info(f"🔍 [DEBUG] 提取审查结果: {result_key}")
+
+                logger.info(f"🔍 [DEBUG] entity_based 模式处理完成,共提取 {len(entity_review_results)} 个实体的审查结果")
+
+            else:
+                # general 模式:过滤掉元数据字段,保留实际审查结果
+                filtered_technical = {}
+                metadata_keys = ['review_mode', 'entity_review_results', 'total_entities', 'total_results_processed']
+                for key, value in technical_result.items():
+                    if key not in metadata_keys:
+                        filtered_technical[key] = value
+                    else:
+                        logger.info(f"跳过技术审查元数据字段: {key} = {value} (类型: {type(value).__name__})")
+
+                logger.info(f"🔍 [DEBUG] 过滤后的 technical_result 键: {list(filtered_technical.keys())}")
+                all_results.update(filtered_technical)
 
         logger.info(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
 
@@ -179,6 +218,11 @@ class InterTool:
                 logger.info(f"跳过分数字段: {check_key}")
                 continue
 
+            # 🔧 类型安全检查:确保 check_result 是字典类型
+            if not isinstance(check_result, dict):
+                logger.warning(f"⚠️ 检查项 {check_key} 的结果类型不是字典: {type(check_result).__name__}, 跳过")
+                continue
+
             # 检查 check_result 是否为 None 或不包含 details
             if not check_result or "details" not in check_result:
                 logger.warning(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
@@ -193,6 +237,12 @@ class InterTool:
                 logger.info(f"解析检查项 {check_name} 的响应,长度: {len(response)}")
 
                 logger.info(f"检查项测试 {check_name}")
+
+                # 类型安全检查:确保 check_name 是字符串
+                if not isinstance(check_name, str):
+                    logger.warning(f"check_name 类型异常: {type(check_name).__name__}, 值: {check_name}, 将跳过此检查项")
+                    continue
+
                 if check_name in BASIC_CHECK_ITEMS:
                     logger.info(f"检查项 {check_name} 无参考来源,直接解析响应")
                     check_issues = self.parse_ai_review_response(response, check_name)

+ 131 - 33
core/construction_review/workflows/ai_review_workflow.py

@@ -130,6 +130,11 @@ class AIReviewWorkflow:
 
         self.max_review_units = max_review_units
         self.review_mode = review_mode
+
+        # 延迟导入 WorkflowManager(避免循环导入)
+        from core.base.workflow_manager import WorkflowManager
+        self.workflow_manager = WorkflowManager()
+
         self.graph = self._build_workflow()
 
 
@@ -141,8 +146,8 @@ class AIReviewWorkflow:
             StateGraph: 配置完成的LangGraph工作流图实例
 
         Note:
-            创建包含开始、初始化进度、AI审查、完成和错误处理节点的完整工作流
-            设置节点间的转换关系和条件边,支持错误处理流程
+            创建包含开始、初始化进度、AI审查、完成、错误处理和终止节点的完整工作流
+            设置节点间的转换关系和条件边,支持错误处理流程和任务终止流程
         """
         workflow = StateGraph(AIReviewState)
         workflow.add_node("start", self._start_node)
@@ -151,20 +156,25 @@ class AIReviewWorkflow:
         workflow.add_node("save_results", self._save_results_node)
         workflow.add_node("complete", self._complete_node)
         workflow.add_node("error_handler", self._error_handler_node)
+        workflow.add_node("terminate", self._terminate_node)  # 新增终止节点
+
         workflow.set_entry_point("start")
         workflow.add_edge("start", "initialize_progress")
         workflow.add_edge("initialize_progress", "ai_review")
+
         # 删除默认边,由条件边控制路由
         # workflow.add_edge("ai_review", "save_results")
         workflow.add_edge("save_results", "complete")
         workflow.add_edge("complete", END)
         workflow.add_edge("error_handler", END)
+        workflow.add_edge("terminate", END)  # 终止节点也到 END
 
-        # 添加条件边(错误处理)- 替代默认边
+        # 添加条件边(错误处理 + 终止检查)- 替代默认边
         workflow.add_conditional_edges(
             "ai_review",
-            self.inter_tool._check_ai_review_result,
+            self._should_terminate_or_error,
             {
+                "terminate": "terminate",  # 新增终止路径
                 "success": "save_results",
                 "error": "error_handler"
             }
@@ -269,6 +279,16 @@ class AIReviewWorkflow:
         """
         try:
             logger.info(f"AI审查节点开始执行,任务ID: {self.task_info.callback_task_id}")
+
+            # ⚠️ 检查终止信号(执行前)
+            if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
+                logger.warning(f"AI审查节点检测到终止信号,任务ID: {state['callback_task_id']}")
+                return {
+                    "status": "terminated",
+                    "current_stage": "ai_review",
+                    "messages": [AIMessage(content="检测到终止信号")]
+                }
+
             test_designation_chunk_flag = self.task_info.get_test_designation_chunk_flag()
             logger.info(f"测试定位标志: {test_designation_chunk_flag}")
 
@@ -299,12 +319,22 @@ class AIReviewWorkflow:
 
             logger.info(f"开始核心审查,任务ID: {state['callback_task_id']}")
             await self.core_fun._send_start_review_progress(state, total_units,'core_review')
-            # 2. 执行基础并发审查
+            # 2. 执行基础并发审查(内部会检测终止信号)
             logger.info(f"开始执行并发审查,任务ID: {state['callback_task_id']}")
-            successful_results = await self.core_fun._execute_concurrent_reviews(review_chunks, total_units, state)
+            successful_results = await self.core_fun._execute_concurrent_reviews(
+                review_chunks, total_units, state, check_terminate=True
+            )
             logger.info(f"并发审查完成,成功结果: {len(successful_results)}, 任务ID: {state['callback_task_id']}")
 
-            
+            # ⚠️ 再次检查终止信号(并发审查后)
+            if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
+                logger.warning(f"AI审查节点并发审查后检测到终止信号,任务ID: {state['callback_task_id']}")
+                return {
+                    "status": "terminated",
+                    "current_stage": "ai_review",
+                    "messages": [AIMessage(content="检测到终止信号")]
+                }
+
 
             # 开始大纲审查
             await self.core_fun._send_start_review_progress(state, total_units,'outline')
@@ -560,6 +590,69 @@ class AIReviewWorkflow:
             "messages": [AIMessage(content=f"错误处理: {state['error_message']}")]
         }
 
+    async def _terminate_node(self, state: AIReviewState) -> AIReviewState:
+        """
+        终止节点 - 处理任务终止请求
+
+        Args:
+            state: AI审查工作流状态
+
+        Returns:
+            AIReviewState: 更新后的工作流状态,标记为已终止
+
+        Note:
+            当检测到终止信号时,工作流跳转到此节点
+            设置状态为terminated,清理资源,通过进度管理器发送终止通知
+        """
+        logger.warning(f"AI审查任务已终止: {state['callback_task_id']}")
+
+        # 更新终止状态
+        if state["progress_manager"]:
+            await state["progress_manager"].update_stage_progress(
+                callback_task_id=state["callback_task_id"],
+                stage_name="AI审查",
+                current=state.get("current", 0),
+                status="terminated",
+                message="任务已被用户终止",
+                overall_task_status="terminated",
+                event_type="terminated"
+            )
+
+        # 清理 Redis 终止信号
+        await self.workflow_manager.clear_terminate_signal(state["callback_task_id"])
+
+        return {
+            "status": "terminated",
+            "current_stage": "terminated",
+            "messages": [AIMessage(content="任务已被终止")]
+        }
+
+    def _should_terminate_or_error(self, state: AIReviewState) -> str:
+        """
+        检查是否应该终止或发生错误
+
+        Args:
+            state: AI审查工作流状态
+
+        Returns:
+            str: "terminate", "success", 或 "error"
+
+        Note:
+            1. 优先检查终止信号
+            2. 检查是否有错误
+            3. 都没有则返回 success
+        """
+        # 同步检查状态字段(避免在条件边中使用异步)
+        if state.get("status") == "terminated":
+            return "terminate"
+
+        # 检查是否有错误消息
+        if state.get("error_message"):
+            return "error"
+
+        # 默认返回成功
+        return "success"
+
     def _get_workflow_graph(self):
         """获取工作流图(可视化输出)"""
         try:
@@ -678,7 +771,8 @@ class AIReviewCoreFun:
         
 
     async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
-                                          total_units: int, state: AIReviewState) -> List[Dict[str, Any]]:
+                                          total_units: int, state: AIReviewState,
+                                          check_terminate: bool = False) -> List[Dict[str, Any]]:
         """
         执行并发审查
 
@@ -686,21 +780,31 @@ class AIReviewCoreFun:
             review_chunks: 审查单元列表
             total_units: 总单元数
             state: AI审查状态
+            check_terminate: 是否检查终止信号(默认False)
 
         Returns:
             List[Dict[str, Any]]: 审查结果列表(issues格式)
         """
-        
+
         try:
+            # 获取 workflow_manager 实例(延迟导入避免循环依赖)
+            from core.base.workflow_manager import WorkflowManager
+            workflow_manager = WorkflowManager()
 
             semaphore = asyncio.Semaphore(3)  # 并发审查数
 
             async def process_unit_and_notify(unit_index, unit_content):
                 """处理单个单元,完成后立即推送通知"""
                 async with semaphore:
+                    # ⚠️ 检查终止信号(每个单元审查前)
+                    if check_terminate:
+                        if await workflow_manager.check_terminate_signal(state["callback_task_id"]):
+                            logger.warning(f"并发审查检测到终止信号,停止后续审查: {state['callback_task_id']}")
+                            return None  # 返回 None 表示被终止
+
                     # 执行单个单元审查
                     result = await self._review_single_unit(unit_content, unit_index, total_units, state)
-
+                    
                     # 审查完成后立即推送通知
                     if result.overall_risk != "error":
                         section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
@@ -907,33 +1011,27 @@ class AIReviewCoreFun:
                 ),
             ]
 
-            # 使用 asyncio.wait 替代 gather,提供更好的超时控制
+            # 使用 asyncio.gather,保证结果顺序并提供超时控制
             # 整体超时 = 两个任务的超时之和 + 缓冲时间
             total_timeout = REVIEW_TIMEOUT * len(review_tasks) + 10
 
-            done, pending = await asyncio.wait(review_tasks, timeout=total_timeout)
-
-            # 取消未完成的任务
-            for task in pending:
-                task.cancel()
-                logger.warning(f"[工作流] 审查任务超时,已取消: trace_id={trace_id_idx}")
-
-            # 收集结果
-            review_results = []
-            for task in done:
-                try:
-                    result = task.result()
-                    review_results.append(result)
-                except Exception as e:
-                    logger.error(f"[工作流] 审查任务执行失败: {str(e)}, trace_id={trace_id_idx}")
-                    review_results.append(e)
-
-            # 确保有两个结果(基础审查和技术审查)
-            while len(review_results) < 2:
-                review_results.append(Exception("Task not executed"))
-            # 处理异常结果
+            try:
+                # 使用 gather 确保结果顺序与任务顺序一致
+                review_results = await asyncio.wait_for(
+                    asyncio.gather(*review_tasks, return_exceptions=True),
+                    timeout=total_timeout
+                )
+            except asyncio.TimeoutError:
+                logger.error(f"[工作流] 审查任务整体超时: trace_id={trace_id_idx}")
+                # 填充超时结果
+                review_results = [
+                    Exception(f"基础审查超时({REVIEW_TIMEOUT}秒)"),
+                    Exception(f"技术审查超时({REVIEW_TIMEOUT}秒)")
+                ]
+
+            # 处理异常结果(gather 已经将异常作为结果返回)
             basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
-            technical_result = review_results[1] if len(review_results) > 1 and not isinstance(review_results[1], Exception) else {"error": str(review_results[1]) if len(review_results) > 1 else "No result"}
+            technical_result = review_results[1] if not isinstance(review_results[1], Exception) else {"error": str(review_results[1])}
 
             # RAG检查已注释,提供空结果
             rag_result = {"error": "RAG check disabled"}

+ 2 - 1
data_pipeline/RAG_recall/rag_miluvs/foundation/observability/logger/loggering.py

@@ -83,7 +83,8 @@ class CompatibleLogger(logging.Logger):
                 mode='a',
                 maxBytes=self.file_max_bytes,
                 backupCount=self.backup_count,
-                encoding='utf-8'
+                encoding='utf-8',
+                delay=True  # ✅ 延迟打开文件,避免Windows下文件占用问题
             )
             handler.setLevel(level)  # 设置级别为对应文件级别
             handler.setFormatter(self.formatter)

+ 11 - 2
foundation/ai/rag/retrieval/entities_enhance.py

@@ -12,7 +12,7 @@ class EntitiesEnhance():
         self.save_path = "temp\entity_bfp_recall\entity_bfp_recall.json"
         self.bfp_result_lists = []
     @track_execution_time
-    def entities_enhance_retrieval(self,query_pairs):
+    def entities_enhance_retrieval(self, query_pairs):
         def run_async(coro):
             """在合适的环境中运行异步函数"""
             try:
@@ -24,6 +24,9 @@ class EntitiesEnhance():
             except RuntimeError:
                 return asyncio.run(coro)
 
+        # 清空之前的结果
+        self.bfp_result_lists = []
+
         for query_pair in query_pairs:
             entity = query_pair['entity']
             search_keywords = query_pair['search_keywords']
@@ -38,8 +41,14 @@ class EntitiesEnhance():
 
             # BFP背景增强召回
             bfp_result = run_async(retrieval_manager.async_bfp_recall(entity_list, background, top_k=3))
+
+            # 为每个结果添加实体信息
+            for result in bfp_result:
+                result['source_entity'] = entity
+
             self.bfp_result_lists.append(bfp_result)
-        self.test_file(self.bfp_result_lists,seve=True)
+
+        self.test_file(self.bfp_result_lists, seve=True)
         return self.bfp_result_lists
             
 

+ 3 - 3
foundation/ai/rag/retrieval/retrieval.py

@@ -248,8 +248,8 @@ class RetrievalManager:
 
         # BFP召回结果已经通过multi_stage_recall进行了重排序,保持原有顺序
         # 只对第一次重排序得分大于0.8的文档进行二次重排序
-        high_score_results = [item for item in bfp_results if item.get('rerank_score', 0) > 0.8]
-        low_score_results = [item for item in bfp_results if item.get('rerank_score', 0) <= 0.8]
+        high_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) > 0.8]
+        low_score_results = [item for item in bfp_results if (item.get('rerank_score') or 0) <= 0.8]
 
         self.logger.info(f"筛选结果:高分文档(>0.8) {len(high_score_results)} 个,低分文档(≤0.8) {len(low_score_results)} 个")
 
@@ -257,7 +257,7 @@ class RetrievalManager:
         if not high_score_results:
             self.logger.info(f"没有得分大于0.8的文档,跳过二次重排序,返回top_k={top_k}个结果(按hybrid_similarity排序)")
             # 按 hybrid_similarity 降序排序,返回 top_k 个
-            sorted_results = sorted(bfp_results, key=lambda x: x.get('hybrid_similarity', 0), reverse=True)
+            sorted_results = sorted(bfp_results, key=lambda x: x.get('hybrid_similarity') or 0, reverse=True)
             return sorted_results[:top_k]
 
         # 检查background是否为空,如果为空则跳过二次重排序

+ 2 - 1
foundation/observability/logger/loggering.py

@@ -83,7 +83,8 @@ class CompatibleLogger(logging.Logger):
                 mode='a',
                 maxBytes=self.file_max_bytes,
                 backupCount=self.backup_count,
-                encoding='utf-8'
+                encoding='utf-8',
+                delay=True  # ✅ 延迟打开文件,避免Windows下文件占用问题
             )
             handler.setLevel(level)  # 设置级别为对应文件级别
             handler.setFormatter(self.formatter)

+ 2 - 0
server/app.py

@@ -26,6 +26,7 @@ from views.test_views import test_router
 from views.construction_review.file_upload import file_upload_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
+from views.construction_review.task_control import task_control_router
 
 
 class ServerUtils:
@@ -88,6 +89,7 @@ class RouteManager:
         self.app.include_router(file_upload_router)
         self.app.include_router(review_results_router)
         self.app.include_router(launch_review_router)
+        self.app.include_router(task_control_router)  # 新增:任务控制路由
 
     def _setup_exception_handlers(self):
         """配置全局异常处理"""

+ 78 - 0
utils_test/RAG_Test/patch_enhanced_results.py

@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+修复旧的 enhanced_results,添加 source_entity 字段
+"""
+
+import sys
+import os
+import json
+
+# 添加项目根目录到路径
+project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.insert(0, project_root)
+
+
+def patch_enhanced_results():
+    """为旧的 enhanced_results 添加 source_entity 字段"""
+
+    enhanced_path = os.path.join(project_root, "temp", "entity_bfp_recall", "enhance_with_parent_docs.json")
+    pipeline_path = os.path.join(project_root, "temp", "entity_bfp_recall", "rag_pipeline_data.json")
+
+    # 加载数据
+    with open(enhanced_path, 'r', encoding='utf-8') as f:
+        enhanced_results = json.load(f)
+
+    with open(pipeline_path, 'r', encoding='utf-8') as f:
+        pipeline_data = json.load(f)
+
+    # 提取 query_pairs
+    query_extract_step = pipeline_data.get('steps', {}).get('1_query_extract', {})
+    query_pairs = query_extract_step.get('output', {}).get('query_pairs', [])
+
+    if not query_pairs:
+        print("❌ 未找到 query_pairs 信息")
+        return
+
+    print(f"✅ 找到 {len(query_pairs)} 个查询对:")
+    for idx, qp in enumerate(query_pairs):
+        entity = qp.get('entity', 'N/A')
+        print(f"   - 查询对 {idx}: entity='{entity}'")
+
+    # 为每个结果添加 source_entity 字段
+    print(f"\n🔧 开始修复 enhanced_results...")
+
+    for query_idx, results in enumerate(enhanced_results):
+        if query_idx < len(query_pairs):
+            entity = query_pairs[query_idx].get('entity', f'query_{query_idx}')
+
+            for result in results:
+                if 'source_entity' not in result:
+                    result['source_entity'] = entity
+
+            print(f"   ✅ 查询对 {query_idx} (entity='{entity}'): 已为 {len(results)} 个结果添加 source_entity")
+
+    # 保存修复后的数据
+    backup_path = enhanced_path + ".backup"
+    import shutil
+    shutil.copy(enhanced_path, backup_path)
+    print(f"\n✅ 原文件已备份到: {backup_path}")
+
+    with open(enhanced_path, 'w', encoding='utf-8') as f:
+        json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
+
+    print(f"✅ 修复后的数据已保存到: {enhanced_path}")
+    print("\n🎉 现在可以重新运行 test_extract_modes.py 查看修复后的效果!")
+
+
+if __name__ == "__main__":
+    print("\n" + "="*80)
+    print("🔧 修复 enhanced_results,添加 source_entity 字段")
+    print("="*80 + "\n")
+
+    patch_enhanced_results()
+
+    print("\n" + "="*80)
+    print("✅ 修复完成!")
+    print("="*80 + "\n")

+ 40 - 20
utils_test/RAG_Test/rag_pipeline_web/rag_pipeline_server.py

@@ -22,11 +22,10 @@ sys.path.insert(0, project_root)
 from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
 from core.construction_review.component.infrastructure.parent_tool import (
     enhance_with_parent_docs,
-    extract_first_result
+    extract_query_pairs_results
 )
 from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
 from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
-from foundation.ai.rag.retrieval.retrieval import retrieval_manager
 from foundation.observability.logger.loggering import server_logger as logger
 
 # 全局Milvus Manager
@@ -111,6 +110,12 @@ def rag_enhanced_check(query_content: str) -> dict:
         }
     }
 
+    # 🔍 保存关键节点结果(用于对比分析)
+    os.makedirs(os.path.join(project_root, "temp", "rag_pipeline_server"), exist_ok=True)
+    with open(os.path.join(project_root, "temp", "rag_pipeline_server", "bfp_result_lists.json"), "w", encoding='utf-8') as f:
+        json.dump(bfp_result_lists, f, ensure_ascii=False, indent=4)
+    logger.info("[RAG增强] ✅ 已保存 bfp_result_lists 到 temp/rag_pipeline_server/bfp_result_lists.json")
+
     # 检查检索结果
     if not bfp_result_lists:
         logger.warning("[RAG增强] 实体检索未返回结果")
@@ -124,10 +129,6 @@ def rag_enhanced_check(query_content: str) -> dict:
         }
         pipeline_data["total_execution_time"] = round(time.time() - pipeline_data["timestamp"], 3)
 
-        # 保存到文件
-        os.makedirs(os.path.join(project_root, "temp", "entity_bfp_recall"), exist_ok=True)
-        with open(os.path.join(project_root, "temp", "entity_bfp_recall", "rag_pipeline_data.json"), "w", encoding='utf-8') as f:
-            json.dump(pipeline_data, f, ensure_ascii=False, indent=2, default=str)
 
         return pipeline_data
 
@@ -140,7 +141,7 @@ def rag_enhanced_check(query_content: str) -> dict:
         parent_docs = enhancement_result['parent_docs']
 
         # 保存增强后的结果
-        with open(os.path.join(project_root, "temp", "entity_bfp_recall", "enhance_with_parent_docs.json"), "w", encoding='utf-8') as f:
+        with open(os.path.join(project_root, "temp", "rag_pipeline_server", "enhance_with_parent_docs.json"), "w", encoding='utf-8') as f:
             json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
 
         logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果")
@@ -167,26 +168,45 @@ def rag_enhanced_check(query_content: str) -> dict:
             "output": {"error": str(e), "enhanced_results": enhanced_results}
         }
 
-    # Step 4: 提取结果
+    # Step 4: 提取结果(按查询对区分,只保留得分>0.8的结果)
     step4_start = time.time()
-    final_result = extract_first_result(enhanced_results) if enhanced_results else {
-        'vector_search': [], 'retrieval_status': 'no_results',
-        'file_name': '', 'text_content': '', 'metadata': {}
-    }
+    entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.8) if enhanced_results else []
 
-    pipeline_data["steps"]["4_extract_first_result"] = {
-        "name": "结果提取",
+    pipeline_data["steps"]["4_extract_query_pairs_results"] = {
+        "name": "按查询对提取结果",
         "execution_time": round(time.time() - step4_start, 3),
-        "input": {"enhanced_results_count": len(enhanced_results) if enhanced_results else 0},
-        "output": {"final_result": final_result}
+        "input": {
+            "enhanced_results_count": len(enhanced_results) if enhanced_results else 0,
+            "query_pairs_count": len(query_pairs),
+            "score_threshold": 0.8
+        },
+        "output": {
+            "entity_results": entity_results,
+            "entities_count": len(entity_results)
+        }
     }
 
+    # 构建最终结果
+    if not entity_results:
+        final_result = {
+            'retrieval_status': 'no_results',
+            'entity_results': [],
+            'total_entities': 0,
+            'message': '没有结果通过阈值过滤(得分>0.8)'
+        }
+    else:
+        final_result = {
+            'retrieval_status': 'success',
+            'entity_results': entity_results,
+            'total_entities': len(entity_results)
+        }
+
     pipeline_data["final_result"] = final_result
     pipeline_data["total_execution_time"] = round(time.time() - pipeline_data["timestamp"], 3)
 
     # 保存到文件
-    os.makedirs(os.path.join(project_root, "temp", "entity_bfp_recall"), exist_ok=True)
-    with open(os.path.join(project_root, "temp", "entity_bfp_recall", "rag_pipeline_data.json"), "w", encoding='utf-8') as f:
+    os.makedirs(os.path.join(project_root, "temp", "rag_pipeline_server"), exist_ok=True)
+    with open(os.path.join(project_root, "temp", "rag_pipeline_server", "rag_pipeline_data.json"), "w", encoding='utf-8') as f:
         json.dump(pipeline_data, f, ensure_ascii=False, indent=2, default=str)
 
     return pipeline_data
@@ -203,7 +223,7 @@ class RAGPipelineHandler(SimpleHTTPRequestHandler):
             self.send_json_response({'status': 'ok', 'milvus_ready': milvus_manager is not None})
         elif parsed.path == '/api/data':
             # 返回最新的pipeline数据
-            data_path = os.path.join(project_root, "temp", "entity_bfp_recall", "rag_pipeline_data.json")
+            data_path = os.path.join(project_root, "temp", "rag_pipeline_server", "rag_pipeline_data.json")
             if os.path.exists(data_path):
                 with open(data_path, 'r', encoding='utf-8') as f:
                     data = json.load(f)
@@ -279,7 +299,7 @@ def run_server(port=8765):
     # 切换到项目根目录,确保内部模块的相对路径正确
     os.chdir(project_root)
     # 确保temp目录存在
-    os.makedirs(os.path.join(project_root, "temp", "entity_bfp_recall"), exist_ok=True)
+    os.makedirs(os.path.join(project_root, "temp", "rag_pipeline_server"), exist_ok=True)
 
     # 自定义Handler,指定静态文件目录
     class CustomHandler(RAGPipelineHandler):

+ 197 - 0
utils_test/RAG_Test/test_extract_modes.py

@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+测试 extract_first_result 的两种模式
+"""
+
+import sys
+import os
+import json
+
+# 添加项目根目录到路径
+project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+sys.path.insert(0, project_root)
+
+from core.construction_review.component.infrastructure.parent_tool import extract_first_result
+
+
+def load_enhanced_results():
+    """Load the parent-doc-enhanced retrieval results from temp/entity_bfp_recall.
+
+    Returns:
+        list | None: one list of result dicts per query pair, or None when the
+        JSON file has not been generated yet (a RAG retrieval run must be
+        executed first to produce it).
+    """
+    result_path = os.path.join(project_root, "temp", "entity_bfp_recall", "enhance_with_parent_docs.json")
+
+    # A missing file is expected on a fresh checkout; report it and bail out.
+    if not os.path.exists(result_path):
+        print(f"❌ 文件不存在: {result_path}")
+        print("请先运行 RAG 检索生成该文件")
+        return None
+
+    with open(result_path, 'r', encoding='utf-8') as f:
+        enhanced_results = json.load(f)
+
+    # Print a one-line summary per non-empty query pair for quick inspection.
+    print(f"✅ 成功加载增强结果,共 {len(enhanced_results)} 个查询对")
+    for idx, results in enumerate(enhanced_results):
+        if results:
+            # 'source_entity' on the first hit names the originating entity;
+            # fall back to a positional label when the key is absent.
+            first_entity = results[0].get('source_entity', f'query_{idx}')
+            print(f"   - 查询对 {idx}: entity='{first_entity}', {len(results)} 个结果")
+
+    return enhanced_results
+
+
+def load_query_pairs():
+    """Load the query pairs that drove the retrieval.
+
+    Preferred source is rag_pipeline_data.json (step '1_query_extract');
+    when that yields nothing, a simplified list is reconstructed from the
+    enhanced results' 'source_entity' fields.
+
+    Returns:
+        list[dict] | None: query-pair dicts with 'entity', 'search_keywords'
+        and 'background' keys, or None when neither source file exists.
+    """
+    # Preferred path: read query pairs recorded by the pipeline run.
+    pipeline_path = os.path.join(project_root, "temp", "entity_bfp_recall", "rag_pipeline_data.json")
+
+    if os.path.exists(pipeline_path):
+        with open(pipeline_path, 'r', encoding='utf-8') as f:
+            pipeline_data = json.load(f)
+
+        # Extract query_pairs from the '1_query_extract' step's output.
+        query_extract_step = pipeline_data.get('steps', {}).get('1_query_extract', {})
+        query_pairs = query_extract_step.get('output', {}).get('query_pairs', [])
+
+        if query_pairs:
+            print(f"✅ 从 rag_pipeline_data.json 加载了 {len(query_pairs)} 个查询对")
+            for idx, qp in enumerate(query_pairs):
+                print(f"   - 查询对 {idx}: entity='{qp.get('entity', 'N/A')}'")
+            return query_pairs
+
+    # Fallback: derive entity info from the enhanced results instead.
+    # NOTE(review): this warning also fires when rag_pipeline_data.json exists
+    # but contains no query_pairs, not only when the file is missing — confirm
+    # whether the message should distinguish the two cases.
+    print("⚠️  未找到 rag_pipeline_data.json,尝试从 enhanced_results 提取")
+    result_path = os.path.join(project_root, "temp", "entity_bfp_recall", "enhance_with_parent_docs.json")
+
+    if not os.path.exists(result_path):
+        return None
+
+    with open(result_path, 'r', encoding='utf-8') as f:
+        enhanced_results = json.load(f)
+
+    # Build simplified query_pairs (keywords/background intentionally empty).
+    query_pairs = []
+    for idx, results in enumerate(enhanced_results):
+        if results:
+            # Prefer 'source_entity'; fall back to the positional query_N label.
+            entity = results[0].get('source_entity', f'query_{idx}')
+            query_pairs.append({
+                'entity': entity,
+                'search_keywords': [],
+                'background': ''
+            })
+
+    return query_pairs
+
+
+def test_mode_best_overall(enhanced_results, query_pairs):
+    """Exercise mode 1 of extract_first_result: single globally-best hit.
+
+    Args:
+        enhanced_results: list of per-query-pair result lists (from disk).
+        query_pairs: query-pair dicts matching enhanced_results by index.
+
+    Returns:
+        dict: the single best result as returned by extract_first_result.
+    """
+    print("\n" + "="*80)
+    print("📊 测试模式1: best_overall (全局最优)")
+    print("="*80)
+
+    result = extract_first_result(enhanced_results, query_pairs, mode='best_overall')
+
+    # Dump the key fields so the two modes can be compared by eye.
+    print(f"\n✅ 返回结果:")
+    print(f"  - file_name: {result.get('file_name', 'N/A')}")
+    print(f"  - source_entity: {result.get('source_entity', 'N/A')}")
+    print(f"  - bfp_rerank_score: {result.get('bfp_rerank_score', 0.0):.6f}")
+    print(f"  - text_content 长度: {len(result.get('text_content', ''))}")
+    print(f"  - retrieval_status: {result.get('retrieval_status', 'N/A')}")
+
+    # Show only the first 200 chars of the text to keep console output short.
+    text_preview = result.get('text_content', '')[:200]
+    print(f"\n  - 文本预览: {text_preview}...")
+
+    return result
+
+
+def test_mode_best_per_entity(enhanced_results, query_pairs):
+    """Exercise mode 2 of extract_first_result: best hit kept per entity.
+
+    Args:
+        enhanced_results: list of per-query-pair result lists (from disk).
+        query_pairs: query-pair dicts matching enhanced_results by index.
+
+    Returns:
+        dict: aggregate result with 'entity_results' keyed by entity name.
+    """
+    print("\n" + "="*80)
+    print("📊 测试模式2: best_per_entity (分实体最优)")
+    print("="*80)
+
+    result = extract_first_result(enhanced_results, query_pairs, mode='best_per_entity')
+
+    print(f"\n✅ 返回结果:")
+    print(f"  - total_entities: {result.get('total_entities', 0)}")
+    print(f"  - retrieval_status: {result.get('retrieval_status', 'N/A')}")
+
+    # One summary line per entity: score, source file name and text length.
+    entity_results = result.get('entity_results', {})
+    print(f"\n📋 各实体最优结果:")
+    for entity_name, entity_result in entity_results.items():
+        score = entity_result.get('bfp_rerank_score', 0.0)
+        file_name = entity_result.get('file_name', 'N/A')
+        text_len = len(entity_result.get('text_content', ''))
+
+        print(f"\n  🎯 实体: {entity_name}")
+        print(f"     - score: {score:.6f}")
+        print(f"     - file_name: {file_name}")
+        print(f"     - text_length: {text_len}")
+
+    return result
+
+
+def compare_with_current_result():
+    """Print the previously saved extract_first_result.json for comparison.
+
+    Purely informational: reads temp/entity_bfp_recall/extract_first_result.json
+    if present and echoes its key fields; returns None either way.
+    """
+    print("\n" + "="*80)
+    print("📂 对比当前保存的结果")
+    print("="*80)
+
+    result_path = os.path.join(project_root, "temp", "entity_bfp_recall", "extract_first_result.json")
+
+    # No saved baseline yet — nothing to compare against.
+    if not os.path.exists(result_path):
+        print("⚠️  当前没有保存的 extract_first_result.json")
+        return
+
+    with open(result_path, 'r', encoding='utf-8') as f:
+        current_result = json.load(f)
+
+    print(f"\n当前保存的结果:")
+    print(f"  - file_name: {current_result.get('file_name', 'N/A')}")
+    print(f"  - retrieval_status: {current_result.get('retrieval_status', 'N/A')}")
+    print(f"  - bfp_rerank_score: {current_result.get('bfp_rerank_score', 'N/A')}")
+    print(f"  - source_entity: {current_result.get('source_entity', 'N/A')}")
+
+
+if __name__ == "__main__":
+    # Entry point: run both extraction modes against the same fixtures,
+    # print a side-by-side comparison, and persist the outputs for review.
+    print("\n" + "="*80)
+    print("🚀 开始测试 extract_first_result 的两种模式")
+    print("="*80)
+
+    # Load fixtures produced by a previous RAG run; abort if they are missing.
+    enhanced_results = load_enhanced_results()
+    if not enhanced_results:
+        sys.exit(1)
+
+    query_pairs = load_query_pairs()
+
+    # Mode 1: single globally-best result.
+    result1 = test_mode_best_overall(enhanced_results, query_pairs)
+
+    # Mode 2: best result per entity.
+    result2 = test_mode_best_per_entity(enhanced_results, query_pairs)
+
+    # Show the previously persisted result for manual comparison.
+    compare_with_current_result()
+
+    # Persist both mode outputs so they can be inspected offline.
+    test_output_path = os.path.join(project_root, "temp", "entity_bfp_recall", "test_extract_modes.json")
+    with open(test_output_path, 'w', encoding='utf-8') as f:
+        json.dump({
+            'best_overall': result1,
+            'best_per_entity': result2
+        }, f, ensure_ascii=False, indent=4)
+
+    print(f"\n✅ 测试完成,结果已保存到: {test_output_path}")
+    print("\n" + "="*80)
+    print("📝 建议使用哪种模式?")
+    print("="*80)
+    print("\n模式1 (best_overall):")
+    print("  - 适用场景: 只需要一个最相关的结果")
+    print("  - 优点: 返回全局最优的结果")
+    print("  - 缺点: 可能丢失其他实体的有用信息")
+    print("\n模式2 (best_per_entity):")
+    print("  - 适用场景: 需要保留所有查询对的最优结果")
+    print("  - 优点: 保留各实体的最优结果,信息更全面")
+    print("  - 缺点: 返回结构更复杂,需要后续处理")
+    print("\n💡 如果审查需要针对不同实体分别检查,建议使用 mode='best_per_entity'")
+    print("="*80 + "\n")

+ 157 - 0
views/construction_review/task_control.py

@@ -0,0 +1,157 @@
+"""
+施工方案审查任务控制接口
+提供任务终止、查询等控制功能
+"""
+
+import asyncio
+from typing import List, Optional, Dict, Any
+from pydantic import BaseModel, Field
+from fastapi import APIRouter, HTTPException, Query
+from foundation.observability.logger.loggering import server_logger as logger
+from core.base.workflow_manager import WorkflowManager
+
+task_control_router = APIRouter(prefix="/sgsc", tags=["任务控制"])
+
+# 初始化工作流管理器
+workflow_manager = WorkflowManager()
+
+
+class TerminateTaskRequest(BaseModel):
+    """Request body for terminating a running review task."""
+    callback_task_id: str = Field(..., description="任务回调ID")
+    operator: str = Field(..., description="操作人(用户ID)")
+
+    class Config:
+        # Reject unknown fields so typos in client payloads fail fast.
+        extra = "forbid"
+
+
+class TerminateTaskResponse(BaseModel):
+    """Response body for a terminate-task request."""
+    success: bool = Field(..., description="操作是否成功")
+    message: str = Field(..., description="操作消息")
+    task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
+
+
+class TaskListResponse(BaseModel):
+    """Response body listing currently active tasks."""
+    total: int = Field(..., description="活跃任务总数")
+    tasks: List[Dict[str, Any]] = Field(..., description="任务列表")
+
+
+class TaskInfoResponse(BaseModel):
+    """Response body describing a single task lookup."""
+    exists: bool = Field(..., description="任务是否存在")
+    task_info: Optional[Dict[str, Any]] = Field(None, description="任务信息")
+
+
+@task_control_router.post("/task/terminate", response_model=TerminateTaskResponse)
+async def terminate_task(request: TerminateTaskRequest):
+    """
+    Terminate a running review task.
+
+    Args:
+        request: Termination request carrying the callback task id and the
+            operator's user id.
+
+    Returns:
+        TerminateTaskResponse: whether the signal was accepted, a message,
+        and optional task metadata from the workflow manager.
+
+    Raises:
+        HTTPException: 500 when setting the terminate signal fails.
+
+    Note:
+        - The terminate signal is stored in Redis, so it works across processes.
+        - The task stops only after its current workflow node finishes.
+        - The frontend observes the 'terminated' state via SSE.
+    """
+    try:
+        logger.info(f"收到任务终止请求: {request.callback_task_id}, 操作人: {request.operator}")
+
+        # Delegate to the workflow manager; it returns a dict with 'success',
+        # 'message' and optionally 'task_info'.
+        result = await workflow_manager.set_terminate_signal(
+            callback_task_id=request.callback_task_id,
+            operator=request.operator
+        )
+
+        return TerminateTaskResponse(
+            success=result["success"],
+            message=result["message"],
+            task_info=result.get("task_info")
+        )
+
+    except Exception as e:
+        logger.error(f"终止任务失败: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail=f"终止任务失败: {str(e)}")
+
+
+@task_control_router.get("/task/list", response_model=TaskListResponse)
+async def get_active_tasks():
+    """
+    List currently active review tasks.
+
+    Returns:
+        TaskListResponse: count plus the list of active task records.
+
+    Raises:
+        HTTPException: 500 when the workflow manager lookup fails.
+
+    Note:
+        Only tasks with status=processing are returned.
+    """
+    try:
+        logger.info("查询活跃任务列表")
+
+        # Fetch the active tasks from the workflow manager.
+        active_tasks = await workflow_manager.get_active_tasks()
+
+        return TaskListResponse(
+            total=len(active_tasks),
+            tasks=active_tasks
+        )
+
+    except Exception as e:
+        logger.error(f"获取活跃任务列表失败: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail=f"获取任务列表失败: {str(e)}")
+
+
+@task_control_router.get("/task/info", response_model=TaskInfoResponse)
+async def get_task_info(
+    callback_task_id: str = Query(..., description="任务回调ID")
+):
+    """
+    Fetch detailed information about one task.
+
+    Args:
+        callback_task_id: callback id identifying the task.
+
+    Returns:
+        TaskInfoResponse: exists=False with task_info=None when the task is
+        unknown or already finished; otherwise exists=True plus its record.
+
+    Raises:
+        HTTPException: 500 when the workflow manager lookup fails.
+    """
+    try:
+        logger.info(f"查询任务信息: {callback_task_id}")
+
+        # A falsy/None result means the task is unknown or already completed.
+        task_info = await workflow_manager.get_task_info(callback_task_id)
+
+        if task_info:
+            return TaskInfoResponse(
+                exists=True,
+                task_info=task_info
+            )
+        else:
+            return TaskInfoResponse(
+                exists=False,
+                task_info=None
+            )
+
+    except Exception as e:
+        logger.error(f"获取任务信息失败: {str(e)}", exc_info=True)
+        raise HTTPException(status_code=500, detail=f"获取任务信息失败: {str(e)}")
+
+
+# ==================== 测试接口 ====================
+
+@task_control_router.get("/task/health")
+async def health_check():
+    """Liveness probe: always returns a static OK payload."""
+    return {
+        "status": "ok",
+        "service": "task_control",
+        "message": "任务控制服务运行正常"
+    }