Преглед изворни кода

v0.0.5-功能优化-_ai_review_node_check_item
- 基本流程调通,可获取结果

WangXuMing пре 1 месец
родитељ
комит
09a3af646b

+ 27 - 163
core/construction_review/component/ai_review_engine.py

@@ -526,21 +526,11 @@ class AIReviewEngine(BaseReviewer):
             logger.info(f"[技术审查] 总共创建了 {len(technical_tasks)} 个动态审查任务")
 
         else:
-            # 没有entity_results或未配置专业性审查,直接跳过该条审查
-            logger.warning("[技术审查] 未提供 entity_results 或未配置专业性审查,跳过该条审查")
-
-            # 直接返回空结果,不执行任何审查任务
-            return {
-                "non_parameter_compliance": self._process_review_result(None),
-                "parameter_compliance": self._process_review_result(None),
-            }
+            # 没有entity_results,直接返回None
+            logger.warning("[技术审查] 未提取到实体,跳过技术性审查")
+            return None
 
         # 一次性执行所有任务,避免重复协程调用
-        if not technical_tasks:
-            return {
-                "non_parameter_compliance": self._process_review_result(None),
-                "parameter_compliance": self._process_review_result(None),
-            }
 
         # 使用 asyncio.wait 替代 gather,提供更好的超时控制
         # 整体超时时间 = 单个任务超时 × 任务数量 + 缓冲时间
@@ -565,84 +555,37 @@ class AIReviewEngine(BaseReviewer):
                 logger.error(f"[技术审查] 异常详情: {repr(e)}")
                 results.append(e)
 
-        # 根据配置项分配结果
-        # 判断是否使用 entity_results 模式(多查询对模式)
-        if entity_results and len(entity_results) > 0:
-            # 多查询对模式:按查询对分组处理所有结果
-            logger.info(f"[技术审查] 处理多查询对审查结果,共 {len(entity_results)} 个查询对")
-
-            entity_review_results = []
-            result_index = 0
-
-            # 遍历每个查询对,提取对应的审查结果
-            for idx, entity_item in enumerate(entity_results):
-                entity_review_item = {
-                    'entity': entity_item.get('entity', f'entity_{idx}'),
-                    'combined_query': entity_item.get('combined_query', ''),
-                    'file_name': entity_item.get('file_name', ''),
-                    'text_content': entity_item.get('text_content', ''),
-                    'bfp_rerank_score': entity_item.get('bfp_rerank_score', 0.0)
-                }
-
-                # 提取非参数性审查结果(如果配置了)
-                if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
-                    if result_index < len(results):
-                        entity_review_item['non_parameter_compliance'] = self._process_review_result(
-                            results[result_index]
-                        )
-                        logger.info(f"[技术审查] 查询对 {idx} 非参数性审查结果已处理")
-                        result_index += 1
-                    else:
-                        logger.warning(f"[技术审查] 查询对 {idx} 缺少非参数性审查结果")
-                        entity_review_item['non_parameter_compliance'] = self._process_review_result(None)
-
-                # 提取参数性审查结果(如果配置了)
-                if 'parameter_compliance_check' in self.task_info.get_review_config_list():
-                    if result_index < len(results):
-                        entity_review_item['parameter_compliance'] = self._process_review_result(
-                            results[result_index]
-                        )
-                        logger.info(f"[技术审查] 查询对 {idx} 参数性审查结果已处理")
-                        result_index += 1
-                    else:
-                        logger.warning(f"[技术审查] 查询对 {idx} 缺少参数性审查结果")
-                        entity_review_item['parameter_compliance'] = self._process_review_result(None)
-
-                entity_review_results.append(entity_review_item)
-
-            logger.info(f"[技术审查] 成功处理 {len(entity_review_results)} 个查询对的审查结果")
-
-            return {
-                'review_mode': 'entity_based',  # 标记为基于查询对的审查模式
-                'entity_review_results': entity_review_results,
-                'total_entities': len(entity_results),
-                'total_results_processed': result_index
-            }
-
-        else:
-            # 通用审查模式(未使用 entity_results)
-            logger.info("[技术审查] 处理通用审查模式结果")
+        # 扁平化处理结果:直接返回每个实体的审查结果
+        logger.info(f"[技术审查] 处理审查结果,共 {len(entity_results)} 个实体")
 
-            non_parameter_result = self._process_review_result(None)
-            parameter_result = self._process_review_result(None)
+        flattened_results = {}
+        result_index = 0
 
-            result_index = 0
+        for idx, entity_item in enumerate(entity_results):
+            entity = entity_item.get('entity', f'entity_{idx}')
 
+            # 提取非参数性审查结果(如果配置了)
             if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():
+                key = f'non_parameter_compliance_{entity}_{idx}'
                 if result_index < len(results):
-                    non_parameter_result = self._process_review_result(results[result_index])
+                    flattened_results[key] = self._process_review_result(results[result_index])
+                    logger.info(f"[技术审查] {key} 结果已处理")
+                else:
+                    logger.warning(f"[技术审查] {key} 缺少结果")
                 result_index += 1
 
+            # 提取参数性审查结果(如果配置了)
             if 'parameter_compliance_check' in self.task_info.get_review_config_list():
+                key = f'parameter_compliance_{entity}_{idx}'
                 if result_index < len(results):
-                    parameter_result = self._process_review_result(results[result_index])
+                    flattened_results[key] = self._process_review_result(results[result_index])
+                    logger.info(f"[技术审查] {key} 结果已处理")
+                else:
+                    logger.warning(f"[技术审查] {key} 缺少结果")
                 result_index += 1
 
-            return {
-                'review_mode': 'general',  # 标记为通用审查模式
-                'non_parameter_compliance': non_parameter_result,
-                'parameter_compliance': parameter_result
-            }
+        logger.info(f"[技术审查] 成功处理 {len(flattened_results)} 个审查结果")
+        return flattened_results
 
     async def sensitive_word_check(self, trace_id_idx: str, review_content: str,
                           state: str, stage_name: str) -> Dict[str, Any]:
@@ -789,7 +732,7 @@ class AIReviewEngine(BaseReviewer):
             )
             logger.info("  组件初始化完成")
             
-            start_time = time.time()
+
             # 3. 执行审查
             logger.info("\n[4/5] 开始执行审查...")
             logger.info(f"  使用模型: {llm_client.model_type}")
@@ -812,89 +755,10 @@ class AIReviewEngine(BaseReviewer):
             logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
 
             # 生成缺失要点 JSON 列表,便于前端消费
-            missing_issue_json_path = Path(r'temp\document_temp') / 'spec_review_missing_issues.json'
-            missing_issue_list = analyzer.build_missing_issue_list(summary_rows)
-            write_json(missing_issue_list, str(missing_issue_json_path))
-            logger.info(f"  缺失要点 JSON 已保存至: {missing_issue_json_path}")
-            cost_time = time.time() - start_time
-
-            # 构建details字段,包含审查结果
-            # details = {
-            #     'chunk_id': chunk_id,
-            #     'name': 'completeness_check',
-            #     'chapter_classification': chapter_classification,
-            #     'section_label': doc.get('section_label', ''),
-            #     'requirements_count': len(requirements),
-            #     'checked_items': len(review_result),
-            #     'response': review_result[0] if review_result else {},
-            # }
-            
-            result_list = []
-            for index, missing_issue in enumerate(missing_issue_list):
-                details = {
-                    'chunk_id': f'chunk_id_{index}',
-                    'name': 'completeness_check',
-                    'chapter_classification': 'chapter_classification',
-                    'section_label': 'section_label',
-                    'requirements_count': 'requirements_count',
-                    'checked_items': len(missing_issue),
-                    'response': missing_issue if missing_issue else {},
-                }
-
-                # 创建ReviewResult对象
-                from core.construction_review.component.reviewers.base_reviewer import ReviewResult
-                result = ReviewResult(
-                    success=True,
-                    details=details,
-                    error_message=None,
-                    execution_time=cost_time
-                )
-
-                # 将ReviewResult对象转换为字典格式
-                result_dict = {"details":result.details,"success":result.success,"error_message":result.error_message,"execution_time":result.execution_time}
-                result_list.append(result_dict)
-                # with open('temp/completeness_check_result_1.json','w',encoding='utf-8') as f:
-                #     json.dump(result_dict, f, ensure_ascii=False,indent=4)
-                
-                # 将审查结果转换为字典格式,添加到issues中
-                review_result_data = {
-                    'name': name,
-                    'success': result.success,
-                    'details': result.details,
-                    'error_message': result.error_message,
-                    'execution_time': result.execution_time,
-                    'timestamp': time.time()
-                }
-                
-                # 推送审查完成信息
-                state_dict = None
-                if state:
-                    if isinstance(state, dict):
-                        state_dict = state
-                    elif isinstance(state, str):
-                        try:
-                            state_dict = json.loads(state)
-                        except (json.JSONDecodeError, AttributeError):
-                            pass
-                # 更新进度
-                if state_dict and state_dict.get("progress_manager"):
-                    asyncio.create_task(
-                        state_dict["progress_manager"].update_stage_progress(
-                            callback_task_id=state_dict.get("callback_task_id"),
-                            stage_name=stage_name,
-                            current=None,
-                            status="processing",
-                            message=f"{name} 要点审查完成 (chunk_id: {chunk_id}_{index}), 耗时: {result.execution_time:.2f}s",
-                            issues=[review_result_data],
-                            event_type="processing"
-                        )
-                    )
-                logger.info(f"{name} 审查完成 (chunk_id: {chunk_id}_{index}), 耗时: {result.execution_time:.2f}s")
-
-            return {
-                        'completeness_review_result': result_list
-                    }
+            #missing_issue_json_path = Path(r'temp\document_temp') / 'spec_review_missing_issues.json'
+            issues = await analyzer.build_missing_issue_list(summary_rows,state,start_time)
 
+            return  issues
         except Exception as e:
             execution_time = time.time() - start_time
             error_msg = f"{name} 审查失败: {str(e)}"

+ 118 - 14
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -1,6 +1,8 @@
 """
 结果汇总与规范覆盖分析组件
 """
+import time
+import asyncio
 from typing import Dict, List, Any, Set
 import ast
 import sys
@@ -13,7 +15,9 @@ if str(_root) not in sys.path:
 
 from interfaces import IResultAnalyzer
 from utils.file_utils import read_csv, write_csv
-
+from ...utils.inter_tool import InterTool
+from ...base_reviewer import ReviewResult
+from core.construction_review.workflows.core_functions.ai_review_core_fun import UnitReviewResult
 
 class ResultAnalyzer(IResultAnalyzer):
     """审查结果汇总分析器"""
@@ -24,6 +28,7 @@ class ResultAnalyzer(IResultAnalyzer):
             spec_csv_path: 规范 CSV 文件路径(Construction_Plan_Content_Specification.csv)
         """
         self.spec_csv_path = spec_csv_path
+        self.inter_tool = InterTool()
 
     def process_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
         """
@@ -42,6 +47,7 @@ class ResultAnalyzer(IResultAnalyzer):
             section_label = item.get("section_label", "") or ""
             chapter_classification = item.get("chapter_classification", "") or ""
             review_result = item.get("review_result", {})
+            content = item.get("content", "")
 
             if not isinstance(review_result, dict):
                 review_result = {}
@@ -77,6 +83,7 @@ class ResultAnalyzer(IResultAnalyzer):
                     "section_label": section_label,
                     "chapter_classification": chapter_classification,
                     "review_result": new_review_result,
+                    "content": content,
                 }
             )
 
@@ -104,15 +111,31 @@ class ResultAnalyzer(IResultAnalyzer):
         # key: (chapter_classification, level2)
         points_found_map: Dict[str, Set[int]] = {}
         sources_map: Dict[str, Set[str]] = {}
+        content_map: Dict[str, str] = {}  # 二级目录的 content 映射
+        section_label_map: Dict[str, str] = {}  # 二级目录的 section_label 映射
+        # 一级章节的映射(作为后备,当找不到二级目录时使用)
+        chapter_content_map: Dict[str, str] = {}  # 按 chapter_classification 索引
+        chapter_section_label_map: Dict[str, str] = {}  # 按 chapter_classification 索引
 
         for item in processed_results:
             chapter_classification = (item.get("chapter_classification") or "").strip()
             section_label = (item.get("section_label") or "").strip()
             review_result = item.get("review_result", {}) or {}
+            content = item.get("content", "")
 
             if not chapter_classification or not isinstance(review_result, dict):
                 continue
 
+            # 记录一级章节信息(作为后备)
+            if chapter_classification not in chapter_content_map:
+                chapter_content_map[chapter_classification] = content
+                chapter_section_label_map[chapter_classification] = section_label
+            else:
+                # 如果已存在且当前内容非空,更新为最新的
+                if content:
+                    chapter_content_map[chapter_classification] = content
+                    chapter_section_label_map[chapter_classification] = section_label
+
             # 截断来源标题:只保留前两段(按 "->" 分隔)
             if "->" in section_label:
                 parts = [p.strip() for p in section_label.split("->") if p.strip()]
@@ -131,6 +154,8 @@ class ResultAnalyzer(IResultAnalyzer):
                 if key not in points_found_map:
                     points_found_map[key] = set()
                     sources_map[key] = set()
+                    content_map[key] = ""  # 初始化 content
+                    section_label_map[key] = ""  # 初始化 section_label
 
                 if isinstance(points, list) and points:
                     for p in points:
@@ -140,6 +165,9 @@ class ResultAnalyzer(IResultAnalyzer):
                             points_found_map[key].add(int(p))
                     # 只要该块在该二级目录下有任何要点,就记录来源(截断后的标题)
                     sources_map[key].add(source_label)
+                    # 记录 content 和 section_label(使用最后一个遇到的内容)
+                    content_map[key] = content
+                    section_label_map[key] = section_label
 
         # 根据规范逐行生成统计结果
         summary_rows: List[Dict[str, Any]] = []
@@ -166,12 +194,22 @@ class ResultAnalyzer(IResultAnalyzer):
 
             sources = sorted(sources_map.get(key, set())) if key in sources_map else []
 
+            # 获取 content 和 section_label,优先使用二级目录映射,找不到则使用一级章节映射
+            content = content_map.get(key, "")
+            section_label = section_label_map.get(key, "")
+            # 如果二级目录映射为空,使用一级章节映射作为后备
+            if not content:
+                content = chapter_content_map.get(tag, "")
+            if not section_label:
+                section_label = chapter_section_label_map.get(tag, "")
+
             # 组装输出行(在原规范行基础上增加三列)
             new_row = dict(row)
             new_row["审查到的要点"] = str(found_points)
             new_row["缺失的要点"] = str(missing_points)
             new_row["要点来源"] = str(sources)
-
+            new_row['content'] = content
+            new_row['section_label'] = section_label
             summary_rows.append(new_row)
 
         # 写出 CSV,使用 UTF-8-SIG 编码(write_csv 内部已固定为 utf-8-sig)
@@ -181,12 +219,23 @@ class ResultAnalyzer(IResultAnalyzer):
         return summary_rows
 
     # 生成缺失要点的 JSON 列表,便于前端或其他系统直接消费
-    def build_missing_issue_list(
-        self, summary_rows: List[Dict[str, Any]]
+    async def build_missing_issue_list(
+        self, summary_rows: List[Dict[str, Any]], state: Dict[str, Any] = None, start_time: float = None
     ) -> List[Dict[str, Any]]:
+        """
+        构建缺失要点的问题列表(异步版本)
+
+        Args:
+            summary_rows: 规范汇总行数据
+            state: 状态字典(可选,用于回调)
+            start_time: 开始时间(可选)
+
+        Returns:
+            List[Dict]: 缺失要点的问题列表
+        """
         issues: List[Dict[str, Any]] = []
 
-        for row in summary_rows:
+        for i, row in enumerate(summary_rows):
             level2 = (row.get("二级目录") or "").strip()
             requirement = (row.get("内容要求") or "").strip()
 
@@ -199,7 +248,6 @@ class ResultAnalyzer(IResultAnalyzer):
             sources = self._parse_list_field(sources_raw)
             location = "; ".join(map(str, sources)) if sources else ""
 
-            # missing_numbers_text = "、".join(str(m) for m in missing_points)
             requirement_list = requirement.split(':')[-1].split(';')
             requirement_text = ';'.join([requirement_list[i-1] for i in missing_points])
             issue_point = (
@@ -207,17 +255,73 @@ class ResultAnalyzer(IResultAnalyzer):
             )
             suggestion = f"补充:{requirement_text}" if requirement else "补充缺失要点内容"
             risk_level = self._map_risk_level(len(missing_points))
+            chapter_code = row.get("标签")
+            content = row.get("content", "")
+            section_label = row.get("section_label", "")
+
+            # 构建兼容模型响应格式
+            model_response = [{
+                "issue_point": issue_point,
+                "location": location,
+                "suggestion": suggestion,
+                "reason": requirement,
+                "risk_level": risk_level,
+            }]
+
+            # 构造审查结果
+            completeness_check_result = ReviewResult(
+                success=True,
+                details={
+                    "name": "completeness_check",
+                    "response": model_response
+                },
+                error_message=None,
+                execution_time=time.time() - start_time if start_time else 0
+            )
 
-            issues.append(
-                {
-                    "issue_point": issue_point,
-                    "location": location,
-                    "suggestion": suggestion,
-                    "reason": requirement,
-                    "risk_level": risk_level,
+            # # 构造 issue 字典并添加到 issues 列表
+            # issue = {
+            #     "issue_point": issue_point,
+            #     "location": location,
+            #     "suggestion": suggestion,
+            #     "requirement": requirement,
+            #     "risk_level": risk_level,
+            #     "chapter_code": chapter_name,
+            #     "section_label": section_label,
+            #     "missing_points": missing_points,
+            #     "check_name": "completeness_check",
+            # }
+            # issues.append(issue)
+
+            # 如果提供了 state,发送进度更新
+            if state is not None:
+                # 构造 UnitReviewResult 对象
+                unit_review_result = UnitReviewResult(
+                    unit_index=i,
+                    unit_content={"section_label": section_label, "chapter_code": chapter_code},
+                    basic_compliance={"completeness_check": completeness_check_result},
+                    technical_compliance={},
+                    rag_enhanced={},
+                    overall_risk=risk_level
+                )
+
+                # 构造 completeness_config
+                completeness_config = {
+                    "section_label": section_label,
+                    "chapter_code": chapter_code,
+                    "content": content
+                    
                 }
-            )
 
+                # 调用 _complete_format 发送进度更新
+                new_issues = await self.inter_tool._complete_format(
+                    unit_review_result,
+                    state,
+                    i,
+                    len(summary_rows),
+                    completeness_config
+                )
+                issues.append(new_issues)
         return issues
 
     @staticmethod

+ 6 - 1
core/construction_review/component/reviewers/check_completeness/components/review_pipeline.py

@@ -11,6 +11,7 @@ _root = Path(__file__).parent.parent
 if str(_root) not in sys.path:
     sys.path.insert(0, str(_root))
 
+from foundation.observability import logger
 from interfaces import IReviewPipeline, IPromptBuilder, ILLMClient, IResultProcessor
 
 
@@ -46,7 +47,11 @@ class ReviewPipeline(IReviewPipeline):
             
         Returns:
             审查结果列表,每个结果包含原始文档信息和审查结果
+            
         """
+
+
+
         # 创建信号量控制并发数
         semaphore = asyncio.Semaphore(self.max_concurrent)
         
@@ -55,7 +60,7 @@ class ReviewPipeline(IReviewPipeline):
         for doc in documents:
             task = self._review_single_document(doc, specification, semaphore)
             tasks.append(task)
-        
+
         # 并发执行所有任务
         results = await asyncio.gather(*tasks, return_exceptions=True)
         

+ 1 - 1
core/construction_review/component/reviewers/check_completeness/main.py

@@ -104,7 +104,7 @@ async def main():
 
     # 生成缺失要点 JSON 列表,便于前端消费
     missing_issue_json_path = base_dir / 'output' / 'spec_review_missing_issues.json'
-    missing_issue_list = analyzer.build_missing_issue_list(summary_rows)
+    missing_issue_list = await analyzer.build_missing_issue_list(summary_rows)
     write_json(missing_issue_list, str(missing_issue_json_path))
     print(f"  缺失要点 JSON 已保存至: {missing_issue_json_path}")
     cost_time = time.time() - start_time

+ 293 - 79
core/construction_review/component/reviewers/utils/inter_tool.py

@@ -7,6 +7,7 @@ InterTool 类 - AI 审查工具类
 负责 AI 审查结果的辅助计算和结果处理功能。
 """
 
+import asyncio
 import json
 import re
 from typing import Optional, Dict, Any, List, TypedDict
@@ -73,12 +74,14 @@ class InterTool:
             logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
             return DEFAULT_RISK_LEVEL
 
-    def _aggregate_results(self, successful_results: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
+    def _aggregate_results(self, successful_results: List) -> Dict[str, Any]:
         """
         汇总审查结果(issues格式)
 
         Args:
-            successful_results: 成功的审查结果列表(issues格式),每个单元返回一个issues列表
+            successful_results: 成功的审查结果列表,支持两种格式:
+                - 嵌套列表格式:[[issue1, issue2], [issue3], ...],每个单元返回一个issues列表
+                - 扁平列表格式:[issue1, issue2, issue3, ...],所有issues合并为扁平列表
 
         Returns:
             Dict[str, Any]: 汇总后的统计信息,包含以下字段:
@@ -88,6 +91,7 @@ class InterTool:
 
         Note:
             当输入为空时返回空字典,异常时记录错误并返回空字典
+            自动检测输入格式(嵌套列表 or 扁平列表)并分别处理
         """
         try:
             if not successful_results:
@@ -97,27 +101,62 @@ class InterTool:
             risk_stats = {"low": 0, "medium": 0, "high": 0}
             total_issues = 0
 
-            for unit_issues in successful_results:
-                # 每个unit_issues是一个issues列表
-                if unit_issues and isinstance(unit_issues, list):
-                    total_issues += len(unit_issues)
+            # 检测输入格式(通过第一个元素判断是扁平列表还是嵌套列表)
+            is_flat_format = False
+            if successful_results and len(successful_results) > 0:
+                first_element = successful_results[0]
+                if isinstance(first_element, dict):
+                    # 第一个元素是字典,说明是扁平列表格式
+                    is_flat_format = True
+                    logger.debug(f"检测到扁平列表格式,总元素数: {len(successful_results)}")
+                elif isinstance(first_element, list):
+                    # 第一个元素是列表,说明是嵌套列表格式
+                    logger.debug(f"检测到嵌套列表格式,总单元数: {len(successful_results)}")
+
+            if is_flat_format:
+                # 扁平列表格式: [issue1, issue2, issue3, ...]
+                # 每个 issue 是一个 dict: {issue_id: {risk_summary: {...}, ...}}
+                for issue in successful_results:
+                    if isinstance(issue, dict):
+                        for issue_data in issue.values():
+                            risk_summary = issue_data.get('risk_summary', {})
+                            max_risk = risk_summary.get('max_risk_level', '0')
+
+                            if max_risk in risk_stats:
+                                risk_stats[max_risk] += 1
+                            elif max_risk == '0':
+                                risk_stats['low'] += 1  # 无风险视为低风险
+
+                total_issues = len(successful_results)
+                total_reviewed = len(successful_results)
 
-                    # 统计每个issue中的风险等级
-                    for issue in unit_issues:
-                        if isinstance(issue, dict):
-                            # issue格式: {issue_id: {risk_summary: {...}}}
-                            for issue_data in issue.values():
-                                risk_summary = issue_data.get('risk_summary', {})
-                                max_risk = risk_summary.get('max_risk_level', '0')
+            else:
+                # 嵌套列表格式: [[issue1, issue2], [issue3], ...]
+                # 每个 unit_issues 是一个 issues 列表
+                for unit_issues in successful_results:
+                    if unit_issues and isinstance(unit_issues, list):
+                        total_issues += len(unit_issues)
+
+                        # 统计每个issue中的风险等级
+                        for issue in unit_issues:
+                            if isinstance(issue, dict):
+                                # issue格式: {issue_id: {risk_summary: {...}}}
+                                for issue_data in issue.values():
+                                    risk_summary = issue_data.get('risk_summary', {})
+                                    max_risk = risk_summary.get('max_risk_level', '0')
+
+                                    if max_risk in risk_stats:
+                                        risk_stats[max_risk] += 1
+                                    elif max_risk == '0':
+                                        risk_stats['low'] += 1  # 无风险视为低风险
+
+                total_reviewed = len(successful_results)
 
-                                if max_risk in risk_stats:
-                                    risk_stats[max_risk] += 1
-                                elif max_risk == '0':
-                                    risk_stats['low'] += 1  # 无风险视为低风险
+            logger.info(f"结果汇总完成: total_issues={total_issues}, total_reviewed={total_reviewed}, risk_stats={risk_stats}")
 
             return {
                 'risk_stats': risk_stats,
-                'total_reviewed': len(successful_results),
+                'total_reviewed': total_reviewed,
                 'total_issues': total_issues
             }
         except (ZeroDivisionError, KeyError, TypeError) as e:
@@ -164,10 +203,30 @@ class InterTool:
         # 合并所有审查结果
         all_results = {}
 
-        # check_item 模式:如果提供了 merged_results,直接使用
+        # check_item 模式:如果提供了 merged_results,需要特殊处理
         if merged_results is not None:
             logger.debug(f"使用 check_item 模式的合并结果: {list(merged_results.keys()) if isinstance(merged_results, dict) else 'N/A'}")
-            all_results.update(merged_results)
+
+            # 检查是否是 entity_based 模式
+            if isinstance(merged_results, dict) and merged_results.get('review_mode') == 'entity_based' and 'entity_review_results' in merged_results:
+                # entity_based 模式:需要提取嵌套的审查结果
+                logger.debug(f"🔍 [DEBUG] merged_results 是 entity_based 模式,提取审查结果")
+                entity_review_results = merged_results.get('entity_review_results', [])
+                func_name = merged_results.get('func_name', '')
+
+                for idx, entity_item in enumerate(entity_review_results):
+                    entity = entity_item.get('entity', f'entity_{idx}')
+
+                    # ✅ 实际结构:result 字段包含 ReviewResult 对象
+                    if 'result' in entity_item:
+                        result_key = f'{func_name}_{entity}_{idx}'  # 使用 func_name 作为键名前缀
+                        all_results[result_key] = entity_item['result']
+                        logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}, 结果类型: {type(entity_item['result'])}")
+
+                logger.debug(f"🔍 [DEBUG] entity_based 模式处理完成,共提取 {len(entity_review_results)} 个实体的审查结果")
+            else:
+                # 普通模式:直接使用 merged_results
+                all_results.update(merged_results)
         else:
             # core_review 模式:合并 basic_result 和 technical_result
             if basic_result:
@@ -176,44 +235,23 @@ class InterTool:
 
             if technical_result:
                 logger.debug(f"🔍 [DEBUG] technical_result 类型: {type(technical_result)}, 键: {list(technical_result.keys()) if isinstance(technical_result, dict) else 'N/A'}")
-
-                # 检查是否是 entity_based 模式
-                if technical_result.get('review_mode') == 'entity_based' and 'entity_review_results' in technical_result:
-                    # entity_based 模式:从 entity_review_results 中提取实际审查结果
-                    logger.debug(f"🔍 [DEBUG] 检测到 entity_based 模式,从 entity_review_results 提取审查结果")
-                    entity_review_results = technical_result.get('entity_review_results', [])
-                    total_entities = technical_result.get('total_entities', 0)
-
-                    for idx, entity_item in enumerate(entity_review_results):
-                        entity = entity_item.get('entity', f'entity_{idx}')
-                        entity_info = f"{entity}_{idx}"  # 使用 entity+索引 避免重复
-
-                        # 提取非参数性审查结果
-                        if 'non_parameter_compliance' in entity_item:
-                            result_key = f'non_parameter_compliance_{entity_info}'
-                            all_results[result_key] = entity_item['non_parameter_compliance']
-                            logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}")
-
-                        # 提取参数性审查结果
-                        if 'parameter_compliance' in entity_item:
-                            result_key = f'parameter_compliance_{entity_info}'
-                            all_results[result_key] = entity_item['parameter_compliance']
-                            logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}")
-
-                    logger.debug(f"🔍 [DEBUG] entity_based 模式处理完成,共提取 {len(entity_review_results)} 个实体的审查结果")
-
-                else:
-                    # general 模式:过滤掉元数据字段,保留实际审查结果
-                    filtered_technical = {}
-                    metadata_keys = ['review_mode', 'entity_review_results', 'total_entities', 'total_results_processed']
-                    for key, value in technical_result.items():
-                        if key not in metadata_keys:
-                            filtered_technical[key] = value
-                        else:
-                            logger.debug(f"跳过技术审查元数据字段: {key} = {value} (类型: {type(value).__name__})")
-
-                    logger.debug(f"🔍 [DEBUG] 过滤后的 technical_result 键: {list(filtered_technical.keys())}")
-                    all_results.update(filtered_technical)
+                # 遍历 technical_result,处理可能的 entity_based 模式
+                for tech_key, tech_value in technical_result.items():
+                    if isinstance(tech_value, dict) and tech_value.get('review_mode') == 'entity_based':
+                        # entity_based 模式:提取嵌套的审查结果
+                        logger.debug(f"🔍 [DEBUG] technical_result[{tech_key}] 是 entity_based 模式,提取审查结果")
+                        entity_review_results = tech_value.get('entity_review_results', [])
+                        func_name = tech_value.get('func_name', tech_key)
+                        
+                        for idx, entity_item in enumerate(entity_review_results):
+                            entity = entity_item.get('entity', f'entity_{idx}')
+                            if 'result' in entity_item:
+                                result_key = f'{func_name}_{entity}_{idx}'
+                                all_results[result_key] = entity_item['result']
+                                logger.debug(f"🔍 [DEBUG] 提取审查结果: {result_key}, 结果类型: {type(entity_item['result'])}")
+                    else:
+                        # 普通模式:直接添加
+                        all_results[tech_key] = tech_value
 
         logger.debug(f"开始格式化审查结果,合并后结果: {list(all_results.keys())}")
 
@@ -224,22 +262,32 @@ class InterTool:
                 logger.debug(f"跳过分数字段: {check_key}")
                 continue
 
-            # 🔧 类型安全检查:确保 check_result 是字典类型
-            if not isinstance(check_result, dict):
-                logger.warning(f"⚠️ 检查项 {check_key} 的结果类型不是字典: {type(check_result).__name__}, 跳过")
+            # 🔧 类型安全检查:支持字典和 base_reviewer.ReviewResult 对象
+            is_dict = isinstance(check_result, dict)
+            is_review_result = hasattr(check_result, 'details') and hasattr(check_result, 'success')
+
+            if not is_dict and not is_review_result:
+                logger.warning(f"⚠️ 检查项 {check_key} 的结果类型不支持: {type(check_result).__name__}, 跳过")
                 continue
 
             # 检查 check_result 是否为 None 或不包含 details
-            if not check_result or "details" not in check_result:
+            if is_dict and (not check_result or "details" not in check_result):
                 logger.warning(f"检查项 {check_key} 结果为空或缺少details字段,跳过")
                 continue
 
-            check_name = check_result["details"].get("name")
-            if "response" in check_result["details"]:
-                response = check_result["details"]["response"]
-                check_name = check_result["details"].get("name")
-                reference_source = check_result["details"].get("rag_reference_source")
-                review_references = check_result["details"].get("rag_review_references")
+            if is_review_result and not check_result.details:
+                logger.warning(f"检查项 {check_key} 的 base_reviewer.ReviewResult 中 details 为空,跳过")
+                continue
+
+            # 根据类型获取 details
+            details = check_result["details"] if is_dict else check_result.details
+
+            check_name = details.get("name")
+            if "response" in details:
+                response = details["response"]
+                check_name = details.get("name")
+                reference_source = details.get("rag_reference_source")
+                review_references = details.get("rag_review_references")
 
 
 
@@ -444,9 +492,26 @@ class InterTool:
 
         return review_lists
 
-    def _extract_json_data(self, response: str):
-        """从响应中提取JSON数据,合并所有提取策略"""
+    def _extract_json_data(self, response):
+        """
+        从响应中提取JSON数据,合并所有提取策略
+
+        Args:
+            response: 可以是字符串、列表或字典
+
+        Returns:
+            提取的JSON数据(列表或字典),如果提取失败返回None
+        """
         try:
+            # 如果 response 已经是列表或字典,直接返回
+            if isinstance(response, (list, dict)):
+                return response
+
+            # 确保 response 是字符串
+            if not isinstance(response, str):
+                logger.warning(f"响应类型错误: {type(response)}, 期望字符串")
+                return None
+
             # 尝试直接解析整个响应
             response_stripped = response.strip()
             if ((response_stripped.startswith('{') and response_stripped.endswith('}')) or
@@ -467,10 +532,18 @@ class InterTool:
                 matches = re.findall(pattern, response, re.DOTALL)
                 if matches:
                     for match in matches:
-                        try:
-                            return json.loads(match.strip())
-                        except json.JSONDecodeError:
-                            continue
+                        # 确保 match 是字符串
+                        if isinstance(match, str):
+                            try:
+                                return json.loads(match.strip())
+                            except json.JSONDecodeError:
+                                continue
+                        # 如果 match 是列表/元组(多捕获组),取第一个元素
+                        elif isinstance(match, (list, tuple)) and len(match) > 0:
+                            try:
+                                return json.loads(str(match[0]).strip())
+                            except (json.JSONDecodeError, IndexError):
+                                continue
 
             # 尝试模式匹配提取JSON
             json_patterns = [
@@ -482,10 +555,12 @@ class InterTool:
             for pattern in json_patterns:
                 matches = re.findall(pattern, response, re.DOTALL)
                 for match in matches:
-                    try:
-                        return json.loads(match)
-                    except json.JSONDecodeError:
-                        continue
+                    # 确保 match 是字符串
+                    if isinstance(match, str):
+                        try:
+                            return json.loads(match)
+                        except json.JSONDecodeError:
+                            continue
 
         except Exception as e:
             logger.warning(f"JSON提取失败: {str(e)}")
@@ -565,4 +640,143 @@ class InterTool:
         """
         if state.get("error_message"):
             return "error"
-        return "success"
+        return "success"
+
+    async def _complete_format(self,result, state, unit_index, total_units, completeness_config):
+        section_label = completeness_config.get('section_label')
+        chapter_code = completeness_config.get('chapter_code', '')
+        logger.info(f"section_label:  {section_label}")
+        # 格式化issues以获取问题数量
+        issues = self._format_review_results_to_issues(
+            state["callback_task_id"],
+            unit_index,
+            section_label,
+            chapter_code,
+            completeness_config,
+            result.basic_compliance,
+            result.technical_compliance
+        )
+
+        current = int(((unit_index + 1) / total_units) * 100)
+
+        # 立即发送单元审查详情(包含unit_review和processing_flag事件)
+        await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
+
+        return issues
+
+    async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
+                                            total_units: int, section_label: str,
+                                            issues: List[Dict], current: int) -> None:
+        """
+        发送单元审查详细信息 - 强制串行并统一进度值
+        """
+        try:
+            # 1. 计算问题数量
+            issues_count = 0
+            if isinstance(issues, list) and issues:
+                issues_count = sum(
+                    1 for issue in issues
+                    for issue_data in issue.values()
+                    for review_item in issue_data.get("review_lists", [])
+                    if review_item.get("exist_issue", False)
+                )
+
+            real_current = await self._send_unit_overall_progress(
+                state, unit_index, total_units, section_label, issues_count
+            )
+
+            final_current = real_current if real_current is not None else current
+
+            await asyncio.sleep(0.05)
+
+            # 3. 发送单元详情 (Unit Review)
+            if isinstance(issues, list) and issues and state["progress_manager"]:
+                stage_name = f"AI审查:{section_label}"
+
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name=stage_name,
+                    current=final_current,  # 【关键】使用与 Flag 事件完全一致的进度值
+                    status="unit_review_update",
+                    message=f"发现{issues_count}个问题: {section_label}",
+                    issues=issues,
+                    user_id=state.get("user_id", ""),
+                    overall_task_status="processing",
+                    event_type="unit_review"
+                )
+
+                # 再次微小延迟,确保 Clear 不会吞掉 Review
+                await asyncio.sleep(0.02)
+
+                # 清空当前issues
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    issues=['clear']
+                )
+
+        except Exception as e:
+            logger.error(f"发送单元审查详情失败: {str(e)}")
+
+    async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
+                                           total_units: int, section_label: str,
+                                           issues_count: int = None) -> Optional[int]:
+        """
+        发送单元完成进度更新 - 返回计算出的实时进度
+        Returns:
+            int: 基于 Redis 统计的实时进度百分比
+        """
+        current_percent = None
+        try:
+            task_id = state.get("callback_task_id", "")
+            redis_client = None
+            try:
+                from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
+                redis_client = await RedisConnectionFactory.get_connection()
+            except Exception as e:
+                logger.error(f"Redis连接失败: {str(e)}")
+
+            completed_count = 0
+
+            if redis_client and task_id:
+                completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
+                # 原子操作
+                await redis_client.sadd(completed_key, str(unit_index))
+                await redis_client.expire(completed_key, 3600)
+                completed_count = await redis_client.scard(completed_key)
+
+                # 计算进度
+                current_percent = int((completed_count / total_units) * 100)
+            else:
+                # 降级方案
+                completed_count = unit_index + 1
+                current_percent = int((completed_count / total_units) * 100)
+
+            # 构建消息
+            if issues_count is not None and issues_count > 0:
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+            else:
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
+
+            logger.info(f"进度更新: {current_percent}% - {message}")
+
+            if state["progress_manager"]:
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name="AI审查",
+                    current=current_percent,
+                    status="processing",
+                    message=message,
+                    user_id=state.get("user_id", ""),
+                    overall_task_status="processing",
+                    event_type="processing_flag"
+                )
+
+            return current_percent
+
+        except Exception as e:
+            logger.warning(f"发送单元完成进度更新失败: {str(e)}")
+            # 发生异常时,尝试返回一个基于 index 的估算值
+            try:
+                return int(((unit_index + 1) / total_units) * 100)
+            except:
+                return 0

+ 14 - 12
core/construction_review/workflows/ai_review_workflow.py

@@ -60,15 +60,15 @@ from .types import AIReviewState
 
 
 
-@dataclass
-class ReviewResult:
-    """审查结果"""
-    unit_index: int
-    unit_content: Dict[str, Any]
-    basic_compliance: Dict[str, Any]
-    technical_compliance: Dict[str, Any]
-    rag_enhanced: Dict[str, Any]
-    overall_risk: str
+# @dataclass
+# class ReviewResult:
+#     """审查结果"""
+#     unit_index: int
+#     unit_content: Dict[str, Any]
+#     basic_compliance: Dict[str, Any]
+#     technical_compliance: Dict[str, Any]
+#     rag_enhanced: Dict[str, Any]
+#     overall_risk: str
 
 
 class AIReviewWorkflow:
@@ -533,8 +533,9 @@ class AIReviewWorkflow:
                 logger.info(f"开始执行大纲审查")
 
 
-                outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
-                                                    state, state.get("stage_name", "大纲审查"))
+                # outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
+                #                                     state, state.get("stage_name", "大纲审查"))
+                outline_review_result = {} 
                 check_completeness_result = await self.ai_review_engine.check_completeness(
                     trace_id_idx = state["callback_task_id"],
                     review_content = state["structured_content"]["chunks"],
@@ -542,6 +543,7 @@ class AIReviewWorkflow:
                     stage_name = state.get("stage_name", "完整性审查")
                 )
 
+
             # # 4. 执行编制依据审查
             # #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
             reference_check = "reference_check" in self.task_info.get_review_config_list()
@@ -705,7 +707,7 @@ class AIReviewWorkflow:
 
             # 保存review_results到本地JSON文件
             with open(file_path, 'w', encoding='utf-8') as f:
-                json.dump(review_results, f, ensure_ascii=False, indent=2, default=str)
+                json.dump(review_results, f, ensure_ascii=False, indent=2)
 
             logger.info(f"审查结果已保存到: {file_path}")
 

+ 178 - 43
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -42,6 +42,7 @@ from langchain_core.messages import AIMessage
 from foundation.observability.logger.loggering import server_logger as logger
 from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from core.base.task_models import TaskFileInfo
+# from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 from ...component.reviewers.utils.inter_tool import InterTool
 from ..types import AIReviewState
 
@@ -50,8 +51,8 @@ REVIEW_TIMEOUT = 120  # 单个审查任务超时时间(秒)
 
 
 @dataclass
-class ReviewResult:
-    """审查结果"""
+class UnitReviewResult():
+    """Workflow层单元审查结果(包含 basic_compliance 和 technical_compliance)"""
     unit_index: int
     unit_content: Dict[str, Any]
     basic_compliance: Dict[str, Any]
@@ -141,7 +142,28 @@ class AIReviewCoreFun:
         combined_content = "\n\n".join(basis_contents)
         logger.info(f"编制依据内容拼接完成,总长度: {len(combined_content)} 字符")
 
-        # 3. 执行需要的审查方法
+        # 3. 提取编制依据条目(时效性和规范性审查都需要)
+        basis_items = None
+        needs_basis_extraction = any(
+            fn in ["timeliness_basis_reviewer", "reference_basis_reviewer"] 
+            for fn in func_names
+        )
+        
+        if needs_basis_extraction:
+            try:
+                from core.construction_review.component.reviewers.utils.directory_extraction import (
+                    extract_basis_with_langchain_qwen,
+                )
+                basis_items = await extract_basis_with_langchain_qwen(
+                    progress_manager=state.get("progress_manager"),
+                    callback_task_id=state.get("callback_task_id"),
+                    text=combined_content,
+                )
+                logger.info(f"编制依据AI提取完成,条数: {len(getattr(basis_items, 'items', []))}")
+            except Exception as e:
+                logger.error(f"编制依据AI提取失败: {e}", exc_info=True)
+
+        # 4. 执行需要的审查方法
         chapter_issues = []
 
         for func_name in func_names:
@@ -151,6 +173,7 @@ class AIReviewCoreFun:
                 try:
                     review_data = {
                         "content": combined_content,
+                        "basis_items": basis_items,
                         "max_concurrent": self.max_concurrent
                     }
                     result = await self.ai_review_engine.reference_basis_reviewer(
@@ -173,6 +196,7 @@ class AIReviewCoreFun:
                 try:
                     review_data = {
                         "content": combined_content,
+                        "basis_items": basis_items,
                         "max_concurrent": self.max_concurrent
                     }
                     result = await self.ai_review_engine.timeliness_basis_reviewer(
@@ -253,8 +277,8 @@ class AIReviewCoreFun:
             chunk_results = await self._execute_chunk_methods(
                 chapter_code, chunk, chunk_index, func_names, state
             )
-
-            # 格式化当前块的结果为issues(使用 check_item 模式)
+            logger.info(f"🔍 chunk_results: basic_compliance={chunk_results.get('basic_compliance', {})}, technical_compliance={chunk_results.get('technical_compliance', {})}")
+            # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
             issues = self.inter_tool._format_review_results_to_issues(
@@ -263,14 +287,11 @@ class AIReviewCoreFun:
                 review_location_label=review_location_label,
                 chapter_code=chapter_code,
                 unit_content=chunk,
-                basic_result={},  # check_item 模式下不需要
-                technical_result={},  # check_item 模式下不需要
-                merged_results=chunk_results  # 直接使用合并结果
+                basic_result=chunk_results.get('basic_compliance', {}),
+                technical_result=chunk_results.get('technical_compliance', {}),
+                merged_results=None  # 不使用 merged_results
             )
 
-            # 🔍 调试日志:检查issues是否有数据
-            logger.info(f"🔍 块{chunk_index}审查结果: chunk_results键={list(chunk_results.keys()) if chunk_results else 'None'}, issues数量={len(issues) if issues else 0}")
-
             # 推送当前块的进度
             current = int(((chunk_index + 1) / total_chunks) * 100)
             await self._send_unit_review_progress(
@@ -292,7 +313,7 @@ class AIReviewCoreFun:
         从审查结果中提取issues列表
 
         Args:
-            result: 审查结果(可能是ReviewResult、dict、entity_based结果等)
+            result: 审查结果(可能是 base_reviewer.ReviewResult、dict、entity_based结果等)
 
         Returns:
             List[Dict]: issues列表
@@ -302,7 +323,7 @@ class AIReviewCoreFun:
         if result is None:
             return issues
 
-        # ReviewResult对象
+        # base_reviewer.ReviewResult 对象(Component层)
         if hasattr(result, 'details'):
             issues.append(result.details)
 
@@ -392,14 +413,14 @@ class AIReviewCoreFun:
             state: AI审查状态
 
         Returns:
-            Dict[str, Any]: 该块所有方法的执行结果
+            Dict[str, Any]: 合并后的 basic_compliance 和 technical_compliance 结果
         """
-        results = {}
         semaphore = asyncio.Semaphore(5)  # 单个块内限制并发数为5
         rag_enhanced_content = None  # 初始化变量,避免作用域错误
         if 'check_parameter_compliance' in func_names or 'check_non_parameter_compliance' in func_names:
                 logger.debug("开始执行RAG检索增强")
                 rag_enhanced_content = self.ai_review_engine.rag_enhanced_check(chunk.get('content', ''))
+
         async def execute_with_semaphore(func_name):
             async with semaphore:
                 try:
@@ -408,7 +429,14 @@ class AIReviewCoreFun:
                     return func_name, result
                 except Exception as e:
                     logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
-                    return func_name, {"error": str(e)}
+                    return func_name, UnitReviewResult(
+                        unit_index=chunk_index,
+                        unit_content=chunk,
+                        basic_compliance={func_name: {"error": str(e)}},
+                        technical_compliance={},
+                        rag_enhanced={},
+                        overall_risk="error"
+                    )
 
         # 创建并发任务
         async_tasks = [execute_with_semaphore(func_name) for func_name in func_names]
@@ -416,19 +444,33 @@ class AIReviewCoreFun:
         # 等待当前块所有方法完成
         completed_results = await asyncio.gather(*async_tasks, return_exceptions=True)
 
-        # 整理结果
+        # 合并所有 UnitReviewResult 对象的 basic_compliance 和 technical_compliance
+        merged_basic = {}
+        merged_technical = {}
+        merged_rag = {}
+
         for result in completed_results:
             if isinstance(result, Exception):
                 logger.error(f"任务异常: {str(result)}")
                 continue
 
             if result and len(result) == 2:
-                func_name, task_result = result
-                results[func_name] = task_result
+                func_name, review_result = result
+                if isinstance(review_result, UnitReviewResult):
+                    # 合并 basic_compliance
+                    merged_basic.update(review_result.basic_compliance)
+                    # 合并 technical_compliance
+                    merged_technical.update(review_result.technical_compliance)
+                    # 合并 rag_enhanced
+                    merged_rag.update(review_result.rag_enhanced)
 
-        return results
+        return {
+            'basic_compliance': merged_basic,
+            'technical_compliance': merged_technical,
+            'rag_enhanced': merged_rag
+        }
 
-    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None) -> Any:
+    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None) -> UnitReviewResult:
         """
         执行单个块的审查任务
 
@@ -440,12 +482,20 @@ class AIReviewCoreFun:
             state: AI审查状态
 
         Returns:
-            Any: 审查结果
+            UnitReviewResult: 单个审查方法的UnitReviewResult对象,包含 basic_compliance 或 technical_compliance
         """
         # 从ai_review_engine获取对应的方法
         if not hasattr(self.ai_review_engine, func_name):
             logger.warning(f"AIReviewEngine中未找到方法: {func_name}")
-            return await self._dummy_review_task(chapter_code, func_name)
+            # 返回错误结果的 UnitReviewResult
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: {"error": f"未找到方法: {func_name}"}},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk="error"
+            )
 
         method = getattr(self.ai_review_engine, func_name)
 
@@ -460,44 +510,128 @@ class AIReviewCoreFun:
 
         # 根据func_name构建对应的参数并调用
         if func_name == "sensitive_word_check":
-            result = await method(trace_id, review_content, state, stage_name)
+            raw_result = await method(trace_id, review_content, state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         elif func_name == "check_semantic_logic":
-            result = await method(trace_id, review_content, state, stage_name)
+            raw_result = await method(trace_id, review_content, state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         elif func_name == "check_sensitive":
-            result = await method(trace_id, review_content, state, stage_name)
+            raw_result = await method(trace_id, review_content, state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         elif func_name == "check_completeness":
-            result = await method(trace_id, chunk, state, stage_name)
+            raw_result = await method(trace_id, chunk, state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         elif func_name == "check_non_parameter_compliance":
             # 技术审查方法需要从 RAG 检索结果中获取 references
-            result = await self._execute_technical_review(
+            raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
             )
+            # 技术审查方法,放入 technical_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={},
+                technical_compliance={func_name: raw_result},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         elif func_name == "check_parameter_compliance":
             # 技术审查方法需要从 RAG 检索结果中获取 references
-            result = await self._execute_technical_review(
+            raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
             )
+            # 技术审查方法,放入 technical_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={},
+                technical_compliance={func_name: raw_result},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result)
+            )
 
         # ⚠️ 以下三个特殊方法不在块级别处理,由主流程统一管理
         elif func_name in ["outline_check", "timeliness_basis_reviewer", "reference_basis_reviewer"]:
             logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
-            return None
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk="low"
+            )
 
         else:
             logger.warning(f"未知的审查方法: {func_name},使用默认调用方式")
-            return {
-            "current_stage": "ai_review_check_item",
-            "error_message": f"未知的审查方法: {func_name},使用默认调用方式",
-            "status": "failed",
-            "messages": [AIMessage(content=f"未知的审查方法: {func_name},使用默认调用方式: {state['callback_task_id']}")]
-            }
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: {"error": f"未知的审查方法: {func_name}"}},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk="error"
+            )
 
-        return result
+    def _calculate_single_result_risk(self, raw_result: Any) -> str:
+        """计算单个审查结果的风险等级"""
+        if raw_result is None or isinstance(raw_result, Exception):
+            return "error"
+
+        if isinstance(raw_result, dict):
+            if "error" in raw_result:
+                return "error"
+            # 检查是否有高风险问题
+            if "details" in raw_result and "response" in raw_result["details"]:
+                response = raw_result["details"]["response"]
+                if isinstance(response, str) and ("高风险" in response or "严重" in response):
+                    return "high"
+        elif hasattr(raw_result, 'success'):
+            if not raw_result.success:
+                return "error"
+            if hasattr(raw_result, 'details') and "response" in raw_result.details:
+                response = raw_result.details["response"]
+                if isinstance(response, str) and ("高风险" in response or "严重" in response):
+                    return "high"
+
+        return "low"
 
     async def _execute_technical_review(
         self, method, trace_id: str, review_content: str, chunk: Dict[str, Any],
@@ -801,7 +935,7 @@ class AIReviewCoreFun:
             return chunks
 
     async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
-                                  total_units: int, state: AIReviewState) -> ReviewResult:
+                                  total_units: int, state: AIReviewState) -> UnitReviewResult:
         """
         审查单个单元的核心业务逻辑
 
@@ -812,7 +946,7 @@ class AIReviewCoreFun:
             state: AI审查状态
 
         Returns:
-            ReviewResult: 单元审查结果
+            UnitReviewResult: 单元审查结果
         """
         try:
 
@@ -859,7 +993,8 @@ class AIReviewCoreFun:
 
             # 处理异常结果(gather 已经将异常作为结果返回)
             basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
-            technical_result = review_results[1] if not isinstance(review_results[1], Exception) else {"error": str(review_results[1])}
+            # technical_result 可能为 None(未提取到实体),需要处理为空字典
+            technical_result = review_results[1] if not isinstance(review_results[1], Exception) and review_results[1] is not None else {"error": str(review_results[1]) if isinstance(review_results[1], Exception) else "未提取到实体,跳过技术审查"}
 
             # RAG检查已注释,提供空结果
             rag_result = {"error": "RAG check disabled"}
@@ -869,7 +1004,7 @@ class AIReviewCoreFun:
             overall_risk = inter_tool._calculate_overall_risk(basic_result, technical_result, rag_result)
 
             
-            return ReviewResult(
+            return UnitReviewResult(
                 unit_index=unit_index,
                 unit_content=unit_content,
                 basic_compliance=basic_result,
@@ -880,7 +1015,7 @@ class AIReviewCoreFun:
 
         except asyncio.TimeoutError:
             logger.error(f"审查单元 {unit_index} 超时")
-            return ReviewResult(
+            return UnitReviewResult(
                 unit_index=unit_index,
                 unit_content=unit_content,
                 basic_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
@@ -890,7 +1025,7 @@ class AIReviewCoreFun:
             )
         except Exception as e:
             logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
-            return ReviewResult(
+            return UnitReviewResult(
                 unit_index=unit_index,
                 unit_content=unit_content,
                 basic_compliance={"error": str(e)},