Ver Fonte

dev:添加了新架构的条纹要点审查模块;

ChenJiSheng há 3 meses atrás
pai
commit
c43f50086a
24 ficheiros alterados com 896 adições e 520 exclusões
  1. 148 90
      core/construction_review/component/ai_review_engine.py
  2. 0 103
      core/construction_review/component/check_completeness/components/result_processor.py
  3. 0 202
      core/construction_review/component/check_completeness/components/result_saver.py
  4. 0 49
      core/construction_review/component/check_completeness/config/Construction_Plan_Content_Specification.csv
  5. 0 39
      core/construction_review/component/check_completeness/config/prompt.yaml
  6. 1 0
      core/construction_review/component/reviewers/check_completeness/README.md
  7. 1 0
      core/construction_review/component/reviewers/check_completeness/components/__init__.py
  8. 3 1
      core/construction_review/component/reviewers/check_completeness/components/data_loader.py
  9. 0 0
      core/construction_review/component/reviewers/check_completeness/components/llm_client.py
  10. 9 3
      core/construction_review/component/reviewers/check_completeness/components/prompt_builder.py
  11. 247 0
      core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py
  12. 140 0
      core/construction_review/component/reviewers/check_completeness/components/result_processor.py
  13. 218 0
      core/construction_review/component/reviewers/check_completeness/components/result_saver.py
  14. 0 0
      core/construction_review/component/reviewers/check_completeness/components/review_pipeline.py
  15. BIN
      core/construction_review/component/reviewers/check_completeness/config/Construction_Plan_Content_Specification.csv
  16. 5 5
      core/construction_review/component/reviewers/check_completeness/config/llm_api.yaml
  17. 42 0
      core/construction_review/component/reviewers/check_completeness/config/prompt.yaml
  18. 37 4
      core/construction_review/component/reviewers/check_completeness/interfaces.py
  19. 34 22
      core/construction_review/component/reviewers/check_completeness/main.py
  20. 1 0
      core/construction_review/component/reviewers/check_completeness/requirements.txt
  21. 1 0
      core/construction_review/component/reviewers/check_completeness/utils/__init__.py
  22. 1 1
      core/construction_review/component/reviewers/check_completeness/utils/file_utils.py
  23. 1 0
      core/construction_review/component/reviewers/check_completeness/utils/yaml_utils.py
  24. 7 1
      core/construction_review/workflows/ai_review_workflow.py

+ 148 - 90
core/construction_review/component/ai_review_engine.py

@@ -76,10 +76,15 @@ import json
 
 
 
-from .check_completeness.components.data_loader import CSVDataLoader
-from .check_completeness.components.prompt_builder import PromptBuilder
-from .check_completeness.components.llm_client import LLMClient
-from .check_completeness.components.result_processor import ResultProcessor
+from .reviewers.check_completeness.components.data_loader import CSVDataLoader
+from .reviewers.check_completeness.components.prompt_builder import PromptBuilder
+from .reviewers.check_completeness.components.llm_client import LLMClient
+from .reviewers.check_completeness.components.result_processor import ResultProcessor
+from .reviewers.check_completeness.components.review_pipeline import ReviewPipeline
+from .reviewers.check_completeness.components.result_saver import ResultSaver
+from .reviewers.check_completeness.components.result_analyzer import ResultAnalyzer
+from .reviewers.check_completeness.utils.file_utils import write_json
+from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 
 @dataclass
 class ReviewResult:
@@ -727,7 +732,7 @@ class AIReviewEngine(BaseReviewer):
         # return await self.review("semantic_logic_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
         #                        None, review_location_label, state, stage_name)
 
-    async def check_completeness(self, trace_id_idx: str, review_content: Dict[str, Any], review_references: str,
+    async def check_completeness(self, trace_id_idx: str, review_content: List[Dict[str, Any]], review_references: str,
                                review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
         """
         完整性检查
@@ -744,22 +749,27 @@ class AIReviewEngine(BaseReviewer):
             Dict[str, Any]: 完整性检查结果
         """
 
-        
+        # with open(r'temp\structured_content.json', 'w', encoding='utf-8') as f:
+        #     json.dump(review_content, f, ensure_ascii=False, indent=4)
         name = "completeness_check"
         start_time = time.time()
         
         try:
             # 验证review_content格式
-            if not isinstance(review_content, dict):
+            if not isinstance(review_content, list):
                 raise ValueError(f"review_content必须是字典类型,当前类型: {type(review_content)}")
             
-            # 获取文档块信息
-            doc = review_content
-            chunk_id = doc.get('chunk_id', 'unknown')
-            chapter_classification = doc.get('chapter_classification', '')
-            content = doc.get('content', '')
+            # # 获取文档块信息
+            # doc = review_content
+            # chunk_id = doc.get('chunk_id', 'unknown')
+            # chapter_classification = doc.get('chapter_classification', '')
+            # content = doc.get('content', '')
+            doc = 'doc'
+            chunk_id = 'chunk_id'
+            chapter_classification = 'chunk_id'
+            content = 'chunk_id'
             
-            logger.info(f"开始执行 {name} 审查,trace_id: {trace_id_idx}, chunk_id: {chunk_id}, chapter_classification: {chapter_classification}")
+            logger.debug(f"开始执行 {name} 审查,trace_id: {trace_id_idx}, chunk_id: {chunk_id}, chapter_classification: {chapter_classification}")
             
             # 检查必要字段
             if not chapter_classification:
@@ -768,102 +778,150 @@ class AIReviewEngine(BaseReviewer):
             if not content:
                 raise ValueError(f"文档块 {chunk_id} 缺少content字段")
 
-            # 导入check_completeness组件
-            check_completeness_dir = Path(__file__).parent / "check_completeness"
-            if str(check_completeness_dir) not in sys.path:
-                sys.path.insert(0, str(check_completeness_dir))
                 
-            # 初始化组件路径
-            base_dir = check_completeness_dir
+            # 配置文件路径
+            # base_dir = Path(__file__).parent
+            base_dir = Path(r'core\construction_review\component\reviewers\check_completeness')
             csv_path = base_dir / 'config' / 'Construction_Plan_Content_Specification.csv'
+            json_path = base_dir / 'data' / '文档切分预处理结果.json'
             prompt_config_path = base_dir / 'config' / 'prompt.yaml'
             api_config_path = base_dir / 'config' / 'llm_api.yaml'
             
-            # 加载规范文件
+            logger.debug("=" * 60)
+            logger.debug("文件要点审查模块")
+            logger.debug("=" * 60)
+            
+            # 1. 加载数据
+            logger.debug("\n[1/5] 加载规范文件...")
             data_loader = CSVDataLoader()
             specification = data_loader.load_specification(str(csv_path))
+            logger.debug(f"  加载完成,共 {len(specification)} 个标签类别")
             
-            # 获取对应的规范要求
-            requirements = specification.get(chapter_classification, [])
-            if not requirements:
-                raise ValueError(f"未找到标签 {chapter_classification} 对应的规范要求")
-            
-            logger.info(f"找到 {len(requirements)} 个规范要求项")
+            logger.debug("\n[2/5] 加载文档数据...")
+            documents =  review_content
+            logger.debug(f"  加载完成,共 {len(documents)} 个文档块")
             
-            # 初始化组件
+            # 2. 初始化组件
+            logger.debug("\n[3/5] 初始化组件...")
             prompt_builder = PromptBuilder(str(prompt_config_path))
             llm_client = LLMClient(str(api_config_path))
             result_processor = ResultProcessor()
             
-            # 构建提示词
-            prompt = prompt_builder.build_prompt(content, requirements)
+            # 获取并发数配置
+            api_config = llm_client.config
+            concurrent_workers = api_config.get('keywords', {}).get('concurrent_workers', 20)
             
-            # 调用LLM
-            logger.info(f"调用LLM进行审查,使用模型: {llm_client.model_type}")
-            llm_response = await llm_client.call_llm(prompt)
+            review_pipeline = ReviewPipeline(
+                prompt_builder=prompt_builder,
+                llm_client=llm_client,
+                result_processor=result_processor,
+                max_concurrent=concurrent_workers
+            )
+            logger.debug("  组件初始化完成")
             
-            # 处理结果
-            review_result = result_processor.parse_result(llm_response, requirements)
+            start_time = time.time()
+            # 3. 执行审查
+            logger.debug("\n[4/5] 开始执行审查...")
+            logger.debug(f"  使用模型: {llm_client.model_type}")
+            logger.debug(f"  最大并发数: {concurrent_workers}")
             
-            # 构建details字段,包含审查结果
-            details = {
-                'chunk_id': chunk_id,
-                'name': 'completeness_check',
-                'chapter_classification': chapter_classification,
-                'section_label': doc.get('section_label', ''),
-                'requirements_count': len(requirements),
-                'checked_items': len(review_result),
-                'response': review_result[0] if review_result else {},
-            }
+            review_results = await review_pipeline.review(documents, specification)
             
-            execution_time = time.time() - start_time
+            # 统计结果
+            success_count = sum(1 for r in review_results if isinstance(r.get('review_result', {}), dict) and 'error' not in r.get('review_result', {}))
+            error_count = len(review_results) - success_count
+            logger.debug(f"\n  审查完成: 成功 {success_count} 个, 失败 {error_count} 个")
+
+
+            # 6. 使用结果解析处理组件,生成规范覆盖汇总表
+            logger.debug("\n[5/5] 生成规范要点覆盖汇总表...")
+            analyzer = ResultAnalyzer(str(csv_path))
+            processed_results = analyzer.process_results(review_results)
+            spec_summary_csv_path = base_dir / 'output' / 'spec_review_summary.csv'
+            summary_rows = analyzer.build_spec_summary(processed_results)
+            logger.debug(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
+
+            # 生成缺失要点 JSON 列表,便于前端消费
+            missing_issue_json_path = Path(r'temp\document_temp') / 'spec_review_missing_issues.json'
+            missing_issue_list = analyzer.build_missing_issue_list(summary_rows)
+            write_json(missing_issue_list, str(missing_issue_json_path))
+            logger.debug(f"  缺失要点 JSON 已保存至: {missing_issue_json_path}")
+            cost_time = time.time() - start_time
 
-            # 创建ReviewResult对象
-            from core.construction_review.component.reviewers.base_reviewer import ReviewResult
-            result = ReviewResult(
-                success=True,
-                details=details,
-                error_message=None,
-                execution_time=execution_time
-            )
-            with open('temp/completeness_check_result_1.json','w',encoding='utf-8') as f:
-                json.dump({"details":result.details,"success":result.success,"error_message":result.error_message,"execution_time":result.execution_time},f,ensure_ascii=False,indent=4)
-            # 将审查结果转换为字典格式,添加到issues中
-            review_result_data = {
-                'name': name,
-                'success': result.success,
-                'details': result.details,
-                'error_message': result.error_message,
-                'execution_time': result.execution_time,
-                'timestamp': time.time()
-            }
-            
-            # 推送审查完成信息
-            state_dict = None
-            if state:
-                if isinstance(state, dict):
-                    state_dict = state
-                elif isinstance(state, str):
-                    try:
-                        state_dict = json.loads(state)
-                    except (json.JSONDecodeError, AttributeError):
-                        pass
-            
-            if state_dict and state_dict.get("progress_manager"):
-                asyncio.create_task(
-                    state_dict["progress_manager"].update_stage_progress(
-                        callback_task_id=state_dict.get("callback_task_id"),
-                        stage_name=stage_name,
-                        current=None,
-                        status="processing",
-                        message=f"{name} 要点审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s",
-                        issues=[review_result_data],
-                        event_type="processing"
-                    )
+            # 构建details字段,包含审查结果
+            # details = {
+            #     'chunk_id': chunk_id,
+            #     'name': 'completeness_check',
+            #     'chapter_classification': chapter_classification,
+            #     'section_label': doc.get('section_label', ''),
+            #     'requirements_count': len(requirements),
+            #     'checked_items': len(review_result),
+            #     'response': review_result[0] if review_result else {},
+            # }
+            result_list = []
+            for index, missing_issue in enumerate(missing_issue_list):
+                details = {
+                    'chunk_id': f'chunk_id_{index}',
+                    'name': 'completeness_check',
+                    'chapter_classification': 'chapter_classification',
+                    'section_label': 'section_label',
+                    'requirements_count': 'requirements_count',
+                    'checked_items': len(missing_issue),
+                    'response': missing_issue if missing_issue else {},
+                }
+
+                # 创建ReviewResult对象
+                from core.construction_review.component.reviewers.base_reviewer import ReviewResult
+                result = ReviewResult(
+                    success=True,
+                    details=details,
+                    error_message=None,
+                    execution_time=cost_time
                 )
-            logger.info(f"{name} 审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s")
+                
+                result_dict = {"details":result.details,"success":result.success,"error_message":result.error_message,"execution_time":result.execution_time}
+                result_list.append(result_dict)
+                with open('temp/completeness_check_result_1.json','w',encoding='utf-8') as f:
+                    json.dump(result_dict, f, ensure_ascii=False,indent=4)
+                
+                # 将审查结果转换为字典格式,添加到issues中
+                review_result_data = {
+                    'name': name,
+                    'success': result.success,
+                    'details': result.details,
+                    'error_message': result.error_message,
+                    'execution_time': result.execution_time,
+                    'timestamp': time.time()
+                }
+                
+                # 推送审查完成信息
+                state_dict = None
+                if state:
+                    if isinstance(state, dict):
+                        state_dict = state
+                    elif isinstance(state, str):
+                        try:
+                            state_dict = json.loads(state)
+                        except (json.JSONDecodeError, AttributeError):
+                            pass
+                
+                if state_dict and state_dict.get("progress_manager"):
+                    asyncio.create_task(
+                        state_dict["progress_manager"].update_stage_progress(
+                            callback_task_id=state_dict.get("callback_task_id"),
+                            stage_name=stage_name,
+                            current=None,
+                            status="processing",
+                            message=f"{name} 要点审查完成 (chunk_id: {chunk_id}_{index}), 耗时: {result.execution_time:.2f}s",
+                            issues=[review_result_data],
+                            event_type="processing"
+                        )
+                    )
+                logger.debug(f"{name} 审查完成 (chunk_id: {chunk_id}_{index}), 耗时: {result.execution_time:.2f}s")
 
-            return result
+            return {
+                        'completeness_review_result': result_list
+                    }
 
         except Exception as e:
             execution_time = time.time() - start_time
@@ -874,7 +932,7 @@ class AIReviewEngine(BaseReviewer):
             return ReviewResult(
                 success=False,
                 details={
-                    'chunk_id': review_content.get('chunk_id', 'unknown') if isinstance(review_content, dict) else 'unknown',
+                    'chunk_id': review_content[0].get('chunk_id', 'unknown') if isinstance(review_content[0], dict) else 'unknown',
                     'error': str(e)
                 },
                 error_message=error_msg,

+ 0 - 103
core/construction_review/component/check_completeness/components/result_processor.py

@@ -1,103 +0,0 @@
-"""
-结果处理组件实现
-"""
-import json
-import re
-from typing import Dict, List, Any
-import sys
-from pathlib import Path
-
-# 添加项目根目录到路径,支持相对导入
-_root = Path(__file__).parent.parent
-if str(_root) not in sys.path:
-    sys.path.insert(0, str(_root))
-
-from interfaces import IResultProcessor
-
-
-class ResultProcessor(IResultProcessor):
-    """结果处理器"""
-    
-    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> List[Dict[str, Any]]:
-        """
-        解析LLM返回结果
-        
-        Args:
-            llm_response: LLM返回的文本
-            requirements: 审查要求列表
-            
-        Returns:
-            问题列表,每个问题包含:issue_point, location, suggestion, reason, risk_level
-            如果没有问题,返回空列表
-        """
-        # 处理“无明显问题”这种纯文本输出
-        text = (llm_response or "").strip()
-        if text in ("无明显问题。", "无明显问题", ""):
-            return []
-
-        # 提取JSON部分
-        json_str = self._extract_json(text)
-        if not json_str:
-            # 如果无法提取JSON,视为无结构化问题,返回空列表
-            return []
-
-        try:
-            result = json.loads(json_str)
-        except json.JSONDecodeError:
-            # JSON解析失败,返回空列表
-            return []
-
-        issues: List[Dict[str, Any]] = []
-
-        # 如果结果是单个对象,转换为列表
-        if isinstance(result, dict):
-            result = [result]
-
-        if isinstance(result, list):
-            for item in result:
-                if not isinstance(item, dict):
-                    continue
-                # 规范化字段
-                issue = {
-                    "issue_point": item.get("issue_point", ""),
-                    "location": item.get("location", ""),
-                    "suggestion": item.get("suggestion", ""),
-                    "reason": item.get("reason", ""),
-                    "risk_level": item.get("risk_level", ""),
-                }
-                # 至少要有问题描述才认为是有效问题
-                if issue["issue_point"]:
-                    issues.append(issue)
-
-        return issues
-    
-    def _extract_json(self, text: str) -> str:
-        """
-        从文本中提取JSON字符串
-        
-        Args:
-            text: 原始文本
-            
-        Returns:
-            JSON字符串
-        """
-        # 尝试直接解析
-        text = text.strip()
-        
-        # 查找JSON对象
-        # 匹配 { ... } 格式
-        json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
-        matches = re.findall(json_pattern, text, re.DOTALL)
-        
-        if matches:
-            # 返回最长的匹配(通常是完整的JSON)
-            return max(matches, key=len)
-        
-        # 如果没找到,尝试查找代码块中的JSON
-        code_block_pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
-        code_matches = re.findall(code_block_pattern, text, re.DOTALL)
-        if code_matches:
-            return max(code_matches, key=len)
-        
-        return text
-

+ 0 - 202
core/construction_review/component/check_completeness/components/result_saver.py

@@ -1,202 +0,0 @@
-"""
-结果保存组件
-"""
-from typing import Dict, List, Any
-import sys
-from pathlib import Path
-from collections import defaultdict
-
-# 添加项目根目录到路径,支持相对导入
-_root = Path(__file__).parent.parent
-if str(_root) not in sys.path:
-    sys.path.insert(0, str(_root))
-
-from utils.file_utils import write_csv, write_txt
-
-
-class ResultSaver:
-    """结果保存器"""
-    
-    @staticmethod
-    def save_to_csv(results: List[Dict[str, Any]], 
-                   specification: Dict[str, List[Dict[str, str]]],
-                   output_path: str) -> None:
-        """
-        保存审查结果到CSV文件
-        格式:chunk_id | chapter_classification | section_label | page | issue_point | location | suggestion | reason | risk_level
-        
-        Args:
-            results: 审查结果列表
-            specification: 规范字典,用于确定列的顺序
-            output_path: 输出文件路径
-        """
-        csv_rows = []
-        
-        for result in results:
-            chunk_id = result.get('chunk_id', '')
-            chapter_classification = result.get('chapter_classification', '')
-            section_label = result.get('section_label', '')
-            page = result.get('page', '')
-            review_result = result.get('review_result', [])
-            
-            # 如果审查失败,记录错误信息
-            if isinstance(review_result, dict) and 'error' in review_result:
-                row = {
-                    'chunk_id': chunk_id,
-                    'chapter_classification': chapter_classification,
-                    'section_label': section_label,
-                    'page': page,
-                    'issue_point': f"错误: {review_result['error']}",
-                    'location': '',
-                    'suggestion': '',
-                    'reason': '',
-                    'risk_level': ''
-                }
-                csv_rows.append(row)
-                continue
-            
-            # 如果没有问题(空列表),记录一条“无问题”记录
-            if isinstance(review_result, list) and len(review_result) == 0:
-                row = {
-                    'chunk_id': chunk_id,
-                    'chapter_classification': chapter_classification,
-                    'section_label': section_label,
-                    'page': page,
-                    'issue_point': '无明显问题',
-                    'location': '',
-                    'suggestion': '',
-                    'reason': '',
-                    'risk_level': ''
-                }
-                csv_rows.append(row)
-                continue
-
-            # 如果有问题列表,为每个问题创建一行
-            if isinstance(review_result, list):
-                for issue in review_result:
-                    row = {
-                        'chunk_id': chunk_id,
-                        'chapter_classification': chapter_classification,
-                        'section_label': section_label,
-                        'page': page,
-                        'issue_point': issue.get('issue_point', ''),
-                        'location': issue.get('location', ''),
-                        'suggestion': issue.get('suggestion', ''),
-                        'reason': issue.get('reason', ''),
-                        'risk_level': issue.get('risk_level', '')
-                    }
-                    csv_rows.append(row)
-        
-        # 写入CSV文件(使用逗号分隔符)
-        write_csv(csv_rows, output_path, delimiter=',')
-    
-    @staticmethod
-    def save_statistics(results: List[Dict[str, Any]], 
-                       specification: Dict[str, List[Dict[str, str]]],
-                       output_path: str) -> None:
-        """
-        保存统计结果到TXT文件
-        
-        Args:
-            results: 审查结果列表
-            specification: 规范字典
-            output_path: 输出文件路径
-        """
-        # 统计信息
-        total_chunks = len(results)
-        success_count = 0  # 无问题
-        error_count = 0    # 解析或流程错误
-        issue_count = 0    # 总问题数
-        
-        # 按分类统计
-        classification_stats = defaultdict(lambda: {
-            'total': 0,
-            'no_issues': 0,
-            'has_issues': 0,
-            'errors': 0,
-            'issue_count': 0
-        })
-        
-        # 按风险等级统计
-        risk_level_stats = defaultdict(int)
-        
-        for result in results:
-            chapter_classification = result.get('chapter_classification', '')
-            review_result = result.get('review_result', [])
-            
-            # 错误记录
-            if isinstance(review_result, dict) and 'error' in review_result:
-                error_count += 1
-                classification_stats[chapter_classification]['errors'] += 1
-                classification_stats[chapter_classification]['total'] += 1
-                continue
-            
-            # 无问题
-            if isinstance(review_result, list) and len(review_result) == 0:
-                success_count += 1
-                classification_stats[chapter_classification]['no_issues'] += 1
-                classification_stats[chapter_classification]['total'] += 1
-                continue
-
-            # 有问题
-            if isinstance(review_result, list):
-                issue_count += len(review_result)
-                classification_stats[chapter_classification]['has_issues'] += 1
-                classification_stats[chapter_classification]['issue_count'] += len(review_result)
-                classification_stats[chapter_classification]['total'] += 1
-
-                for issue in review_result:
-                    risk_level = issue.get('risk_level', '')
-                    if risk_level:
-                        risk_level_stats[risk_level] += 1
-        
-        # 生成统计文本
-        lines = []
-        lines.append("=" * 80)
-        lines.append("文件要点审查统计报告")
-        lines.append("=" * 80)
-        lines.append("")
-        
-        # 总体统计
-        lines.append("【总体统计】")
-        lines.append(f"  总文档块数: {total_chunks}")
-        if total_chunks > 0:
-            lines.append(f"  无问题文档数: {success_count} ({success_count/total_chunks*100:.1f}%)")
-            lines.append(f"  存在问题文档数: {total_chunks - success_count - error_count}")
-            lines.append(f"  解析/流程错误数: {error_count}")
-        else:
-            lines.append("  无问题文档数: 0")
-            lines.append("  存在问题文档数: 0")
-            lines.append("  解析/流程错误数: 0")
-        lines.append(f"  总问题数: {issue_count}")
-        lines.append("")
-        
-        # 按分类统计
-        lines.append("【按分类统计】")
-        for classification, stats in sorted(classification_stats.items()):
-            total = stats['total']
-            no_issues = stats['no_issues']
-            has_issues = stats['has_issues']
-            errors = stats['errors']
-            lines.append(f"  {classification}:")
-            lines.append(f"    总数: {total}")
-            lines.append(f"    无问题: {no_issues}")
-            lines.append(f"    存在问题: {has_issues}")
-            lines.append(f"    错误: {errors}")
-            lines.append(f"    问题总数: {stats['issue_count']}")
-            lines.append("")
-        
-        # 按风险等级统计
-        lines.append("【按风险等级统计】")
-        for level, count in risk_level_stats.items():
-            lines.append(f"  {level}: {count}")
-        
-        lines.append("")
-        lines.append("=" * 80)
-        lines.append("统计完成")
-        lines.append("=" * 80)
-        
-        # 写入文件
-        content = "\n".join(lines)
-        write_txt(content, output_path)
-

+ 0 - 49
core/construction_review/component/check_completeness/config/Construction_Plan_Content_Specification.csv

@@ -1,49 +0,0 @@
-标签	一级目录	二级目录	内容要求
-basis	编制依据	法律法规	法律法规包括国家、工程所在地省级政府发布的法律法规、规章制度等;
-basis	编制依据	标准规范	标准规范包括行业标准、技术规程等;
-basis	编制依据	文件制度	文件制度包括四川路桥、路桥集团、桥梁公司、建设单位下发的文件制度和管理程序文件等;
-basis	编制依据	编制原则	编制原则应认真贯彻执行国家方针、政策、标准和设计文件,严格执行基本建设程序,实现工程项目的全部功能;
-basis	编制依据	编制范围	编制范围应填写完整,涵盖本方案包含的所有工程,部分工程可简要说明采取的施工工艺。
-overview	工程概况	设计概况	设计概况包含工程简介、主要技术标准两个方面。
-overview	工程概况	工程地质与水文气象	工程地质与水文气象主要包括与该工程有关的水文状况、气候条件等。
-overview	工程概况	周边环境	周边环境主要包括与该工程有关的主要建(构)筑物、山体、边坡、河谷、深基坑、道路、高压电、地下管线的位置关系、结构尺寸等情况
-overview	工程概况	施工平面及立面布置	施工平面及立面布置包括本项目拌和站、钢筋加工场、材料(临时)堆码区域的位置和与该工程的距离,施工作业平台(场站)的尺寸、地面形式以及施工便道的长度、宽度、路面形式、最小弯曲半径,临时用水的来源、管线布置、距离,变压器、配电箱的位置、大小,线路走向,敷设方式等。
-overview	工程概况	施工要求和技术保证条件	施工要求和技术保证条件包含工期目标、质量目标、安全目标、环境目标。工期目标包括本项目的总体工期和本工程的工期,仅需说明起止时间和持续时间。质量目标、安全目标和环境目标应根据施工合同和业主要求填写。
-overview	工程概况	风险辨识与分级	风险辨识与分级包含在施工过程中所有的危险源,并按照法律法规的要求对其进行分级,并说明其应对措施。
-overview	工程概况	参建各方责任主体单位	参建各方责任主体单位主要描述该项目的建设单位、设计单位、监理单位、施工单位、监控单位、专业分包单位的名称。
-plan	施工计划	施工进度计划	施工进度计划包括主要工序作业时间分析、关键工程(工序)节点安排、施工进度计划横道图等。
-plan	施工计划	施工材料计划	施工材料计划包含方案实施过程中需要使用的所有施工措施材料,明确材料名称、规格、数量、重量、来源。
-plan	施工计划	施工设备计划	施工设备计划包含方案实施过程中需要使用的主要机械设备,应明确设备名称、规格、数量、来 源。
-plan	施工计划	劳动力计划	劳动力计划包含各阶段(周、旬、月或季度)不同工种的作业人员投入情况。
-plan	施工计划	安全生产费用使用计划	安全生产费用使用计划包含实施本方案拟投入的安全费用类别、费用名称、单 项投入金额和安全生产费用总额。
-technology	施工工艺技术	主要施工方法概述	主要施工方法概述应简要说明采取的主要施工工艺和施工方法,以及模板等重 要材料的配置数量。
-technology	施工工艺技术	技术参数	技术参数包含主要使用材料的类型、规格,以及主要设备的名称、型号、出厂 时间、性能参数、自重等。
-technology	施工工艺技术	工艺流程	施工准备包含测量放样、临时用水、临时用电、场地、人员、设备、安全防护 措施和人员上下通道等内容。
-technology	施工工艺技术	施工准备	工艺流程包含整个方案的主要施工工序,按照施工的先后顺序
-technology	施工工艺技术	施工方法及操作要求	施工方法及操作要求根据工艺流程中主要的施工工序依次进行描述其操作方法, 并说明施工要点,常见问题及预防、处理措施。
-technology	施工工艺技术	检查要求	检查要求包含所用的材料,构配件进场质量检查、抽查,以及施工过程中各道 工序检查内容及标准。
-safety	安全保证措施	安全保证体系	
-safety	安全保证措施	组织保证措施	组织保证措施包含安全管理组织机构、人员安全职责。
-safety	安全保证措施	技术保证措施	技术保证措施应按总体安全措施,主要工序的安全保证措施进行梳理和说明
-safety	安全保证措施	监测监控措施	监测监控措施包括监测组织机构、监测范围、监测项目、监测点的设置、监测 仪器设备、监测方法、监测频率、预警值及控制值、信息反馈等内容。
-safety	安全保证措施	应急处置措施	应急处置措施包含应急处置程序、应急处置措施、应急物资及设备保障、交通 疏导与医疗救援、后期处置等六个方面。
-quality	质量保证措施	质量保证体系	
-quality	质量保证措施	质量目标	
-quality	质量保证措施	工程创优规划	工程创优规划包含制定工程创优总体计划,做好技术准备工作,加强过程控制,重视细部处理,创建精品工程,推广应用新技术,申报资料、工程资料的收集与整理等内容
-quality	质量保证措施	质量控制程序与具体措施	质量控制程序与具体措施包含原材料、实体工程质量检查验收程序和要求,主 要工序的质量通病、预防措施,以及季节性(冬期、高温、雨期)施工的质量保证 措施。
-environment	环境保证措施	环境保证体系	
-environment	环境保证措施	环境保护组织机构	环境保护组织机构包含管理人员姓名、职务、职责。
-environment	环境保证措施	环境保护及文明施工措施	环境保护及文明施工措施包含办公、生活区环境卫生保证措施,施工区域水土 保持保证措施、噪声污染防治措施、水污染防治措施、大气污染防治措施。
-management	施工管理及作业人员配备与分工	施工管理人员	施工管理人员以表格的形式说明管理人员名单及岗位职责
-management	施工管理及作业人员配备与分工	专职安全生产管理人员	
-management	施工管理及作业人员配备与分工	特种作业人员	
-management	施工管理及作业人员配备与分工	其他作业人员	其他作业人员包含专业分包单位(协作队伍)管理人员数量,不同工种(班组、 区域)的作业人员数量等。
-acceptance	验收要求	验收标准	验收标准包含国家和行业的标准、规范、操作规程、四川路桥、路桥集团和桥 梁公司的管理办法等。
-acceptance	验收要求	验收程序	验收程序包括进场验收、过程验收、阶段验收、完工验收等时间节点的具体验 收程序。
-acceptance	验收要求	验收内容	
-acceptance	验收要求	验收时间	
-acceptance	验收要求	验收人员	验收人员应包括建设、设计、施工、 监理、监测等单位相关人员,并明确验收人员姓名。
-other	其他资料	计算书	
-other	其他资料	相关施工图纸	
-other	其他资料	附图附表	
-other	其他资料	编制及审核人员情况	

+ 0 - 39
core/construction_review/component/check_completeness/config/prompt.yaml

@@ -1,39 +0,0 @@
-content_review:
-  system: |
-    你是一名工程与施工领域的专业文档审查专家,负责审查施工方案文档的内容完整性。
-    - 仔细分析待审查文本内容,判断是否包含每个审查要点要求的内容;
-    - 对于每个审查要点,如果文本中明确包含或涵盖了该要点要求的内容,返回true,否则返回false;
-    - 判断要严格但合理,如果文本内容能够满足要点的核心要求,即使表述方式不同,也应判定为true;
-    - 只输出JSON格式,不要添加任何解释性文字;
-
-    - /no_think
-  user_template: |
-    任务:审查施工方案文档内容是否包含所有必需的要点。
-
-    待审查文本内容:
-    {{ content }}
-
-    审查要点要求:
-    {{ requirements }}
-
-    输出格式:务必须严格按照以下标准json格式输出审查结果:
-    如果未发现明显的词句语法错误,请输出:无明显问题。
-    如果发现问题,请按以下格式输出:
-    location字段直接输出原字段内容,不得猜测。
-      - 必须输出一个 JSON 对象(不能是数组、列表或其他结构);
-      - JSON 对象的格式如下:
-      {
-        "issue_point": "[内容缺失]具体问题描述(格式严格按照:[内容缺失]具体问题描述,不得缺失,如:[内容缺失]未包含施工进度计划等内容),",
-        "location": "问题所在的原始条款内容及位置(如:三、施工方法 (页码: 12)),包含必要的上下文",
-        "suggestion": "基于逻辑规则的具体修改建议(1.必须是补全内容缺失错误,而非优化表达;2.当待审查文本内容中存在<表格></表格>标签对时,必须严格提醒用户检查要点是否在表格中)",
-        "reason": "详细说明为何这是一个内容缺失错误,包括:1)内容缺失在哪里 2)为何需要补全 3)可能产生的后果",
-        "risk_level": "高风险/中风险/低风险(严格按照系统提示词中的标准判定)"
-      }
-
-
-
-
-
-
-
-

+ 1 - 0
core/construction_review/component/check_completeness/README.md → core/construction_review/component/reviewers/check_completeness/README.md

@@ -88,3 +88,4 @@ python main.py
 4. 如果规范文件中没有对应的标签,会在结果中标记错误
 
 
+

+ 1 - 0
core/construction_review/component/check_completeness/components/__init__.py → core/construction_review/component/reviewers/check_completeness/components/__init__.py

@@ -3,3 +3,4 @@
 """
 
 
+

+ 3 - 1
core/construction_review/component/check_completeness/components/data_loader.py → core/construction_review/component/reviewers/check_completeness/components/data_loader.py

@@ -36,6 +36,7 @@ class CSVDataLoader(IDataLoader):
             tag = row.get('标签', '').strip()
             level2 = row.get('二级目录', '').strip()
             requirement = row.get('内容要求', '').strip()
+            point_count = row.get('内容要点数量', '').strip()
             
             if not tag:
                 continue
@@ -47,7 +48,8 @@ class CSVDataLoader(IDataLoader):
             if level2:
                 specification[tag].append({
                     "二级目录": level2,
-                    "内容要求": requirement
+                    "内容要求": requirement,
+                    "内容要点数量": point_count
                 })
         
         return specification

+ 0 - 0
core/construction_review/component/check_completeness/components/llm_client.py → core/construction_review/component/reviewers/check_completeness/components/llm_client.py


+ 9 - 3
core/construction_review/component/check_completeness/components/prompt_builder.py → core/construction_review/component/reviewers/check_completeness/components/prompt_builder.py

@@ -38,13 +38,19 @@ class PromptBuilder(IPromptBuilder):
         Returns:
             包含system和user的提示词字典
         """
-        # 构建审查要点要求字符串
+        # 构建审查要点要求字符串,明确展示每个要点的编号和允许的编号范围
         requirements_text = ""
         for req in requirements:
             level2_name = req.get('二级目录', '')
             requirement = req.get('内容要求', '')
-            if level2_name:
-                requirements_text += f"- {level2_name}: {requirement}\n"
+            point_count = (req.get('内容要点数量') or '').strip()
+            range_hint = ''
+            if point_count.isdigit():
+                # 告诉模型该二级目录最多只有 N 个要点,编号只能在 1~N 之间
+                range_hint = f"(本二级目录共有{point_count}个要点,只允许使用编号1~{point_count})"
+            if level2_name and requirement:
+                # 格式化要求文本,确保编号清晰可见,并包含编号范围提示
+                requirements_text += f"{level2_name}{range_hint}:\n{requirement}\n\n"
         
         # 获取模板
         system_template = self.review_config.get('system', '')

+ 247 - 0
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -0,0 +1,247 @@
+"""
+结果汇总与规范覆盖分析组件
+"""
+from typing import Dict, List, Any, Set
+import ast
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import IResultAnalyzer
+from utils.file_utils import read_csv, write_csv
+
+
class ResultAnalyzer(IResultAnalyzer):
    """Aggregates chunk-level review results and analyses specification coverage."""

    def __init__(self, spec_csv_path: str):
        """
        Args:
            spec_csv_path: Path to the specification CSV
                (Construction_Plan_Content_Specification.csv).
        """
        self.spec_csv_path = spec_csv_path

    def process_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Clean raw review results according to the business rules.

        - Keep only the fields: chunk_id, section_label,
          chapter_classification, review_result.
        - If section_label contains "->" (i.e. two or more title segments),
          any review_result key (level-2 directory name) that does not occur
          in section_label has its point list cleared to [].
        - Point-number lists are de-duplicated and sorted within each chunk.

        Args:
            results: Raw results from the review pipeline.

        Returns:
            A new list of cleaned result dicts.
        """
        processed: List[Dict[str, Any]] = []

        for item in results:
            chunk_id = item.get("chunk_id", "")
            section_label = item.get("section_label", "") or ""
            chapter_classification = item.get("chapter_classification", "") or ""
            review_result = item.get("review_result", {})

            if not isinstance(review_result, dict):
                review_result = {}

            new_review_result: Dict[str, List[int]] = {}
            has_multi_titles = "->" in section_label

            for key, value in review_result.items():
                # Accept only int (or digit-string) point numbers.
                points: List[int] = []
                if isinstance(value, list):
                    for v in value:
                        if isinstance(v, int):
                            points.append(v)
                        elif isinstance(v, str) and v.isdigit():
                            points.append(int(v))
                # De-duplicate and sort within the chunk.
                points = sorted(set(points))

                if has_multi_titles and key and key not in section_label:
                    # Level-2 directory name does not appear anywhere in the
                    # multi-segment title chain: discard its points.
                    new_review_result[key] = []
                else:
                    new_review_result[key] = points

            processed.append(
                {
                    "chunk_id": chunk_id,
                    "section_label": section_label,
                    "chapter_classification": chapter_classification,
                    "review_result": new_review_result,
                }
            )

        return processed

    def build_spec_summary(
        self, processed_results: List[Dict[str, Any]], output_csv_path: str = None
    ) -> List[Dict[str, Any]]:
        """
        Build a specification-coverage summary from the spec CSV and the
        processed review results.

        Three columns are appended to each original spec row:
          * 审查到的要点: e.g. [1, 2, 3]
          * 缺失的要点:   e.g. [1, 3]
          * 要点来源:     e.g. ["第五章 施工安全保证措施->一) 组织保证措施", ...]

        Args:
            processed_results: Results cleaned by process_results().
            output_csv_path: When given, the summary is also written to this
                path as a comma-separated CSV (write_csv uses utf-8-sig).

        Returns:
            The summary rows.
        """
        # The original spec table is tab-separated.
        spec_rows = read_csv(self.spec_csv_path, delimiter="\t")

        # Aggregate found points and their sources per (tag, level-2 directory).
        points_found_map: Dict[str, Set[int]] = {}
        sources_map: Dict[str, Set[str]] = {}

        for item in processed_results:
            chapter_classification = (item.get("chapter_classification") or "").strip()
            section_label = (item.get("section_label") or "").strip()
            review_result = item.get("review_result", {}) or {}

            if not chapter_classification or not isinstance(review_result, dict):
                continue

            # Truncate the source title to its first two "->" segments.
            if "->" in section_label:
                parts = [p.strip() for p in section_label.split("->") if p.strip()]
                if len(parts) >= 2:
                    source_label = "->".join(parts[:2])
                else:
                    source_label = parts[0] if parts else section_label
            else:
                source_label = section_label

            for level2_name, points in review_result.items():
                if not level2_name:
                    continue

                key = f"{chapter_classification}_{level2_name}"
                if key not in points_found_map:
                    points_found_map[key] = set()
                    sources_map[key] = set()

                if isinstance(points, list) and points:
                    for p in points:
                        if isinstance(p, int):
                            points_found_map[key].add(p)
                        elif isinstance(p, str) and p.isdigit():
                            points_found_map[key].add(int(p))
                    # Any contribution from this chunk registers its
                    # (truncated) title as a source.
                    sources_map[key].add(source_label)

        # Produce one summary row per spec row.
        summary_rows: List[Dict[str, Any]] = []
        for row in spec_rows:
            tag = (row.get("标签") or "").strip()
            level2 = (row.get("二级目录") or "").strip()
            point_count_str = (row.get("内容要点数量") or "").strip()

            # Parse the declared total point count; 0 disables missing-point math.
            try:
                total_points = int(point_count_str)
            except (TypeError, ValueError):
                total_points = 0

            key = f"{tag}_{level2}"
            found_points = sorted(points_found_map.get(key, set()))

            if total_points > 0:
                missing_points = [
                    i for i in range(1, total_points + 1) if i not in found_points
                ]
            else:
                missing_points = []

            sources = sorted(sources_map.get(key, set())) if key in sources_map else []

            # Output row = original spec row + three coverage columns.
            new_row = dict(row)
            new_row["审查到的要点"] = str(found_points)
            new_row["缺失的要点"] = str(missing_points)
            new_row["要点来源"] = str(sources)
            summary_rows.append(new_row)

        # Comma-separated for generic CSV tools (write_csv fixes utf-8-sig).
        if output_csv_path:
            write_csv(summary_rows, output_csv_path, delimiter=",")
        return summary_rows

    def build_missing_issue_list(
        self, summary_rows: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Build a JSON-ready issue list of missing points for downstream
        consumers (e.g. the front end).

        Args:
            summary_rows: Rows produced by build_spec_summary().

        Returns:
            One issue dict per spec row that has missing points, with the
            fields issue_point / location / suggestion / reason / risk_level.
        """
        issues: List[Dict[str, Any]] = []

        for row in summary_rows:
            level2 = (row.get("二级目录") or "").strip()
            requirement = (row.get("内容要求") or "").strip()

            missing_points = self._parse_list_field(row.get("缺失的要点", ""))
            if not missing_points:
                continue

            sources = self._parse_list_field(row.get("要点来源", ""))
            location = "; ".join(map(str, sources)) if sources else ""

            # Normalise full-width separators so "A:B;C" and "A:B;C" split
            # identically, then drop empty segments.
            normalized = requirement.replace(":", ":").replace(";", ";")
            segments = [
                s.strip() for s in normalized.split(":")[-1].split(";") if s.strip()
            ]

            # Guard against out-of-range indices: the LLM may report point
            # numbers the requirement text does not actually cover, which
            # previously raised IndexError here.
            picked = [
                segments[i - 1]
                for i in missing_points
                if isinstance(i, int) and 1 <= i <= len(segments)
            ]
            if picked:
                requirement_text = ';'.join(picked)
            else:
                # Fall back to the raw numbers when no segment matches.
                requirement_text = "、".join(str(m) for m in missing_points)

            issue_point = f"[{level2}内容缺失]未包含要点:{requirement_text}"
            suggestion = f"补充:{requirement_text}" if requirement else "补充缺失要点内容"
            risk_level = self._map_risk_level(len(missing_points))

            issues.append(
                {
                    "issue_point": issue_point,
                    "location": location,
                    "suggestion": suggestion,
                    "reason": requirement,
                    "risk_level": risk_level,
                }
            )

        return issues

    @staticmethod
    def _parse_list_field(value: Any) -> List[Any]:
        """Safely convert a list-literal string from the CSV back to a list."""
        if isinstance(value, list):
            return value
        if not value:
            return []
        try:
            parsed = ast.literal_eval(value)
            if isinstance(parsed, list):
                return parsed
        except Exception:
            return []
        return []

    @staticmethod
    def _map_risk_level(missing_count: int) -> str:
        """Map the number of missing points to a risk-level label."""
        if missing_count >= 3:
            return "高风险"
        if missing_count == 2:
            return "中风险"
        return "低风险"
+
+

+ 140 - 0
core/construction_review/component/reviewers/check_completeness/components/result_processor.py

@@ -0,0 +1,140 @@
+"""
+结果处理组件实现
+"""
+import json
+import re
+from typing import Dict, List, Any
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import IResultProcessor
+
+
class ResultProcessor(IResultProcessor):
    """Parses the raw LLM response into level-2-directory -> point-number lists."""

    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> Dict[str, List[int]]:
        """
        Parse the LLM response text.

        Args:
            llm_response: Text returned by the LLM.
            requirements: Review requirement list (used to validate
                level-2 directory names).

        Returns:
            Mapping from level-2 directory name to the point numbers found,
            e.g. {"法律法规": [1, 2], "标准规范": [1], "文件制度": []}.
            Returns {} when nothing parseable is found.
        """
        raw = (llm_response or "").strip()
        if not raw:
            return {}

        # Locate the JSON portion of the response.
        candidate = self._extract_json(raw)
        if not candidate:
            return {}

        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            return {}

        # The payload must be an object: field = level-2 name, value = int list.
        if not isinstance(parsed, dict):
            return {}

        # Normalise every value to a list of ints.
        normalized: Dict[str, List[int]] = {}
        for name, raw_value in parsed.items():
            if isinstance(raw_value, list):
                # Keep ints and digit-strings, silently dropping anything else.
                normalized[name] = [
                    int(entry)
                    for entry in raw_value
                    if isinstance(entry, (int, str)) and str(entry).isdigit()
                ]
            elif isinstance(raw_value, (int, str)) and str(raw_value).isdigit():
                # A lone number becomes a one-element list.
                normalized[name] = [int(raw_value)]
            else:
                # Unusable type: record an empty list.
                normalized[name] = []

        return normalized

    def _extract_json(self, text: str) -> str:
        """
        Pull a JSON object string out of *text*.

        Tries, in order: fenced ```json``` code blocks, the whole text, then
        a brace-matching scan from the first '{'. Falls back to returning
        the original text when no valid JSON object is found.

        Args:
            text: Raw response text.

        Returns:
            A JSON string (or the input text as a last resort).
        """
        text = text.strip()

        # 1) Fenced code blocks such as ```json { ... } ```.
        fenced_pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
        for fenced in re.findall(fenced_pattern, text, re.DOTALL):
            try:
                json.loads(fenced)
            except json.JSONDecodeError:
                continue
            return fenced

        # 2) The entire text may already be valid JSON.
        try:
            json.loads(text)
        except json.JSONDecodeError:
            pass
        else:
            return text

        # 3) Scan for a balanced {...} region, ignoring braces inside strings.
        start = text.find('{')
        if start == -1:
            return text

        depth = 0
        inside_string = False
        skip_next = False

        for pos in range(start, len(text)):
            ch = text[pos]

            if skip_next:
                skip_next = False
                continue

            if ch == '\\':
                skip_next = True
                continue

            if ch == '"':
                inside_string = not inside_string
                continue

            if inside_string:
                continue

            if ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    # Balanced region found: accept it only if it parses.
                    candidate = text[start:pos + 1]
                    try:
                        json.loads(candidate)
                        return candidate
                    except json.JSONDecodeError:
                        pass

        return text
+

+ 218 - 0
core/construction_review/component/reviewers/check_completeness/components/result_saver.py

@@ -0,0 +1,218 @@
+"""
+结果保存组件
+"""
+from typing import Dict, List, Any
+import sys
+from pathlib import Path
+from collections import defaultdict
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from utils.file_utils import write_csv, write_txt
+
+
class ResultSaver:
    """Persists review results as a CSV table and a TXT statistics report."""

    @staticmethod
    def save_to_csv(results: List[Dict[str, Any]],
                   specification: Dict[str, List[Dict[str, str]]],
                   output_path: str) -> None:
        """
        Write review results to a CSV file.

        Columns: chunk_id | chapter_classification | section_label | page |
        二级目录 | 内容要点数量 | 包含的要点编号

        Args:
            results: Review result list.
            specification: Specification dict, used to look up each level-2
                directory's point count.
            output_path: Destination file path.
        """
        # Map "classification_level2" -> point count so that same-named
        # level-2 directories under different classifications don't collide.
        point_count_by_key: Dict[str, str] = {}
        for tag, reqs in specification.items():
            for req in reqs:
                name = req.get('二级目录', '')
                if name:
                    point_count_by_key[f"{tag}_{name}"] = req.get('内容要点数量', '')

        def make_row(res: Dict[str, Any], level2: str, count: str, numbers: str) -> Dict[str, Any]:
            # One CSV row; shared shape for data, placeholder and error rows.
            return {
                'chunk_id': res.get('chunk_id', ''),
                'chapter_classification': res.get('chapter_classification', ''),
                'section_label': res.get('section_label', ''),
                'page': res.get('page', ''),
                '二级目录': level2,
                '内容要点数量': count,
                '包含的要点编号': numbers,
            }

        rows: List[Dict[str, Any]] = []
        for res in results:
            review_result = res.get('review_result', {})
            classification = res.get('chapter_classification', '')

            if isinstance(review_result, dict) and 'error' in review_result:
                # Review failed for this chunk: record the error message.
                rows.append(make_row(res, f"错误: {review_result['error']}", '', ''))
            elif isinstance(review_result, dict):
                if not review_result:
                    # No level-2 data at all: emit a single placeholder row.
                    rows.append(make_row(res, '无数据', '', ''))
                else:
                    # One row per level-2 directory.
                    for level2_name, point_numbers in review_result.items():
                        numbers_str = str(point_numbers) if isinstance(point_numbers, list) else str([])
                        count = point_count_by_key.get(f"{classification}_{level2_name}", '')
                        rows.append(make_row(res, level2_name, count, numbers_str))
            else:
                # Unexpected result type: record it verbatim as an error row.
                rows.append(make_row(res, '格式错误', '', str(review_result)))

        # Comma-separated CSV output.
        write_csv(rows, output_path, delimiter=',')

    @staticmethod
    def save_statistics(results: List[Dict[str, Any]],
                       specification: Dict[str, List[Dict[str, str]]],
                       output_path: str) -> None:
        """
        Write an aggregated statistics report to a TXT file.

        Args:
            results: Review result list.
            specification: Specification dict (kept for interface symmetry
                with save_to_csv; not read here).
            output_path: Destination file path.
        """
        total_chunks = len(results)
        error_count = 0          # parse / pipeline errors
        total_points_found = 0   # total number of points detected

        # Per-classification statistics, each with nested per-level-2 detail.
        per_classification = defaultdict(lambda: {
            'total': 0,
            'errors': 0,
            'points_found': 0,
            'level2_stats': defaultdict(lambda: {'count': 0, 'points': 0})
        })

        # Global per-level-2 statistics across all classifications.
        per_level2 = defaultdict(lambda: {'chunks': 0, 'total_points': 0})

        for res in results:
            classification = res.get('chapter_classification', '')
            review_result = res.get('review_result', {})

            if not isinstance(review_result, dict):
                # Unknown format: not counted anywhere.
                continue

            stats = per_classification[classification]
            stats['total'] += 1

            if 'error' in review_result:
                error_count += 1
                stats['errors'] += 1
                continue

            # Normal result: dict of level-2 name -> point-number list.
            for level2_name, point_numbers in review_result.items():
                if not isinstance(point_numbers, list):
                    continue
                n = len(point_numbers)
                total_points_found += n
                stats['points_found'] += n
                stats['level2_stats'][level2_name]['count'] += 1
                stats['level2_stats'][level2_name]['points'] += n
                per_level2[level2_name]['chunks'] += 1
                per_level2[level2_name]['total_points'] += n

        # Assemble the report text.
        lines = []
        lines.append("=" * 80)
        lines.append("文件要点审查统计报告")
        lines.append("=" * 80)
        lines.append("")

        # Overall totals.
        lines.append("【总体统计】")
        lines.append(f"  总文档块数: {total_chunks}")
        lines.append(f"  解析/流程错误数: {error_count}")
        lines.append(f"  找到的要点总数: {total_points_found}")
        lines.append("")

        # Per-classification section.
        lines.append("【按分类统计】")
        for classification, stats in sorted(per_classification.items()):
            lines.append(f"  {classification}:")
            lines.append(f"    总数: {stats['total']}")
            lines.append(f"    错误: {stats['errors']}")
            lines.append(f"    找到的要点数: {stats['points_found']}")

            if stats['level2_stats']:
                lines.append(f"    二级目录详情:")
                for level2_name, detail in sorted(stats['level2_stats'].items()):
                    lines.append(f"      - {level2_name}: {detail['count']}个文档块,共{detail['points']}个要点")
            lines.append("")

        # Global per-level-2 section.
        lines.append("【按二级目录全局统计】")
        for level2_name, stats in sorted(per_level2.items()):
            lines.append(f"  {level2_name}:")
            lines.append(f"    涉及文档块数: {stats['chunks']}")
            lines.append(f"    找到的要点总数: {stats['total_points']}")
            lines.append("")

        lines.append("=" * 80)
        lines.append("统计完成")
        lines.append("=" * 80)

        write_txt("\n".join(lines), output_path)
+

+ 0 - 0
core/construction_review/component/check_completeness/components/review_pipeline.py → core/construction_review/component/reviewers/check_completeness/components/review_pipeline.py


BIN
core/construction_review/component/reviewers/check_completeness/config/Construction_Plan_Content_Specification.csv


+ 5 - 5
core/construction_review/component/check_completeness/config/llm_api.yaml → core/construction_review/component/reviewers/check_completeness/config/llm_api.yaml

@@ -16,12 +16,12 @@ doubao:
   DOUBAO_API_KEY: YOUR_DOUBAO_API_KEY_FOR_RAG_EVAL
 
 qwen:
-  # QWEN_SERVER_URL: http://192.168.91.253:8003/v1/
-  # QWEN_MODEL_ID: qwen3-30b
-  # QWEN_API_KEY: sk-123456
-  QWEN_SERVER_URL: http://192.168.91.253:9002/v1/
-  QWEN_MODEL_ID: Qwen3-8B
+  QWEN_SERVER_URL: http://192.168.91.253:8003/v1/
+  QWEN_MODEL_ID: qwen3-30b
   QWEN_API_KEY: sk-123456
+  # QWEN_SERVER_URL: http://192.168.91.253:9002/v1/
+  # QWEN_MODEL_ID: Qwen3-8B
+  # QWEN_API_KEY: sk-123456
 
 keywords:
   timeout: 30

+ 42 - 0
core/construction_review/component/reviewers/check_completeness/config/prompt.yaml

@@ -0,0 +1,42 @@
+content_review:
+  system: |
+    你是一名工程与施工领域的专业文档审查专家,负责审查施工方案文档的内容完整性。
+    - 仔细分析待审查文本内容,识别文本中实际包含的审查要点;
+    - 对于每个二级目录,检查文本中包含了哪些编号的要点,将这些编号记录在列表中;
+    - 如果某个二级目录的要点一个都没有包含,则返回空列表[];
+    - 判断要严格但合理,如果文本内容能够满足要点的核心要求,即使表述方式不同,也应判定为已包含;
+    - 每个二级目录的要点编号必须严格在给定范围内(例如“只允许使用编号1~2”),严禁编造超出范围的编号(如3、4、5等);
+    - 不得跳过不存在的编号,也不得添加规范中未定义的额外要点编号;
+    - 只输出JSON格式,不要添加任何解释性文字;
+
+    - /no_think
+  user_template: |
+    任务:审查施工方案文档内容,识别文本中实际包含的审查要点。
+
+    待审查文本内容:
+    {{ content }}
+
+    审查要点要求:
+    {{ requirements }}
+
+    输出格式:必须严格按照以下JSON格式输出审查结果:
+    {
+      "二级目录名称1": [要点编号列表,如: [1, 2]],
+      "二级目录名称2": [要点编号列表,如: [1]],
+      "二级目录名称3": []
+    }
+    
+    说明:
+    - JSON对象的字段名必须是二级目录名称(如"法律法规"、"标准规范"等);
+    - 每个字段的值是一个整数数组,表示文本中包含的要点编号;
+    - 每个二级目录的要点编号必须在对应说明中给出的范围之内(例如“只允许使用编号1~2”时,只能使用1或2),不能发明更大的编号;
+    - 如果某个二级目录的要点一个都没有包含,该字段的值应为空数组[];
+    - 只输出JSON对象,不要添加任何解释性文字。
+
+
+
+
+
+
+
+

+ 37 - 4
core/construction_review/component/check_completeness/interfaces.py → core/construction_review/component/reviewers/check_completeness/interfaces.py

@@ -76,17 +76,17 @@ class IResultProcessor(ABC):
     """结果处理接口"""
     
     @abstractmethod
-    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> List[Dict[str, Any]]:
+    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> Dict[str, List[int]]:
         """
         解析LLM返回结果
         
         Args:
             llm_response: LLM返回的文本
-            requirements: 审查要求列表(目前仅用于兼容接口,可选
+            requirements: 审查要求列表(用于验证二级目录名称
             
         Returns:
-            问题列表,每个问题包含:issue_point, location, suggestion, reason, risk_level
-            如果没有问题,返回空列表
+            字典,key为二级目录名称,value为包含的要点编号列表
+            格式: {"法律法规": [1, 2], "标准规范": [1], "文件制度": []}
         """
         raise NotImplementedError
 
@@ -110,3 +110,36 @@ class IReviewPipeline(ABC):
         raise NotImplementedError
 
 
class IResultAnalyzer(ABC):
    """Interface for aggregating and analysing review results."""

    @abstractmethod
    def process_results(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Clean raw review results according to the business rules.

        Args:
            results: Raw result list produced by the review pipeline.

        Returns:
            A new result list keeping only the four fields chunk_id,
            section_label, chapter_classification and review_result, with
            point lists of unrelated level-2 directories cleared according
            to the section-title rules.
        """
        raise NotImplementedError

    @abstractmethod
    def build_spec_summary(
        self, processed_results: List[Dict[str, Any]], output_csv_path: str = None
    ) -> List[Dict[str, Any]]:
        """
        Build the specification-coverage summary from the spec table and the
        processed results, optionally writing it out as CSV.

        NOTE: the concrete implementation returns the summary rows (callers
        feed them to build_missing_issue_list), so the abstract signature
        declares that return value and the optional output path to match.

        Args:
            processed_results: Cleaned result list from process_results().
            output_csv_path: Optional path for the summary CSV; when None,
                no file is written.

        Returns:
            The summary rows (one per specification row, with the coverage
            columns appended).
        """
        raise NotImplementedError
+
+
+

+ 34 - 22
core/construction_review/component/check_completeness/main.py → core/construction_review/component/reviewers/check_completeness/main.py

@@ -2,7 +2,6 @@
 文件要点审查模块主程序
 """
 import asyncio
-import os
 from pathlib import Path
 from components.data_loader import CSVDataLoader
 from components.prompt_builder import PromptBuilder
@@ -10,8 +9,9 @@ from components.llm_client import LLMClient
 from components.result_processor import ResultProcessor
 from components.review_pipeline import ReviewPipeline
 from components.result_saver import ResultSaver
+from components.result_analyzer import ResultAnalyzer
 from utils.file_utils import write_json
-
+import time
 
 async def main():
     """主函数"""
@@ -58,6 +58,7 @@ async def main():
     )
     print("  组件初始化完成")
     
+    start_time = time.time()
     # 3. 执行审查
     print("\n[4/5] 开始执行审查...")
     print(f"  使用模型: {llm_client.model_type}")
@@ -66,10 +67,10 @@ async def main():
     results = await review_pipeline.review(documents, specification)
     
     # 统计结果
-    success_count = sum(1 for r in results if 'error' not in r.get('review_result', {}))
+    success_count = sum(1 for r in results if isinstance(r.get('review_result', {}), dict) and 'error' not in r.get('review_result', {}))
     error_count = len(results) - success_count
     print(f"\n  审查完成: 成功 {success_count} 个, 失败 {error_count} 个")
-    
+
     # 4. 保存结果
     print("\n[5/5] 保存审查结果...")
     
@@ -92,7 +93,21 @@ async def main():
     stats_output_path = base_dir / 'output' / 'review_statistics.txt'
     ResultSaver.save_statistics(results, specification, str(stats_output_path))
     print(f"  统计结果已保存至: {stats_output_path}")
-    
+
+    # 6. 使用结果解析处理组件,生成规范覆盖汇总表
+    print("\n[6/6] 生成规范要点覆盖汇总表...")
+    analyzer = ResultAnalyzer(str(csv_path))
+    processed_results = analyzer.process_results(results)
+    spec_summary_csv_path = base_dir / 'output' / 'spec_review_summary.csv'
+    summary_rows = analyzer.build_spec_summary(processed_results, str(spec_summary_csv_path))
+    print(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
+
+    # 生成缺失要点 JSON 列表,便于前端消费
+    missing_issue_json_path = base_dir / 'output' / 'spec_review_missing_issues.json'
+    missing_issue_list = analyzer.build_missing_issue_list(summary_rows)
+    write_json(missing_issue_list, str(missing_issue_json_path))
+    print(f"  缺失要点 JSON 已保存至: {missing_issue_json_path}")
+    cost_time = time.time() - start_time
     # 5. 显示部分结果示例
     print("\n" + "=" * 60)
     print("审查结果示例(前3个):")
@@ -105,26 +120,23 @@ async def main():
         # 错误情况
         if isinstance(review_result, dict) and 'error' in review_result:
             print(f"  错误: {review_result['error']}")
-        # 无问题或有问题列表
-        elif isinstance(review_result, list):
-            if len(review_result) == 0:
-                print("  审查结果: 无明显问题。")
+        # 正常结果(字典格式:二级目录->编号列表)
+        elif isinstance(review_result, dict):
+            if not review_result:
+                print("  审查结果: 无数据")
             else:
-                print("  审查结果(前3条问题):")
-                for issue in review_result[:3]:
-                    issue_point = issue.get('issue_point', '')
-                    location = issue.get('location', '')
-                    risk_level = issue.get('risk_level', '')
-                    print(f"    - 问题: {issue_point}")
-                    if location:
-                        print(f"      位置: {location}")
-                    if risk_level:
-                        print(f"      风险等级: {risk_level}")
-                if len(review_result) > 3:
-                    print(f"    ... 还有 {len(review_result) - 3} 条问题")
+                print("  审查结果(包含的要点):")
+                for level2_name, point_numbers in review_result.items():
+                    if isinstance(point_numbers, list) and len(point_numbers) > 0:
+                        numbers_str = ', '.join(map(str, point_numbers))
+                        print(f"    - {level2_name}: 要点编号 [{numbers_str}]")
+                    elif isinstance(point_numbers, list):
+                        print(f"    - {level2_name}: 无要点")
+                    else:
+                        print(f"    - {level2_name}: {point_numbers}")
         else:
             print("  审查结果格式未知,无法显示详情。")
-    
+    print(f"\n  审查完成,耗时: {cost_time:.2f}s")
     print("\n" + "=" * 60)
     print("审查完成!")
     print("=" * 60)

+ 1 - 0
core/construction_review/component/check_completeness/requirements.txt → core/construction_review/component/reviewers/check_completeness/requirements.txt

@@ -2,3 +2,4 @@ aiohttp>=3.9.0
 pyyaml>=6.0
 
 
+

+ 1 - 0
core/construction_review/component/check_completeness/utils/__init__.py → core/construction_review/component/reviewers/check_completeness/utils/__init__.py

@@ -3,3 +3,4 @@
 """
 
 
+

+ 1 - 1
core/construction_review/component/check_completeness/utils/file_utils.py → core/construction_review/component/reviewers/check_completeness/utils/file_utils.py

@@ -40,7 +40,7 @@ def read_csv(file_path: str, delimiter: str = '\t') -> List[Dict[str, str]]:
                 rows.append(dict(row))
     except UnicodeDecodeError:
         # 如果UTF-8-SIG失败,尝试UTF-8
-        with open(file_path, 'r', encoding='utf-8') as f:
+        with open(file_path, 'r', encoding='utf-16') as f:
             reader = csv.DictReader(f, delimiter=delimiter)
             for row in reader:
                 rows.append(dict(row))

+ 1 - 0
core/construction_review/component/check_completeness/utils/yaml_utils.py → core/construction_review/component/reviewers/check_completeness/utils/yaml_utils.py

@@ -19,3 +19,4 @@ def read_yaml(file_path: str) -> Dict[str, Any]:
         return yaml.safe_load(f)
 
 
+

+ 7 - 1
core/construction_review/workflows/ai_review_workflow.py

@@ -43,6 +43,7 @@ import asyncio
 import json
 import random
 import re
+from sre_parse import JUMP
 import time
 import os
 from dataclasses import dataclass, asdict
@@ -344,9 +345,13 @@ class AIReviewWorkflow:
             else:
             # 3. 执行大纲审查
                 logger.info(f"开始执行大纲审查")
+
+
                 outline_review_result = await self.ai_review_engine.outline_check(state["callback_task_id"], state["structured_content"],
                                                     state, state.get("stage_name", "大纲审查"))
-                
+                check_completeness_result = await self.ai_review_engine.check_completeness(trace_id_idx = state["callback_task_id"], review_content = state["structured_content"]["chunks"], 
+                        review_references = None, review_location_label = None, state = state, stage_name = state.get("stage_name", "完整性审查"))
+
             # # 4. 执行编制依据审查
             # #await self.core_fun._send_start_review_progress(state, total_units,'prpe_basis')
             reference_check = "reference_check" in self.task_info.get_review_config_list()
@@ -416,6 +421,7 @@ class AIReviewWorkflow:
             all_issues = []
             if completeness_check:
                 all_issues.append(outline_review_result)
+                all_issues.append(check_completeness_result)
             if reference_check and reference_check_result:
                 all_issues.append(reference_check_result)
             if timeliness_check and timeliness_check_result: