Эх сурвалжийг харах

v0.0.5-添加 core_functions 和 types 目录及内容

WangXuMing 3 сар өмнө
parent
commit
0b2a6ffe03

+ 15 - 0
core/construction_review/workflows/core_functions/__init__.py

@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+@Project   : lq-agent-api
+@File      : __init__.py
+@IDE       : VsCode
+@Date      : 2025-01-08
+
+核心功能模块 - 包含工作流的核心业务逻辑类
+'''
+
+from .ai_review_core_fun import AIReviewCoreFun
+
+__all__ = ['AIReviewCoreFun']

+ 1108 - 0
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -0,0 +1,1108 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+@Project   : lq-agent-api
+@File      : ai_review_core_fun.py
+@IDE       : VsCode
+@Date      : 2025-01-08
+
+AI审查核心功能类 - 负责具体的审查逻辑和数据处理
+
+📋 方法总览 (Method Overview)
+
+⚙️ AIReviewCoreFun 核心功能类:
+├── _execute_concurrent_reviews()   # 执行并发审查
+├── _prepare_review_units()         # 准备审查单元数据
+├── _review_single_unit()           # 审查单个单元的核心业务逻辑
+├── _send_start_review_progress()   # 发送开始审查的进度更新
+├── _send_unit_review_progress()    # 发送单元审查详细信息
+├── _send_unit_overall_progress()   # 发送单元完成进度更新
+├── _filter_review_units()          # 根据配置筛选要审查的单元
+├── _remove_basis_chunks()          # 筛除编制依据章节的chunks
+├── _designation_test_chunks()      # 筛选设计测试章节
+├── _replace_review_suffix()        # 将列表中字符串的下划线后内容替换为映射表中的对应值
+├── _process_basis_chapter()        # 处理编制依据章节(拼接后一次性审查)
+├── _process_normal_chapter()       # 处理普通章节(逐块审查)
+├── _execute_chunk_methods()        # 并发执行单个块的所有审查方法
+├── _execute_single_review()        # 执行单个块的审查任务
+├── _execute_technical_review()     # 执行技术性审查(参数/非参数合规性检查)
+├── _group_chunks_by_chapter()      # 按章节代码对chunks进行分组
+├── _extract_issues_from_result()   # 从审查结果中提取issues列表
+├── _format_chunk_results_to_issues() # 格式化单个块的审查结果为issues列表
+└── _dummy_review_task()            # 空任务(方法不存在时使用)
+'''
+
+import asyncio
+import random
+from typing import Dict, Union, List, Any, Optional
+from dataclasses import dataclass
+from langchain_core.messages import AIMessage
+
+from foundation.observability.logger.loggering import server_logger as logger
+from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
+from core.base.task_models import TaskFileInfo
+from ...component.reviewers.utils.inter_tool import InterTool
+from ..types import AIReviewState
+
+# 常量定义
+REVIEW_TIMEOUT = 120  # 单个审查任务超时时间(秒)
+
+
+@dataclass
+class ReviewResult:
+    """审查结果"""
+    unit_index: int
+    unit_content: Dict[str, Any]
+    basic_compliance: Dict[str, Any]
+    technical_compliance: Dict[str, Any]
+    rag_enhanced: Dict[str, Any]
+    overall_risk: str
+
+
+class AIReviewCoreFun:
+    """AI审查核心功能类 - 负责具体的审查逻辑和数据处理"""
+
+    def __init__(self, task_file_info: TaskFileInfo, ai_review_engine, max_review_units: int = None, review_mode: str = "all"):
+        """
+        初始化AI审查核心功能类
+
+        Args:
+            task_file_info: TaskFileInfo 实例,包含任务相关信息
+            ai_review_engine: AI审查引擎实例
+            max_review_units: 最大审查单元数量(None表示审查所有)
+            review_mode: 审查模式 ("all"=全部, "first"=前N个, "random"=随机N个)
+        """
+
+        self.task_info = task_file_info  # ✅ 保存 TaskFileInfo 实例
+        self.ai_review_engine = ai_review_engine
+        self.max_review_units = max_review_units
+        self.review_mode = review_mode
+        self.message_lock = asyncio.Lock()
+        self.inter_tool = InterTool()
+
+        # ✅ 便捷访问属性
+        self.file_id = task_file_info.file_id
+        self.callback_task_id = task_file_info.callback_task_id
+        self.user_id = task_file_info.user_id
+        self.review_config = task_file_info.review_config
+        self.project_plan_type = task_file_info.project_plan_type
+        
+        self.max_concurrent = 20
+
+        # 延迟导入 WorkflowManager(避免循环导入)
+        from core.base.workflow_manager import WorkflowManager
+        self.workflow_manager = WorkflowManager()
+
+    async def _process_basis_chapter(
+        self,
+        chapter_code: str,
+        chapter_content: List[Dict[str, Any]],
+        func_names: List[str],
+        state: "AIReviewState",
+        all_issues: List[Dict],
+        completed_chunks: int,
+        total_chunks: int
+    ) -> None:
+        """
+        处理编制依据章节(basis)
+
+        特点:
+        - 收集所有chunk的content
+        - 拼接后一次性审查
+        - 整个章节推送一次进度
+
+        Args:
+            chapter_code: 章节代码(应该是"basis")
+            chapter_content: 章节的所有chunk
+            func_names: 需要执行的审查方法
+            state: AI审查状态
+            all_issues: 累积的issues列表
+            completed_chunks: 已完成的块数
+            total_chunks: 总块数
+        """
+        logger.info(f"🔍 处理编制依据章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
+
+        # 1. 收集所有chunk的content
+        basis_contents = []
+        for chunk in chapter_content:
+            content = chunk.get("content", "")
+            if content:
+                basis_contents.append(content)
+
+        # 2. 拼接content
+        if not basis_contents:
+            logger.warning(f"编制依据章节 {chapter_code} 没有内容,跳过")
+            return
+
+        combined_content = "\n\n".join(basis_contents)
+        logger.info(f"编制依据内容拼接完成,总长度: {len(combined_content)} 字符")
+
+        # 3. 执行需要的审查方法
+        chapter_issues = []
+
+        for func_name in func_names:
+            if func_name == "reference_basis_reviewer":
+                # 编制依据审查
+                logger.info(f"执行编制依据审查: {chapter_code}")
+                try:
+                    review_data = {
+                        "content": combined_content,
+                        "max_concurrent": self.max_concurrent
+                    }
+                    result = await self.ai_review_engine.reference_basis_reviewer(
+                        review_data=review_data,
+                        trace_id=f"{state['callback_task_id']}_{chapter_code}",
+                        state=state,
+                        stage_name=f"{chapter_code}_编制依据审查"
+                    )
+
+                    # 提取issues
+                    issues = self._extract_issues_from_result(result)
+                    chapter_issues.extend(issues)
+
+                except Exception as e:
+                    logger.error(f"编制依据审查失败: {str(e)}", exc_info=True)
+
+            elif func_name == "timeliness_basis_reviewer":
+                # 时效性审查
+                logger.info(f"执行时效性审查: {chapter_code}")
+                try:
+                    review_data = {
+                        "content": combined_content,
+                        "max_concurrent": self.max_concurrent
+                    }
+                    result = await self.ai_review_engine.timeliness_basis_reviewer(
+                        review_data=review_data,
+                        trace_id=f"{state['callback_task_id']}_{chapter_code}",
+                        state=state,
+                        stage_name=f"{chapter_code}_时效性审查"
+                    )
+
+                    # 提取issues
+                    issues = self._extract_issues_from_result(result)
+                    chapter_issues.extend(issues)
+
+                except Exception as e:
+                    logger.error(f"时效性审查失败: {str(e)}", exc_info=True)
+
+            else:
+                # 其他方法:不支持对basis章节的操作
+                logger.warning(f"方法 {func_name} 不支持编制依据章节模式,跳过")
+
+        # 4. 累积到总issues
+        all_issues.extend(chapter_issues)
+
+        # 5. 推送章节级别的进度
+        if state.get("progress_manager") and chapter_content:
+            # 计算当前进度
+            current = int(((completed_chunks + len(chapter_content)) / total_chunks) * 100)
+
+            # 推送进度
+            await state["progress_manager"].update_stage_progress(
+                callback_task_id=state["callback_task_id"],
+                stage_name=f"AI审查:{chapter_code}",
+                current=current,
+                status="completed",
+                message=f"编制依据章节审查完成,共 {len(chapter_content)} 个块",
+                event_type="processing"
+            )
+
+    async def _process_normal_chapter(
+        self,
+        chapter_code: str,
+        chapter_content: List[Dict[str, Any]],
+        func_names: List[str],
+        state: AIReviewState,
+        all_issues: List[Dict]
+    ) -> int:
+        """
+        处理普通章节(非basis)
+
+        特点:
+        - 逐块审查
+        - 每块完成后立即推送进度
+
+        Args:
+            chapter_code: 章节代码
+            chapter_content: 章节的所有chunk
+            func_names: 需要执行的审查方法
+            state: AI审查状态
+            all_issues: 累积的issues列表
+
+        Returns:
+            int: 处理的块数量
+        """
+        logger.info(f"📝 处理普通章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
+        total_chunks = len(chapter_content)
+
+        # 按块串行遍历
+        for chunk_index, chunk in enumerate(chapter_content):
+            chunk_label = chunk.get("section_label", f"chunk_{chunk_index}")
+            logger.info(f"  📄 处理块 {chunk_index+1}/{total_chunks}: {chunk_label}")
+
+            # 终止信号检查(块级别)
+            if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
+                logger.warning("块审查检测到终止信号")
+                return chunk_index  # 返回已处理的块数
+
+            # 并发执行当前块的所有审查方法
+            chunk_results = await self._execute_chunk_methods(
+                chapter_code, chunk, chunk_index, func_names, state
+            )
+
+            # 格式化当前块的结果为issues(使用 check_item 模式)
+            chunk_page = chunk.get('page', '')
+            review_location_label = f"第{chunk_page}页:{chunk_label}"
+            issues = self.inter_tool._format_review_results_to_issues(
+                callback_task_id=state["callback_task_id"],
+                unit_index=chunk_index,
+                review_location_label=review_location_label,
+                chapter_code=chapter_code,
+                unit_content=chunk,
+                basic_result={},  # check_item 模式下不需要
+                technical_result={},  # check_item 模式下不需要
+                merged_results=chunk_results  # 直接使用合并结果
+            )
+
+            # 推送当前块的进度
+            current = int(((chunk_index + 1) / total_chunks) * 100)
+            await self._send_unit_review_progress(
+                state, chunk_index, total_chunks, chunk_label, issues, current
+            )
+
+            # 累积issues
+            if issues:
+                all_issues.extend(issues)
+
+        return total_chunks
+
+    def _extract_issues_from_result(self, result: Any) -> List[Dict]:
+        """
+        从审查结果中提取issues列表
+
+        Args:
+            result: 审查结果(可能是ReviewResult、dict、entity_based结果等)
+
+        Returns:
+            List[Dict]: issues列表
+        """
+        issues = []
+
+        if result is None:
+            return issues
+
+        # ReviewResult对象
+        if hasattr(result, 'details'):
+            issues.append(result.details)
+
+        # 字典结果
+        elif isinstance(result, dict):
+            # 检查是否是entity_based模式
+            if result.get('review_mode') == 'entity_based':
+                for entity_item in result.get('entity_review_results', []):
+                    entity_result = entity_item.get('result')
+                    if entity_result:
+                        issues.extend(self._extract_issues_from_result(entity_result))
+            else:
+                # 普通字典
+                issues.append(result)
+
+        return issues
+
+    def _format_chunk_results_to_issues(
+        self,
+        state: AIReviewState,
+        chunk_index: int,
+        chunk: Dict[str, Any],
+        chapter_code: str,
+        chunk_results: Dict[str, Any]
+    ) -> List[Dict]:
+        """
+        格式化单个块的所有审查结果为issues列表
+
+        Args:
+            state: AI审查状态
+            chunk_index: 块索引
+            chunk: 块内容
+            chapter_code: 章节代码
+            chunk_results: 块审查结果字典 {func_name: result}
+
+        Returns:
+            List[Dict]: issues列表
+        """
+        issues = []
+
+        for func_name, result in chunk_results.items():
+            if result is None:
+                continue
+
+            # 处理错误结果
+            if isinstance(result, dict) and "error" in result:
+                logger.warning(f"审查方法 {func_name} 返回错误: {result['error']}")
+                continue
+
+            # 提取issues
+            extracted = self._extract_issues_from_result(result)
+            issues.extend(extracted)
+
+        return issues
+
+    def _group_chunks_by_chapter(self, chunks: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
+        """
+        按章节代码对chunks进行分组
+
+        Args:
+            chunks: chunks列表
+
+        Returns:
+            Dict[str, List[Dict[str, Any]]]: 按章节分组的chunks
+        """
+        chapter_map = {}
+
+        for chunk in chunks:
+            chapter_code = chunk.get("chapter_classification", "unknown")
+
+            if chapter_code not in chapter_map:
+                chapter_map[chapter_code] = []
+
+            chapter_map[chapter_code].append(chunk)
+
+        return chapter_map
+
+    async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState) -> Dict[str, Any]:
+        """
+        并发执行单个块的所有审查方法
+
+        Args:
+            chapter_code: 章节代码
+            chunk: 单个块内容
+            chunk_index: 块索引
+            func_names: 需要执行的函数名列表
+            state: AI审查状态
+
+        Returns:
+            Dict[str, Any]: 该块所有方法的执行结果
+        """
+        results = {}
+        semaphore = asyncio.Semaphore(5)  # 单个块内限制并发数为5
+        rag_enhanced_content = None  # 初始化变量,避免作用域错误
+        if 'check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names:
+                logger.debug("开始执行RAG检索增强")
+                rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
+        async def execute_with_semaphore(func_name):
+            async with semaphore:
+                try:
+                    # 创建并执行单个审查任务
+                    result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content)
+                    return func_name, result
+                except Exception as e:
+                    logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
+                    return func_name, {"error": str(e)}
+
+        # 创建并发任务
+        async_tasks = [execute_with_semaphore(func_name) for func_name in func_names]
+
+        # 等待当前块所有方法完成
+        completed_results = await asyncio.gather(*async_tasks, return_exceptions=True)
+
+        # 整理结果
+        for result in completed_results:
+            if isinstance(result, Exception):
+                logger.error(f"任务异常: {str(result)}")
+                continue
+
+            if result and len(result) == 2:
+                func_name, task_result = result
+                results[func_name] = task_result
+
+        return results
+
+    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None) -> Any:
+        """
+        执行单个块的审查任务
+
+        Args:
+            chapter_code: 章节代码
+            chunk: 单个块内容
+            chunk_index: 块索引
+            func_name: 函数名称
+            state: AI审查状态
+
+        Returns:
+            Any: 审查结果
+        """
+        # 从ai_review_engine获取对应的方法
+        if not hasattr(self.ai_review_engine, func_name):
+            logger.warning(f"AIReviewEngine中未找到方法: {func_name}")
+            return await self._dummy_review_task(chapter_code, func_name)
+
+        method = getattr(self.ai_review_engine, func_name)
+
+        # 基础参数
+        trace_id = f"{state['callback_task_id']}_{chapter_code}_chunk{chunk_index}"
+        stage_name = f"{chapter_code}_{func_name}"
+
+        # 获取块内容
+        review_content = chunk.get("content", "")
+
+        logger.debug(f"执行审查: {trace_id} -> {func_name}")
+
+        # 根据func_name构建对应的参数并调用
+        if func_name == "sensitive_word_check":
+            result = await method(trace_id, review_content, state, stage_name)
+
+        elif func_name == "check_semantic_logic":
+            result = await method(trace_id, review_content, state, stage_name)
+
+        elif func_name == "check_sensitive":
+            result = await method(trace_id, review_content, state, stage_name)
+
+        elif func_name == "check_completeness":
+            result = await method(trace_id, chunk, state, stage_name)
+
+        elif func_name == "check_non_parameter_compliance":
+            # 技术审查方法需要从 RAG 检索结果中获取 references
+            result = await self._execute_technical_review(
+                method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
+            )
+
+        elif func_name == "check_parameter_compliance":
+            # 技术审查方法需要从 RAG 检索结果中获取 references
+            result = await self._execute_technical_review(
+                method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
+            )
+
+        # ⚠️ 以下三个特殊方法不在块级别处理,由主流程统一管理
+        elif func_name in ["outline_check", "timeliness_basis_reviewer", "reference_basis_reviewer"]:
+            logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
+            return None
+
+        else:
+            logger.warning(f"未知的审查方法: {func_name},使用默认调用方式")
+            return {
+            "current_stage": "ai_review_check_item",
+            "error_message": f"未知的审查方法: {func_name},使用默认调用方式",
+            "status": "failed",
+            "messages": [AIMessage(content=f"未知的审查方法: {func_name},使用默认调用方式: {state['callback_task_id']}")]
+            }
+
+        return result
+
+    async def _execute_technical_review(
+        self, method, trace_id: str, review_content: str, chunk: Dict[str, Any],
+        state: AIReviewState, stage_name: str, rag_enhanced_content: dict, func_name: str
+    ) -> Any:
+        """
+        执行技术性审查(参数/非参数合规性检查),支持基于 RAG 查询对的多任务并发
+
+        Args:
+            method: 审查方法
+            trace_id: 追踪ID
+            review_content: 审查内容
+            chunk: 块内容
+            state: AI审查状态
+            stage_name: 阶段名称
+            rag_enhanced_content: RAG增强检索结果
+            func_name: 函数名称
+
+        Returns:
+            Any: 审查结果
+        """
+        # 获取查询对列表
+        entity_results = []
+        if rag_enhanced_content:
+            entity_results = rag_enhanced_content.get('entity_results', [])
+
+        # 如果没有查询对,返回空结果
+        if not entity_results:
+            logger.warning(f"[{func_name}] 没有RAG检索结果,返回空结果")
+            return self.ai_review_engine._process_review_result(None)
+
+        logger.info(f"[{func_name}] 开始处理 {len(entity_results)} 个查询对的审查")
+
+        # 为每个查询对创建审查任务
+        review_tasks = []
+        for idx, entity_item in enumerate(entity_results):
+            combined_query = entity_item.get('combined_query', '')
+            entity = entity_item.get('entity', f'entity_{idx}')
+            text_content = entity_item.get('text_content', '')
+            file_name = entity_item.get('file_name', '')
+
+            logger.info(f"[{func_name}] 为查询对 {idx} ({entity}) 创建审查任务")
+
+            # 创建审查任务
+            task = asyncio.create_task(
+                method(
+                    trace_id_idx=f"{trace_id}_entity_{idx}",
+                    review_content=review_content,
+                    review_references=text_content,
+                    reference_source=file_name,
+                    state=state,
+                    stage_name=f"{stage_name}_entity_{idx}",
+                    entity_query=combined_query
+                )
+            )
+            review_tasks.append((idx, entity, task))
+
+        # 并发执行所有查询对的审查任务
+        results = []
+        if review_tasks:
+            done, pending = await asyncio.wait(
+                [task for _, _, task in review_tasks],
+                timeout=150  # 与 technical_compliance_check 中的超时一致
+            )
+
+            # 取消未完成的任务
+            for task in pending:
+                task.cancel()
+                logger.warning(f"[{func_name}] 查询对任务超时,已取消")
+
+            # 收集结果
+            for idx, entity, task in review_tasks:
+                if task in done:
+                    try:
+                        result = task.result()
+                        results.append({
+                            'entity': entity,
+                            'entity_index': idx,
+                            'result': result
+                        })
+                        logger.info(f"[{func_name}] 查询对 {idx} ({entity}) 审查完成")
+                    except Exception as e:
+                        logger.error(f"[{func_name}] 查询对 {idx} ({entity}) 审查失败: {str(e)}")
+                        results.append({
+                            'entity': entity,
+                            'entity_index': idx,
+                            'error': str(e)
+                        })
+
+        # 返回聚合结果(与 technical_compliance_check 的格式一致)
+        return {
+            'review_mode': 'entity_based',
+            'func_name': func_name,
+            'total_entities': len(entity_results),
+            'entity_review_results': results
+        }
+
+    async def _dummy_review_task(self, chapter_code: str, func_name: str):
+        """
+        空任务,当找不到对应方法时使用
+        """
+        logger.warning(f"执行空任务: {chapter_code}.{func_name}")
+        return {"status": "skipped", "message": f"方法 {func_name} 不存在"}
+
+    async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
+                                          total_units: int, state: AIReviewState,
+                                          check_terminate: bool = False) -> List[Dict[str, Any]]:
+        """
+        执行并发审查
+
+        Args:
+            review_chunks: 审查单元列表
+            total_units: 总单元数
+            state: AI审查状态
+            check_terminate: 是否检查终止信号(默认False)
+
+        Returns:
+            List[Dict[str, Any]]: 审查结果列表(issues格式)
+        """
+
+        try:
+            semaphore = asyncio.Semaphore(3)  # 并发审查数
+
+            async def process_unit_and_notify(unit_index, unit_content):
+                """处理单个单元,完成后立即推送通知"""
+                async with semaphore:
+                    # ⚠️ 检查终止信号(每个单元审查前)
+                    if check_terminate:
+                        if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
+                            logger.warning(f"并发审查检测到终止信号,停止后续审查: {state['callback_task_id']}")
+                            return None  # 返回 None 表示被终止
+
+                    # 执行单个单元审查
+                    result = await self._review_single_unit(unit_content, unit_index, total_units, state)
+                    
+                    # 审查完成后立即推送通知
+                    if result.overall_risk != "error":
+                        section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
+                        chapter_code = unit_content.get('chapter_classification', '')
+                        logger.info(f"section_label:  {section_label}")
+                        # 格式化issues以获取问题数量
+                        issues = self.inter_tool._format_review_results_to_issues(
+                            state["callback_task_id"],
+                            unit_index,
+                            f"第{unit_content.get('page', '')}页:{section_label}",
+                            chapter_code,
+                            unit_content,
+                            result.basic_compliance,
+                            result.technical_compliance
+                        )
+
+                        current = int(((unit_index + 1) / total_units) * 100)
+
+                        # 立即发送单元审查详情(包含unit_review和processing_flag事件)
+                        await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
+                        return issues
+                    else:
+                        logger.error(f"执行单个单元审查失败: {str(result.error_message)}")
+                    return None
+
+
+            # 创建并发任务
+            tasks = [
+                asyncio.create_task(process_unit_and_notify(i, unit_content))
+                for i, unit_content in enumerate(review_chunks)
+            ]
+
+            # 等待所有任务完成
+            all_results = await asyncio.gather(*tasks)
+
+            # 过滤有效结果(issues格式)
+            successful_results = [issues for issues in all_results if issues and isinstance(issues, list)]
+            return successful_results
+
+        except Exception as e:
+            logger.error(f"执行并发审查失败: {str(e)}")
+            return []
+
+    async def _prepare_review_units(self, state: AIReviewState, test_designation_chunk_flag) -> tuple:
+        """准备审查单元数据 (增加清理旧进度缓存)"""
+        try:
+            # 筛选要审查的单元
+            all_chunks = state['structured_content']['chunks']
+            # 筛除编制依据章节
+            clearned_chunks = self._remove_basis_chunks(all_chunks)
+
+            # 判断是否需要筛选指定测试章节
+            if test_designation_chunk_flag is not None:
+                # 用户指定了测试章节,进行筛选
+                logger.info(f"开始筛选指定测试章节: {test_designation_chunk_flag}")
+                designation_test_chunk = self._designation_test_chunks(clearned_chunks, test_designation_chunk_flag)
+
+                if not designation_test_chunk:
+                    # 指定了测试章节但未找到,返回空列表
+                    logger.warning(f"未找到包含关键字 '{test_designation_chunk_flag}' 的测试章节,建议去除前后符号(如《》())使用简洁关键词")
+                    review_chunks = []
+                else:
+                    # 找到指定测试章节
+                    logger.info(f"找到 {len(designation_test_chunk)} 个指定测试章节")
+                    review_chunks = designation_test_chunk
+            else:
+                # 未指定测试章节,使用正常筛选流程
+                logger.info(f"未指定测试章节,使用正常筛选流程")
+                review_chunks = self._filter_review_units(clearned_chunks)
+
+            total_units = len(review_chunks)
+            logger.info(f"最终审查单元数量: {total_units}")
+
+            # 【修复 3】: 任务开始前,清理 Redis 中的旧计数,防止进度条计算错误
+            try:
+                task_id = state.get("callback_task_id", "")
+                if task_id:
+                    redis_client = await RedisConnectionFactory.get_connection()
+                    completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
+                    await redis_client.delete(completed_key)
+                    logger.info(f"已清理旧进度缓存: {completed_key}")
+            except Exception as e:
+                logger.warning(f"清理进度缓存失败 (不影响主流程): {str(e)}")
+            return review_chunks, total_units
+        except Exception as e:
+            logger.error(f"准备审查单元失败: {str(e)}")
+            raise
+
+    def _remove_basis_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """
+        筛除编制依据章节的chunks
+
+        Args:
+            chunks: 所有章节chunks列表
+
+        Returns:
+            List[Dict[str, Any]]: 筛除编制依据章节后的chunks列表
+
+        Note:
+            根据 chapter_classification 字段筛选,排除值为 "basis" 的章节
+        """
+        try:
+            filtered_chunks = []
+            removed_count = 0
+            logger.info(f"开始筛除编制依据章节")
+            for chunk in chunks:
+                # 检查章节分类字段
+                chapter_classification = chunk.get('chapter_classification', '')
+                
+                # 保留非编制依据章节
+                if chapter_classification != 'basis':
+                    logger.info(f"保留非编制依据章节,当前章节: {chapter_classification}")
+                    filtered_chunks.append(chunk)
+                else:
+                    removed_count += 1
+                    logger.debug(f"筛除编制依据章节: {chunk.get('section_label', '未知章节')}")
+
+            logger.info(f"编制依据章节筛除完成: 共筛除 {removed_count} 个章节, 保留 {len(filtered_chunks)} 个章节")
+
+            return filtered_chunks
+
+        except Exception as e:
+            logger.error(f"筛除编制依据章节失败: {str(e)}")
+            # 出错时返回原始列表
+            return chunks
+
+    def _designation_test_chunks(self, chunks: List[Dict[str, Any]],test_designation_chunk_flag:str) -> List[Dict[str, Any]]:
+        """筛选设计测试章节
+        
+        Args:
+            chunks: 所有章节chunks列表
+
+        Returns:
+            List[Dict[str, Any]]: 筛选后的chunks列表
+
+        Note:
+            根据 chapter_classification 字段筛选,排除值为 "designation_test" 的章节
+
+        Raises:
+            Exception: 筛选失败
+        
+        """
+        try: 
+            designation_chunks = []
+            filtered_count = 0
+
+            logger.info(f"开始筛选设计测试章节")
+            for chunk in chunks:
+                content = chunk.get('content', '')
+                section_label = chunk.get('section_label', '未知章节')
+                logger.info(f"正在处理章节: {section_label}")
+                if test_designation_chunk_flag in content or test_designation_chunk_flag in section_label:
+                    logger.info(f"已命中指定测试章节: {chunk.get('section_label', '未知章节')}")
+                    designation_chunks.append(chunk)
+                else:
+                    filtered_count += 1
+                    logger.debug(f"跳过章节: {chunk.get('section_label', '未知章节')}")
+                if not designation_chunks:
+                    logger.info(f"未找到指定测试章节,请修改关键字尝试!")
+
+            return designation_chunks
+                     
+        except Exception as e:
+            logger.error(f"筛选设计测试章节失败: {str(e)}")
+            # 出错时返回原始列表
+            return chunks
+
+    async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
+                                  total_units: int, state: AIReviewState) -> ReviewResult:
+        """
+        审查单个单元的核心业务逻辑
+
+        Args:
+            unit_content: 单元内容
+            unit_index: 单元索引
+            total_units: 总单元数
+            state: AI审查状态
+
+        Returns:
+            ReviewResult: 单元审查结果
+        """
+        try:
+
+            # 构建Trace ID
+            trace_id_idx = f"({state['callback_task_id']}-{unit_index})"
+
+            # 获取section_label用于stage_name
+            section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
+            page = unit_content.get('page', '')
+            stage_name = f"AI审查:{section_label}"
+            #logger.info(f"test_review_location_label:{trace_id_idx}: {review_location_label}")
+            review_tasks = [
+                asyncio.create_task(
+                    asyncio.wait_for(
+                        self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, state,stage_name),
+                        timeout=REVIEW_TIMEOUT
+                    )
+                ),
+                asyncio.create_task(
+                    asyncio.wait_for(
+                        self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content, state,stage_name),
+                        timeout=REVIEW_TIMEOUT
+                    )
+                ),
+            ]
+
+            # 使用 asyncio.gather 保证结果顺序,并提供超时控制
+            # 整体超时 = 两个任务的超时之和 + 缓冲时间
+            total_timeout = REVIEW_TIMEOUT * len(review_tasks) + 10
+
+            try:
+                # 使用 gather 确保结果顺序与任务顺序一致
+                review_results = await asyncio.wait_for(
+                    asyncio.gather(*review_tasks, return_exceptions=True),
+                    timeout=total_timeout
+                )
+            except asyncio.TimeoutError:
+                logger.error(f"[工作流] 审查任务整体超时: trace_id={trace_id_idx}")
+                # 填充超时结果
+                review_results = [
+                    Exception(f"基础审查超时({REVIEW_TIMEOUT}秒)"),
+                    Exception(f"技术审查超时({REVIEW_TIMEOUT}秒)")
+                ]
+
+            # 处理异常结果(gather 已经将异常作为结果返回)
+            basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
+            technical_result = review_results[1] if not isinstance(review_results[1], Exception) else {"error": str(review_results[1])}
+
+            # RAG检查已注释,提供空结果
+            rag_result = {"error": "RAG check disabled"}
+
+            # 计算总体风险等级
+            inter_tool = InterTool()
+            overall_risk = inter_tool._calculate_overall_risk(basic_result, technical_result, rag_result)
+
+            
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance=basic_result,
+                technical_compliance=technical_result,
+                rag_enhanced=rag_result,
+                overall_risk=overall_risk
+            )
+
+        except asyncio.TimeoutError:
+            logger.error(f"审查单元 {unit_index} 超时")
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                technical_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                rag_enhanced={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                overall_risk="error"
+            )
+        except Exception as e:
+            logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance={"error": str(e)},
+                technical_compliance={"error": str(e)},
+                rag_enhanced={"error": str(e)},
+                overall_risk="error"
+            )
+
+    async def _send_start_review_progress(self, state: AIReviewState, total_units: int = None, review_type : str =None) -> None:
+        """
+        发送开始审查的进度更新
+
+        Args:
+            state: AI审查状态
+            total_units: 总审查单元数
+        """
+
+
+        try:
+            
+
+            if state["progress_manager"]:
+                if  review_type == "outline":
+                    await state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name="AI审查",
+                        current=0,
+                        status="processing",
+                        message=f"开始大纲审查",
+                        event_type="processing"
+                    )
+                # elif  review_type is "prpe_basis":
+                #     await state["progress_manager"].update_stage_progress(
+                #         callback_task_id=state["callback_task_id"],
+                #         stage_name="AI审查",
+                #         current=0,
+                #         total=total_units,
+                #         status="processing",
+                #         message=f"开始编制依据审查",
+                #         event_type="processing"
+                #     )
+                else:
+                    await state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name="AI审查",
+                        current=0,
+                        status="processing",
+                        message=f"开始核心审查,共 {total_units} 个审查单元",
+                        event_type="processing"
+                    )
+        except Exception as e:
+            logger.warning(f"发送开始进度更新失败: {str(e)}")
+
+    async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
+                                            total_units: int, section_label: str,
+                                            issues: List[Dict], current: int) -> None:
+        """
+        发送单元审查详细信息 - 强制串行并统一进度值
+        """
+        async with self.message_lock:
+            try:
+                # 1. 计算问题数量
+                issues_count = 0
+                if isinstance(issues, list) and issues:
+                    issues_count = sum(
+                        1 for issue in issues
+                        for issue_data in issue.values()
+                        for review_item in issue_data.get("review_lists", [])
+                        if review_item.get("exist_issue", False)
+                    )
+
+                real_current = await self._send_unit_overall_progress(
+                    state, unit_index, total_units, section_label, issues_count
+                )
+                
+
+                final_current = real_current if real_current is not None else current
+
+                await asyncio.sleep(0.05)
+
+                # 3. 发送单元详情 (Unit Review)
+                if isinstance(issues, list) and issues and state["progress_manager"]:
+                    stage_name = f"AI审查:{section_label}"
+
+                    await state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name=stage_name,
+                        current=final_current,  # 【关键】使用与 Flag 事件完全一致的进度值
+                        status="unit_review_update",
+                        message=f"发现{issues_count}个问题: {section_label}",
+                        issues=issues,
+                        user_id=state.get("user_id", ""),
+                        overall_task_status="processing",
+                        event_type="unit_review"
+                    )
+
+                    # 再次微小延迟,确保 Clear 不会吞掉 Review
+                    await asyncio.sleep(0.02)
+
+                    # 清空当前issues
+                    await state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        issues=['clear']
+                    )
+
+            except Exception as e:
+                logger.error(f"发送单元审查详情失败: {str(e)}")
+
+    async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
+                                           total_units: int, section_label: str,
+                                           issues_count: int = None) -> Optional[int]:
+        """
+        发送单元完成进度更新 - 返回计算出的实时进度
+        Returns:
+            int: 基于 Redis 统计的实时进度百分比
+        """
+        current_percent = None
+        try:
+            task_id = state.get("callback_task_id", "")
+            redis_client = None
+            try:
+                redis_client = await RedisConnectionFactory.get_connection()
+            except Exception as e:
+                logger.error(f"Redis连接失败: {str(e)}")
+
+            completed_count = 0
+            
+            if redis_client and task_id:
+                completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
+                # 原子操作
+                await redis_client.sadd(completed_key, str(unit_index))
+                await redis_client.expire(completed_key, 3600)
+                completed_count = await redis_client.scard(completed_key)
+                
+                # 计算进度
+                current_percent = int((completed_count / total_units) * 100)
+            else:
+                # 降级方案
+                completed_count = unit_index + 1
+                current_percent = int((completed_count / total_units) * 100)
+
+            # 构建消息
+            if issues_count is not None and issues_count > 0:
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+            else:
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
+
+            logger.info(f"进度更新: {current_percent}% - {message}")
+
+            if state["progress_manager"]:
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name="AI审查",
+                    current=current_percent,
+                    status="processing",
+                    message=message,
+                    user_id=state.get("user_id", ""),
+                    overall_task_status="processing",
+                    event_type="processing_flag"
+                )
+            
+            return current_percent
+
+        except Exception as e:
+            logger.warning(f"发送单元完成进度更新失败: {str(e)}")
+            # 发生异常时,尝试返回一个基于 index 的估算值
+            try:
+                return int(((unit_index + 1) / total_units) * 100)
+            except:
+                return 0
+
+    def _filter_review_units(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """
+        根据配置筛选要审查的单元
+
+        Args:
+            chunks: 所有审查单元
+
+        Returns:
+            List[Dict[str, Any]]: 筛选后的审查单元
+
+        Note:
+            根据max_review_units和review_mode配置来筛选审查单元
+        """
+        if not self.max_review_units or self.max_review_units >= len(chunks):
+            # 如果没有限制或限制数量大于等于总数,返回所有单元
+            return chunks
+
+        if self.review_mode == "first":
+            # 返回前N个单元
+            return chunks[:self.max_review_units]
+        elif self.review_mode == "random":
+            # 随机选择N个单元
+            return random.sample(chunks, self.max_review_units)
+        else:
+            # 默认返回所有审查单元
+            return chunks
+
+    def _replace_review_suffix(self,items: List[str], mapping: Dict[str, Union[str, List[str]]]) -> List[str]:
+        """
+        将列表中字符串的下划线后内容替换为映射表中的对应值
+        处理多值映射(如completeness_check对应两个值时生成两条结果)
+        """
+        result = []
+        for item in items:
+            # 拆分下划线前后部分(只拆分第一个下划线,因为后缀可能包含下划线)
+            if '_' in item:
+                prefix, suffix = item.split('_', 1)  # split('_', 1) 确保只拆分一次
+            else:
+                # 无下划线时直接保留原字符串(根据实际需求可调整)
+                result.append(item)
+                continue
+            
+            # 获取映射值,无匹配时保留原后缀
+            mapped_value = mapping.get(suffix, suffix)
+            
+            # 处理单值映射
+            if isinstance(mapped_value, str):
+                result.append(f"{prefix}_{mapped_value}")
+            # 处理多值映射(列表类型)
+            elif isinstance(mapped_value, list):
+                for val in mapped_value:
+                    result.append(f"{prefix}_{val}")
+        
+        return result

+ 33 - 0
core/construction_review/workflows/types/__init__.py

@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+@Project   : lq-agent-api
+@File      : types.py
+@IDE       : VsCode
+@Date      : 2026-01-08
+
+工作流类型定义模块
+
+本模块定义了工作流相关的类型,避免循环导入问题。
+"""
+
+from typing import Optional, TypedDict, Annotated, List, Dict, Any
+from langchain_core.messages import BaseMessage
+from langgraph.graph.message import add_messages
+
+
+class AIReviewState(TypedDict):
+    """AI审查工作流状态"""
+
+    file_id: str
+    callback_task_id: str
+    file_name: str
+    user_id: str
+    structured_content: Dict[str, Any]
+    review_results: Optional[Dict[str, Any]]
+    current_stage: str
+    status: str
+    error_message: Optional[str]
+    progress_manager: Optional[Any]
+    messages: Annotated[List[BaseMessage], add_messages]