Selaa lähdekoodia

dev:添加了内要要点审查模块与并于主体框架完成对接;

ChenJiSheng 1 kuukausi sitten
vanhempi
sitoutus
9ee4a8697d
18 muutettua tiedostoa jossa 1455 lisäystä ja 9 poistoa
  1. 160 9
      core/construction_review/component/ai_review_engine.py
  2. 90 0
      core/construction_review/component/check_completeness/README.md
  3. 5 0
      core/construction_review/component/check_completeness/components/__init__.py
  4. 64 0
      core/construction_review/component/check_completeness/components/data_loader.py
  5. 123 0
      core/construction_review/component/check_completeness/components/llm_client.py
  6. 61 0
      core/construction_review/component/check_completeness/components/prompt_builder.py
  7. 103 0
      core/construction_review/component/check_completeness/components/result_processor.py
  8. 202 0
      core/construction_review/component/check_completeness/components/result_saver.py
  9. 145 0
      core/construction_review/component/check_completeness/components/review_pipeline.py
  10. 49 0
      core/construction_review/component/check_completeness/config/Construction_Plan_Content_Specification.csv
  11. 33 0
      core/construction_review/component/check_completeness/config/llm_api.yaml
  12. 39 0
      core/construction_review/component/check_completeness/config/prompt.yaml
  13. 112 0
      core/construction_review/component/check_completeness/interfaces.py
  14. 135 0
      core/construction_review/component/check_completeness/main.py
  15. 4 0
      core/construction_review/component/check_completeness/requirements.txt
  16. 5 0
      core/construction_review/component/check_completeness/utils/__init__.py
  17. 104 0
      core/construction_review/component/check_completeness/utils/file_utils.py
  18. 21 0
      core/construction_review/component/check_completeness/utils/yaml_utils.py

+ 160 - 9
core/construction_review/component/ai_review_engine.py

@@ -191,6 +191,8 @@ class AIReviewEngine(BaseReviewer):
             Dict[str, Any]: 基础合规性检查结果
         """
         review_content = unit_content['content']
+        with open('temp/review_content.txt', 'a', encoding='utf-8') as f:
+            f.write(str(unit_content))
         #review_references = unit_content.get('review_references')
 
         logger.info(f"basic开始基础合规性检查, 内容长度: {len(review_content)}")
@@ -213,6 +215,10 @@ class AIReviewEngine(BaseReviewer):
             basic_tasks.append(
                 check_with_semaphore(self.check_sensitive, trace_id_idx=trace_id_idx, review_content=review_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
             )
+        if 'completeness_check' in self.task_info.get_review_config_list():
+            basic_tasks.append(
+                check_with_semaphore(self.check_completeness, trace_id_idx=trace_id_idx, review_content=unit_content, review_references=None, review_location_label=review_location_label, state=state, stage_name=stage_name),
+            )
 
         # 一次性执行所有任务,避免重复协程调用
         if not basic_tasks:
@@ -229,7 +235,8 @@ class AIReviewEngine(BaseReviewer):
         grammar_result = self._process_review_result(None)
         semantic_result = self._process_review_result(None)
         sensitive_result = self._process_review_result(None)
-
+        completeness_result = self._process_review_result(None)
+        logger.info(f"completeness_result: {completeness_result}")
         result_index = 0
 
         if 'sensitive_word_check' in self.task_info.get_review_config_list():
@@ -246,11 +253,16 @@ class AIReviewEngine(BaseReviewer):
             if result_index < len(results):
                 sensitive_result = self._process_review_result(results[result_index])
             result_index += 1
+        if 'completeness_check' in self.task_info.get_review_config_list():
+            if result_index < len(results):
+                completeness_result = self._process_review_result(results[result_index])
+            result_index += 1
 
         return {
             'grammar_check': grammar_result,
             'semantic_check': semantic_result,
             'sensitive_check': sensitive_result,
+            'completeness_check': completeness_result,
         }
     async def technical_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
                                       review_location_label: str,state:str,stage_name:str) -> Dict[str, Any]:
@@ -555,27 +567,166 @@ class AIReviewEngine(BaseReviewer):
         # return await self.review("semantic_logic_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
         #                        None, review_location_label, state, stage_name)
 
-    async def check_completeness(self, trace_id_idx: str, review_content: str, review_references: str,
+    async def check_completeness(self, trace_id_idx: str, review_content: Dict[str, Any], review_references: str,
                                review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
         """
         完整性检查
 
         Args:
             trace_id_idx: 追踪ID索引
-            review_content: 审查内容
+            review_content: 审查内容,单个文档块(chunk)的字典,格式如文档切分预处理结果.json中的chunks项
             review_references: 审查参考信息
             stage_name: 阶段名称
             state: 状态字典
-            current_progress: 当前进度
+            review_location_label: 审查位置标签
 
         Returns:
             Dict[str, Any]: 完整性检查结果
         """
-        reviewer_type = Stage.BASIC.value['reviewer_type']
-        prompt_name = Stage.BASIC.value['completeness']
-        trace_id = prompt_name+trace_id_idx
-        return await self.review("completeness_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
-                               None, review_location_label, state, stage_name)
+        from pathlib import Path
+        import sys
+        import json
+        
+        # 导入check_completeness组件
+        check_completeness_dir = Path(__file__).parent / "check_completeness"
+        if str(check_completeness_dir) not in sys.path:
+            sys.path.insert(0, str(check_completeness_dir))
+        
+        from components.data_loader import CSVDataLoader
+        from components.prompt_builder import PromptBuilder
+        from components.llm_client import LLMClient
+        from components.result_processor import ResultProcessor
+        
+        name = "completeness_check"
+        start_time = time.time()
+        
+        try:
+            # 验证review_content格式
+            if not isinstance(review_content, dict):
+                raise ValueError(f"review_content必须是字典类型,当前类型: {type(review_content)}")
+            
+            # 获取文档块信息
+            doc = review_content
+            chunk_id = doc.get('chunk_id', 'unknown')
+            chapter_classification = doc.get('chapter_classification', '')
+            content = doc.get('content', '')
+            
+            logger.info(f"开始执行 {name} 审查,trace_id: {trace_id_idx}, chunk_id: {chunk_id}, chapter_classification: {chapter_classification}")
+            
+            # 检查必要字段
+            if not chapter_classification:
+                raise ValueError(f"文档块 {chunk_id} 缺少chapter_classification字段")
+            
+            if not content:
+                raise ValueError(f"文档块 {chunk_id} 缺少content字段")
+            
+            # 初始化组件路径
+            base_dir = check_completeness_dir
+            csv_path = base_dir / 'config' / 'Construction_Plan_Content_Specification.csv'
+            prompt_config_path = base_dir / 'config' / 'prompt.yaml'
+            api_config_path = base_dir / 'config' / 'llm_api.yaml'
+            
+            # 加载规范文件
+            data_loader = CSVDataLoader()
+            specification = data_loader.load_specification(str(csv_path))
+            
+            # 获取对应的规范要求
+            requirements = specification.get(chapter_classification, [])
+            if not requirements:
+                raise ValueError(f"未找到标签 {chapter_classification} 对应的规范要求")
+            
+            logger.info(f"找到 {len(requirements)} 个规范要求项")
+            
+            # 初始化组件
+            prompt_builder = PromptBuilder(str(prompt_config_path))
+            llm_client = LLMClient(str(api_config_path))
+            result_processor = ResultProcessor()
+            
+            # 构建提示词
+            prompt = prompt_builder.build_prompt(content, requirements)
+            
+            # 调用LLM
+            logger.info(f"调用LLM进行审查,使用模型: {llm_client.model_type}")
+            llm_response = await llm_client.call_llm(prompt)
+            
+            # 处理结果
+            review_result = result_processor.parse_result(llm_response, requirements)
+            
+            # 构建details字段,包含审查结果
+            details = {
+                'chunk_id': chunk_id,
+                'name': 'completeness_check',
+                'chapter_classification': chapter_classification,
+                'section_label': doc.get('section_label', ''),
+                'requirements_count': len(requirements),
+                'checked_items': len(review_result),
+                'response': review_result[0] if review_result else {},
+            }
+            
+            execution_time = time.time() - start_time
+
+            # 创建ReviewResult对象
+            from core.construction_review.component.reviewers.base_reviewer import ReviewResult
+            result = ReviewResult(
+                success=True,
+                details=details,
+                error_message=None,
+                execution_time=execution_time
+            )
+            # with open('temp/completeness_check_result.json','w',encoding='utf-8') as f:
+            #     json.dump({"details":result.details,"success":result.success,"error_message":result.error_message,"execution_time":result.execution_time},f,ensure_ascii=False,indent=4)
+            # 将审查结果转换为字典格式,添加到issues中
+            review_result_data = {
+                'name': name,
+                'success': result.success,
+                'details': result.details,
+                'error_message': result.error_message,
+                'execution_time': result.execution_time,
+                'timestamp': time.time()
+            }
+            
+            # 推送审查完成信息
+            state_dict = None
+            if state:
+                if isinstance(state, dict):
+                    state_dict = state
+                elif isinstance(state, str):
+                    try:
+                        state_dict = json.loads(state)
+                    except (json.JSONDecodeError, AttributeError):
+                        pass
+            
+            if state_dict and state_dict.get("progress_manager"):
+                asyncio.create_task(
+                    state_dict["progress_manager"].update_stage_progress(
+                        callback_task_id=state_dict.get("callback_task_id"),
+                        stage_name=stage_name,
+                        current=None,
+                        status="processing",
+                        message=f"{name} 要点审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s",
+                        issues=[review_result_data],
+                        event_type="processing"
+                    )
+                )
+            logger.info(f"{name} 审查完成 (chunk_id: {chunk_id}), 耗时: {result.execution_time:.2f}s")
+
+            return result
+
+        except Exception as e:
+            execution_time = time.time() - start_time
+            error_msg = f"{name} 审查失败: {str(e)}"
+            logger.error(error_msg, exc_info=True)
+
+            from core.construction_review.component.reviewers.base_reviewer import ReviewResult
+            return ReviewResult(
+                success=False,
+                details={
+                    'chunk_id': review_content.get('chunk_id', 'unknown') if isinstance(review_content, dict) else 'unknown',
+                    'error': str(e)
+                },
+                error_message=error_msg,
+                execution_time=execution_time
+            )
 
     async def check_sensitive(self, trace_id_idx: str, review_content: str, review_references: str,
                             review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:

+ 90 - 0
core/construction_review/component/check_completeness/README.md

@@ -0,0 +1,90 @@
+# 文件要点审查模块
+
+## 功能说明
+
+本模块用于审查施工方案文档的内容完整性,根据规范要求检查文档是否包含所有必需的要点。
+
+## 架构设计
+
+采用面向接口编程的思想,主要包含以下组件:
+
+### 接口层(interfaces.py)
+- `IDataLoader`: 数据加载接口
+- `IPromptBuilder`: 提示词构建接口
+- `ILLMClient`: LLM调用接口
+- `IResultProcessor`: 结果处理接口
+- `IReviewPipeline`: 审查流水线接口
+
+### 组件层(components/)
+- `data_loader.py`: CSV和JSON数据加载实现
+- `prompt_builder.py`: 提示词构建实现
+- `llm_client.py`: LLM API调用实现(支持异步)
+- `result_processor.py`: 结果解析和处理实现
+- `review_pipeline.py`: 审查流水线编排实现
+
+### 工具层(utils/)
+- `file_utils.py`: 文件读写工具
+- `yaml_utils.py`: YAML配置读取工具
+
+## 使用说明
+
+### 1. 安装依赖
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. 配置说明
+
+#### config/llm_api.yaml
+配置LLM API相关信息:
+- `MODEL_TYPE`: 使用的模型类型(qwen/gemini/deepseek/doubao)
+- 各模型的API配置(URL、模型ID、API Key)
+- `keywords`: 请求参数配置(超时时间、重试次数、并发数等)
+
+#### config/prompt.yaml
+包含提示词模板:
+- `content_review`: 要点审查任务的提示词模板
+
+#### config/Construction_Plan_Content_Specification.csv
+规范文件,包含:
+- 标签:分类标签(basis/overview/plan等)
+- 一级目录:一级目录名称
+- 二级目录:二级目录名称
+- 内容要求:该要点的内容要求
+
+### 3. 运行程序
+
+```bash
+python main.py
+```
+
+### 4. 输出结果
+
+结果保存在 `output/review_results.json`,包含:
+- `total_chunks`: 总文档块数
+- `success_count`: 成功审查数
+- `error_count`: 失败数
+- `results`: 审查结果列表
+
+每个结果包含:
+- 原始文档信息
+- `review_result`: 审查结果字典,key为二级目录名称,value为布尔值(true/false)
+
+## 工作流程
+
+1. **数据加载**:读取CSV规范文件和JSON文档数据
+2. **匹配规范**:根据文档的`chapter_classification`字段匹配对应的规范要求
+3. **构建提示词**:根据待审查内容和规范要求构建LLM提示词
+4. **异步调用LLM**:并发调用LLM API进行审查
+5. **结果处理**:解析LLM返回的JSON结果,转换为标准格式
+6. **输出结果**:保存审查结果到JSON文件
+
+## 注意事项
+
+1. 确保LLM API配置正确,API Key有效
+2. 根据实际情况调整并发数(`concurrent_workers`)
+3. 如果某个文档块缺少`chapter_classification`或`content`字段,会在结果中标记错误
+4. 如果规范文件中没有对应的标签,会在结果中标记错误
+
+

+ 5 - 0
core/construction_review/component/check_completeness/components/__init__.py

@@ -0,0 +1,5 @@
+"""
+组件模块
+"""
+
+

+ 64 - 0
core/construction_review/component/check_completeness/components/data_loader.py

@@ -0,0 +1,64 @@
+"""
+数据加载组件实现
+"""
+from typing import Dict, List, Any
+# 使用相对导入引用项目根目录的模块
+import sys
+from pathlib import Path
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+from interfaces import IDataLoader
+from utils.file_utils import read_csv, read_json
+
+
+class CSVDataLoader(IDataLoader):
+    """CSV规范数据加载器"""
+    
+    def load_specification(self, csv_path: str) -> Dict[str, List[Dict[str, str]]]:
+        """
+        加载规范文件(CSV)
+        
+        Returns:
+            字典,key为标签值,value为该标签下的二级目录列表
+            格式: {
+                "basis": [
+                    {"二级目录": "法律法规", "内容要求": "法律法规包括..."},
+                    ...
+                ],
+                ...
+            }
+        """
+        rows = read_csv(csv_path, delimiter='\t')
+        
+        specification = {}
+        for row in rows:
+            tag = row.get('标签', '').strip()
+            level2 = row.get('二级目录', '').strip()
+            requirement = row.get('内容要求', '').strip()
+            
+            if not tag:
+                continue
+                
+            if tag not in specification:
+                specification[tag] = []
+            
+            # 只添加有二级目录名称的项(跳过空项)
+            if level2:
+                specification[tag].append({
+                    "二级目录": level2,
+                    "内容要求": requirement
+                })
+        
+        return specification
+    
+    def load_documents(self, json_path: str) -> List[Dict[str, Any]]:
+        """
+        加载文档数据(JSON)
+        
+        Returns:
+            文档块列表
+        """
+        data = read_json(json_path)
+        return data.get('chunks', [])
+

+ 123 - 0
core/construction_review/component/check_completeness/components/llm_client.py

@@ -0,0 +1,123 @@
+"""
+LLM调用组件实现
+"""
+import asyncio
+import json
+import aiohttp
+from typing import Dict, Optional
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import ILLMClient
+from utils.yaml_utils import read_yaml
+
+
+class LLMClient(ILLMClient):
+    """LLM客户端"""
+    
+    def __init__(self, api_config_path: str):
+        """
+        初始化LLM客户端
+        
+        Args:
+            api_config_path: llm_api.yaml文件路径
+        """
+        self.config = read_yaml(api_config_path)
+        self.model_type = self.config.get('MODEL_TYPE', 'qwen')
+        self.keywords_config = self.config.get('keywords', {})
+        self._setup_model_config()
+    
+    def _setup_model_config(self):
+        """设置模型配置"""
+        model_config = self.config.get(self.model_type, {})
+        
+        if self.model_type == 'qwen':
+            self.server_url = model_config.get('QWEN_SERVER_URL', '')
+            self.model_id = model_config.get('QWEN_MODEL_ID', '')
+            self.api_key = model_config.get('QWEN_API_KEY', '')
+        elif self.model_type == 'gemini':
+            self.server_url = model_config.get('GEMINI_SERVER_URL', '')
+            self.model_id = model_config.get('GEMINI_MODEL_ID', '')
+            self.api_key = model_config.get('GEMINI_API_KEY', '')
+        elif self.model_type == 'deepseek':
+            self.server_url = model_config.get('DEEPSEEK_SERVER_URL', '')
+            self.model_id = model_config.get('DEEPSEEK_MODEL_ID', '')
+            self.api_key = model_config.get('DEEPSEEK_API_KEY', '')
+        elif self.model_type == 'doubao':
+            self.server_url = model_config.get('DOUBAO_SERVER_URL', '')
+            self.model_id = model_config.get('DOUBAO_MODEL_ID', '')
+            self.api_key = model_config.get('DOUBAO_API_KEY', '')
+        else:
+            raise ValueError(f"不支持的模型类型: {self.model_type}")
+    
+    async def call_llm(self, prompt: Dict[str, str]) -> str:
+        """
+        异步调用LLM API
+        
+        Args:
+            prompt: 提示词字典,包含system和user
+            
+        Returns:
+            LLM返回的文本结果
+        """
+        max_retries = self.keywords_config.get('max_retries', 2)
+        timeout = self.keywords_config.get('timeout', 30)
+        request_payload = self.keywords_config.get('request_payload', {})
+        
+        headers = {
+            'Content-Type': 'application/json',
+            'Authorization': f'Bearer {self.api_key}'
+        }
+        
+        # 构建请求体(OpenAI兼容格式)
+        messages = []
+        if prompt.get('system'):
+            messages.append({
+                'role': 'system',
+                'content': prompt['system']
+            })
+        messages.append({
+            'role': 'user',
+            'content': prompt['user']
+        })
+        
+        payload = {
+            'model': self.model_id,
+            'messages': messages,
+            **request_payload
+        }
+        
+        url = f"{self.server_url.rstrip('/')}/chat/completions"
+        
+        # 重试逻辑
+        for attempt in range(max_retries + 1):
+            try:
+                async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
+                    async with session.post(url, headers=headers, json=payload) as response:
+                        if response.status == 200:
+                            result = await response.json()
+                            return result.get('choices', [{}])[0].get('message', {}).get('content', '')
+                        else:
+                            error_text = await response.text()
+                            if attempt < max_retries:
+                                await asyncio.sleep(1 * (attempt + 1))  # 指数退避
+                                continue
+                            raise Exception(f"API调用失败: {response.status}, {error_text}")
+            except asyncio.TimeoutError:
+                if attempt < max_retries:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                raise Exception(f"API调用超时: {timeout}秒")
+            except Exception as e:
+                if attempt < max_retries:
+                    await asyncio.sleep(1 * (attempt + 1))
+                    continue
+                raise e
+        
+        raise Exception("LLM调用失败,已重试所有次数")
+

+ 61 - 0
core/construction_review/component/check_completeness/components/prompt_builder.py

@@ -0,0 +1,61 @@
+"""
+提示词构建组件实现
+"""
+from typing import Dict, List
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import IPromptBuilder
+from utils.yaml_utils import read_yaml
+
+
+class PromptBuilder(IPromptBuilder):
+    """提示词构建器"""
+    
+    def __init__(self, prompt_config_path: str):
+        """
+        初始化提示词构建器
+        
+        Args:
+            prompt_config_path: prompt.yaml文件路径
+        """
+        self.config = read_yaml(prompt_config_path)
+        self.review_config = self.config.get('content_review', {})
+    
+    def build_prompt(self, content: str, requirements: List[Dict[str, str]]) -> Dict[str, str]:
+        """
+        构建审查提示词
+        
+        Args:
+            content: 待审查文本内容
+            requirements: 审查要求列表,每个要求包含二级目录名称和内容要求
+            
+        Returns:
+            包含system和user的提示词字典
+        """
+        # 构建审查要点要求字符串
+        requirements_text = ""
+        for req in requirements:
+            level2_name = req.get('二级目录', '')
+            requirement = req.get('内容要求', '')
+            if level2_name:
+                requirements_text += f"- {level2_name}: {requirement}\n"
+        
+        # 获取模板
+        system_template = self.review_config.get('system', '')
+        user_template = self.review_config.get('user_template', '')
+        
+        # 填充模板
+        user_prompt = user_template.replace('{{ content }}', content)
+        user_prompt = user_prompt.replace('{{ requirements }}', requirements_text.strip())
+        
+        return {
+            'system': system_template,
+            'user': user_prompt
+        }
+

+ 103 - 0
core/construction_review/component/check_completeness/components/result_processor.py

@@ -0,0 +1,103 @@
+"""
+结果处理组件实现
+"""
+import json
+import re
+from typing import Dict, List, Any
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import IResultProcessor
+
+
+class ResultProcessor(IResultProcessor):
+    """结果处理器"""
+    
+    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> List[Dict[str, Any]]:
+        """
+        解析LLM返回结果
+        
+        Args:
+            llm_response: LLM返回的文本
+            requirements: 审查要求列表
+            
+        Returns:
+            问题列表,每个问题包含:issue_point, location, suggestion, reason, risk_level
+            如果没有问题,返回空列表
+        """
+        # 处理“无明显问题”这种纯文本输出
+        text = (llm_response or "").strip()
+        if text in ("无明显问题。", "无明显问题", ""):
+            return []
+
+        # 提取JSON部分
+        json_str = self._extract_json(text)
+        if not json_str:
+            # 如果无法提取JSON,视为无结构化问题,返回空列表
+            return []
+
+        try:
+            result = json.loads(json_str)
+        except json.JSONDecodeError:
+            # JSON解析失败,返回空列表
+            return []
+
+        issues: List[Dict[str, Any]] = []
+
+        # 如果结果是单个对象,转换为列表
+        if isinstance(result, dict):
+            result = [result]
+
+        if isinstance(result, list):
+            for item in result:
+                if not isinstance(item, dict):
+                    continue
+                # 规范化字段
+                issue = {
+                    "issue_point": item.get("issue_point", ""),
+                    "location": item.get("location", ""),
+                    "suggestion": item.get("suggestion", ""),
+                    "reason": item.get("reason", ""),
+                    "risk_level": item.get("risk_level", ""),
+                }
+                # 至少要有问题描述才认为是有效问题
+                if issue["issue_point"]:
+                    issues.append(issue)
+
+        return issues
+    
+    def _extract_json(self, text: str) -> str:
+        """
+        从文本中提取JSON字符串
+        
+        Args:
+            text: 原始文本
+            
+        Returns:
+            JSON字符串
+        """
+        # 尝试直接解析
+        text = text.strip()
+        
+        # 查找JSON对象
+        # 匹配 { ... } 格式
+        json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
+        matches = re.findall(json_pattern, text, re.DOTALL)
+        
+        if matches:
+            # 返回最长的匹配(通常是完整的JSON)
+            return max(matches, key=len)
+        
+        # 如果没找到,尝试查找代码块中的JSON
+        code_block_pattern = r'```(?:json)?\s*(\{.*?\})\s*```'
+        code_matches = re.findall(code_block_pattern, text, re.DOTALL)
+        if code_matches:
+            return max(code_matches, key=len)
+        
+        return text
+

+ 202 - 0
core/construction_review/component/check_completeness/components/result_saver.py

@@ -0,0 +1,202 @@
+"""
+结果保存组件
+"""
+from typing import Dict, List, Any
+import sys
+from pathlib import Path
+from collections import defaultdict
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from utils.file_utils import write_csv, write_txt
+
+
+class ResultSaver:
+    """结果保存器"""
+    
+    @staticmethod
+    def save_to_csv(results: List[Dict[str, Any]], 
+                   specification: Dict[str, List[Dict[str, str]]],
+                   output_path: str) -> None:
+        """
+        保存审查结果到CSV文件
+        格式:chunk_id | chapter_classification | section_label | page | issue_point | location | suggestion | reason | risk_level
+        
+        Args:
+            results: 审查结果列表
+            specification: 规范字典,用于确定列的顺序
+            output_path: 输出文件路径
+        """
+        csv_rows = []
+        
+        for result in results:
+            chunk_id = result.get('chunk_id', '')
+            chapter_classification = result.get('chapter_classification', '')
+            section_label = result.get('section_label', '')
+            page = result.get('page', '')
+            review_result = result.get('review_result', [])
+            
+            # 如果审查失败,记录错误信息
+            if isinstance(review_result, dict) and 'error' in review_result:
+                row = {
+                    'chunk_id': chunk_id,
+                    'chapter_classification': chapter_classification,
+                    'section_label': section_label,
+                    'page': page,
+                    'issue_point': f"错误: {review_result['error']}",
+                    'location': '',
+                    'suggestion': '',
+                    'reason': '',
+                    'risk_level': ''
+                }
+                csv_rows.append(row)
+                continue
+            
+            # 如果没有问题(空列表),记录一条“无问题”记录
+            if isinstance(review_result, list) and len(review_result) == 0:
+                row = {
+                    'chunk_id': chunk_id,
+                    'chapter_classification': chapter_classification,
+                    'section_label': section_label,
+                    'page': page,
+                    'issue_point': '无明显问题',
+                    'location': '',
+                    'suggestion': '',
+                    'reason': '',
+                    'risk_level': ''
+                }
+                csv_rows.append(row)
+                continue
+
+            # 如果有问题列表,为每个问题创建一行
+            if isinstance(review_result, list):
+                for issue in review_result:
+                    row = {
+                        'chunk_id': chunk_id,
+                        'chapter_classification': chapter_classification,
+                        'section_label': section_label,
+                        'page': page,
+                        'issue_point': issue.get('issue_point', ''),
+                        'location': issue.get('location', ''),
+                        'suggestion': issue.get('suggestion', ''),
+                        'reason': issue.get('reason', ''),
+                        'risk_level': issue.get('risk_level', '')
+                    }
+                    csv_rows.append(row)
+        
+        # 写入CSV文件(使用逗号分隔符)
+        write_csv(csv_rows, output_path, delimiter=',')
+    
+    @staticmethod
+    def save_statistics(results: List[Dict[str, Any]], 
+                       specification: Dict[str, List[Dict[str, str]]],
+                       output_path: str) -> None:
+        """
+        保存统计结果到TXT文件
+        
+        Args:
+            results: 审查结果列表
+            specification: 规范字典
+            output_path: 输出文件路径
+        """
+        # 统计信息
+        total_chunks = len(results)
+        success_count = 0  # 无问题
+        error_count = 0    # 解析或流程错误
+        issue_count = 0    # 总问题数
+        
+        # 按分类统计
+        classification_stats = defaultdict(lambda: {
+            'total': 0,
+            'no_issues': 0,
+            'has_issues': 0,
+            'errors': 0,
+            'issue_count': 0
+        })
+        
+        # 按风险等级统计
+        risk_level_stats = defaultdict(int)
+        
+        for result in results:
+            chapter_classification = result.get('chapter_classification', '')
+            review_result = result.get('review_result', [])
+            
+            # 错误记录
+            if isinstance(review_result, dict) and 'error' in review_result:
+                error_count += 1
+                classification_stats[chapter_classification]['errors'] += 1
+                classification_stats[chapter_classification]['total'] += 1
+                continue
+            
+            # 无问题
+            if isinstance(review_result, list) and len(review_result) == 0:
+                success_count += 1
+                classification_stats[chapter_classification]['no_issues'] += 1
+                classification_stats[chapter_classification]['total'] += 1
+                continue
+
+            # 有问题
+            if isinstance(review_result, list):
+                issue_count += len(review_result)
+                classification_stats[chapter_classification]['has_issues'] += 1
+                classification_stats[chapter_classification]['issue_count'] += len(review_result)
+                classification_stats[chapter_classification]['total'] += 1
+
+                for issue in review_result:
+                    risk_level = issue.get('risk_level', '')
+                    if risk_level:
+                        risk_level_stats[risk_level] += 1
+        
+        # 生成统计文本
+        lines = []
+        lines.append("=" * 80)
+        lines.append("文件要点审查统计报告")
+        lines.append("=" * 80)
+        lines.append("")
+        
+        # 总体统计
+        lines.append("【总体统计】")
+        lines.append(f"  总文档块数: {total_chunks}")
+        if total_chunks > 0:
+            lines.append(f"  无问题文档数: {success_count} ({success_count/total_chunks*100:.1f}%)")
+            lines.append(f"  存在问题文档数: {total_chunks - success_count - error_count}")
+            lines.append(f"  解析/流程错误数: {error_count}")
+        else:
+            lines.append("  无问题文档数: 0")
+            lines.append("  存在问题文档数: 0")
+            lines.append("  解析/流程错误数: 0")
+        lines.append(f"  总问题数: {issue_count}")
+        lines.append("")
+        
+        # 按分类统计
+        lines.append("【按分类统计】")
+        for classification, stats in sorted(classification_stats.items()):
+            total = stats['total']
+            no_issues = stats['no_issues']
+            has_issues = stats['has_issues']
+            errors = stats['errors']
+            lines.append(f"  {classification}:")
+            lines.append(f"    总数: {total}")
+            lines.append(f"    无问题: {no_issues}")
+            lines.append(f"    存在问题: {has_issues}")
+            lines.append(f"    错误: {errors}")
+            lines.append(f"    问题总数: {stats['issue_count']}")
+            lines.append("")
+        
+        # 按风险等级统计
+        lines.append("【按风险等级统计】")
+        for level, count in risk_level_stats.items():
+            lines.append(f"  {level}: {count}")
+        
+        lines.append("")
+        lines.append("=" * 80)
+        lines.append("统计完成")
+        lines.append("=" * 80)
+        
+        # 写入文件
+        content = "\n".join(lines)
+        write_txt(content, output_path)
+

+ 145 - 0
core/construction_review/component/check_completeness/components/review_pipeline.py

@@ -0,0 +1,145 @@
+"""
+审查流水线实现
+"""
+import asyncio
+from typing import Dict, List, Any
+import sys
+from pathlib import Path
+
+# 添加项目根目录到路径,支持相对导入
+_root = Path(__file__).parent.parent
+if str(_root) not in sys.path:
+    sys.path.insert(0, str(_root))
+
+from interfaces import IReviewPipeline, IPromptBuilder, ILLMClient, IResultProcessor
+
+
+class ReviewPipeline(IReviewPipeline):
+    """审查流水线"""
+    
+    def __init__(self, prompt_builder: IPromptBuilder, 
+                 llm_client: ILLMClient, 
+                 result_processor: IResultProcessor,
+                 max_concurrent: int = 20):
+        """
+        初始化审查流水线
+        
+        Args:
+            prompt_builder: 提示词构建器
+            llm_client: LLM客户端
+            result_processor: 结果处理器
+            max_concurrent: 最大并发数
+        """
+        self.prompt_builder = prompt_builder
+        self.llm_client = llm_client
+        self.result_processor = result_processor
+        self.max_concurrent = max_concurrent
+    
+    async def review(self, documents: List[Dict[str, Any]], 
+                    specification: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]:
+        """
+        执行审查流程
+        
+        Args:
+            documents: 文档块列表
+            specification: 规范字典
+            
+        Returns:
+            审查结果列表,每个结果包含原始文档信息和审查结果
+        """
+        # 创建信号量控制并发数
+        semaphore = asyncio.Semaphore(self.max_concurrent)
+        
+        # 创建所有任务
+        tasks = []
+        for doc in documents:
+            task = self._review_single_document(doc, specification, semaphore)
+            tasks.append(task)
+        
+        # 并发执行所有任务
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+        
+        # 处理异常结果
+        final_results = []
+        for i, result in enumerate(results):
+            if isinstance(result, Exception):
+                # 如果出现异常,返回错误信息
+                doc = documents[i]
+                final_results.append({
+                    **doc,
+                    'review_result': {
+                        'error': str(result)
+                    }
+                })
+            else:
+                final_results.append(result)
+        
+        return final_results
+    
+    async def _review_single_document(self, doc: Dict[str, Any], 
+                                     specification: Dict[str, List[Dict[str, str]]],
+                                     semaphore: asyncio.Semaphore) -> Dict[str, Any]:
+        """
+        审查单个文档
+        
+        Args:
+            doc: 文档块
+            specification: 规范字典
+            semaphore: 信号量
+            
+        Returns:
+            包含审查结果的文档字典
+        """
+        async with semaphore:
+            try:
+                # 获取文档的分类标签
+                chapter_classification = doc.get('chapter_classification', '')
+                if not chapter_classification:
+                    return {
+                        **doc,
+                        'review_result': {
+                            'error': '缺少chapter_classification字段'
+                        }
+                    }
+                
+                # 获取对应的规范要求
+                requirements = specification.get(chapter_classification, [])
+                if not requirements:
+                    return {
+                        **doc,
+                        'review_result': {
+                            'error': f'未找到标签 {chapter_classification} 对应的规范要求'
+                        }
+                    }
+                
+                # 获取待审查内容
+                content = doc.get('content', '')
+                if not content:
+                    return {
+                        **doc,
+                        'review_result': {
+                            'error': '缺少content字段'
+                        }
+                    }
+                
+                # 构建提示词
+                prompt = self.prompt_builder.build_prompt(content, requirements)
+                
+                # 调用LLM
+                llm_response = await self.llm_client.call_llm(prompt)
+                
+                # 处理结果
+                review_result = self.result_processor.parse_result(llm_response, requirements)
+                
+                return {
+                    **doc,
+                    'review_result': review_result
+                }
+            except Exception as e:
+                return {
+                    **doc,
+                    'review_result': {
+                        'error': str(e)
+                    }
+                }
+

+ 49 - 0
core/construction_review/component/check_completeness/config/Construction_Plan_Content_Specification.csv

@@ -0,0 +1,49 @@
+标签	一级目录	二级目录	内容要求
+basis	编制依据	法律法规	法律法规包括国家、工程所在地省级政府发布的法律法规、规章制度等;
+basis	编制依据	标准规范	标准规范包括行业标准、技术规程等;
+basis	编制依据	文件制度	文件制度包括四川路桥、路桥集团、桥梁公司、建设单位下发的文件制度和管理程序文件等;
+basis	编制依据	编制原则	编制原则应认真贯彻执行国家方针、政策、标准和设计文件,严格执行基本建设程序,实现工程项目的全部功能;
+basis	编制依据	编制范围	编制范围应填写完整,涵盖本方案包含的所有工程,部分工程可简要说明采取的施工工艺。
+overview	工程概况	设计概况	设计概况包含工程简介、主要技术标准两个方面。
+overview	工程概况	工程地质与水文气象	工程地质与水文气象主要包括与该工程有关的水文状况、气候条件等。
+overview	工程概况	周边环境	周边环境主要包括与该工程有关的主要建(构)筑物、山体、边坡、河谷、深基坑、道路、高压电、地下管线的位置关系、结构尺寸等情况
+overview	工程概况	施工平面及立面布置	施工平面及立面布置包括本项目拌和站、钢筋加工场、材料(临时)堆码区域的位置和与该工程的距离,施工作业平台(场站)的尺寸、地面形式以及施工便道的长度、宽度、路面形式、最小弯曲半径,临时用水的来源、管线布置、距离,变压器、配电箱的位置、大小,线路走向,敷设方式等。
+overview	工程概况	施工要求和技术保证条件	施工要求和技术保证条件包含工期目标、质量目标、安全目标、环境目标。工期目标包括本项目的总体工期和本工程的工期,仅需说明起止时间和持续时间。质量目标、安全目标和环境目标应根据施工合同和业主要求填写。
+overview	工程概况	风险辨识与分级	风险辨识与分级包含在施工过程中所有的危险源,并按照法律法规的要求对其进行分级,并说明其应对措施。
+overview	工程概况	参建各方责任主体单位	参建各方责任主体单位主要描述该项目的建设单位、设计单位、监理单位、施工单位、监控单位、专业分包单位的名称。
+plan	施工计划	施工进度计划	施工进度计划包括主要工序作业时间分析、关键工程(工序)节点安排、施工进度计划横道图等。
+plan	施工计划	施工材料计划	施工材料计划包含方案实施过程中需要使用的所有施工措施材料,明确材料名称、规格、数量、重量、来源。
+plan	施工计划	施工设备计划	施工设备计划包含方案实施过程中需要使用的主要机械设备,应明确设备名称、规格、数量、来 源。
+plan	施工计划	劳动力计划	劳动力计划包含各阶段(周、旬、月或季度)不同工种的作业人员投入情况。
+plan	施工计划	安全生产费用使用计划	安全生产费用使用计划包含实施本方案拟投入的安全费用类别、费用名称、单 项投入金额和安全生产费用总额。
+technology	施工工艺技术	主要施工方法概述	主要施工方法概述应简要说明采取的主要施工工艺和施工方法,以及模板等重 要材料的配置数量。
+technology	施工工艺技术	技术参数	技术参数包含主要使用材料的类型、规格,以及主要设备的名称、型号、出厂 时间、性能参数、自重等。
+technology	施工工艺技术	工艺流程	施工准备包含测量放样、临时用水、临时用电、场地、人员、设备、安全防护 措施和人员上下通道等内容。
+technology	施工工艺技术	施工准备	工艺流程包含整个方案的主要施工工序,按照施工的先后顺序
+technology	施工工艺技术	施工方法及操作要求	施工方法及操作要求根据工艺流程中主要的施工工序依次进行描述其操作方法, 并说明施工要点,常见问题及预防、处理措施。
+technology	施工工艺技术	检查要求	检查要求包含所用的材料,构配件进场质量检查、抽查,以及施工过程中各道 工序检查内容及标准。
+safety	安全保证措施	安全保证体系	
+safety	安全保证措施	组织保证措施	组织保证措施包含安全管理组织机构、人员安全职责。
+safety	安全保证措施	技术保证措施	技术保证措施应按总体安全措施,主要工序的安全保证措施进行梳理和说明
+safety	安全保证措施	监测监控措施	监测监控措施包括监测组织机构、监测范围、监测项目、监测点的设置、监测 仪器设备、监测方法、监测频率、预警值及控制值、信息反馈等内容。
+safety	安全保证措施	应急处置措施	应急处置措施包含应急处置程序、应急处置措施、应急物资及设备保障、交通 疏导与医疗救援、后期处置等六个方面。
+quality	质量保证措施	质量保证体系	
+quality	质量保证措施	质量目标	
+quality	质量保证措施	工程创优规划	工程创优规划包含制定工程创优总体计划,做好技术准备工作,加强过程控制,重视细部处理,创建精品工程,推广应用新技术,申报资料、工程资料的收集与整理等内容
+quality	质量保证措施	质量控制程序与具体措施	质量控制程序与具体措施包含原材料、实体工程质量检查验收程序和要求,主 要工序的质量通病、预防措施,以及季节性(冬期、高温、雨期)施工的质量保证 措施。
+environment	环境保证措施	环境保证体系	
+environment	环境保证措施	环境保护组织机构	环境保护组织机构包含管理人员姓名、职务、职责。
+environment	环境保证措施	环境保护及文明施工措施	环境保护及文明施工措施包含办公、生活区环境卫生保证措施,施工区域水土 保持保证措施、噪声污染防治措施、水污染防治措施、大气污染防治措施。
+management	施工管理及作业人员配备与分工	施工管理人员	施工管理人员以表格的形式说明管理人员名单及岗位职责
+management	施工管理及作业人员配备与分工	专职安全生产管理人员	
+management	施工管理及作业人员配备与分工	特种作业人员	
+management	施工管理及作业人员配备与分工	其他作业人员	其他作业人员包含专业分包单位(协作队伍)管理人员数量,不同工种(班组、 区域)的作业人员数量等。
+acceptance	验收要求	验收标准	验收标准包含国家和行业的标准、规范、操作规程、四川路桥、路桥集团和桥 梁公司的管理办法等。
+acceptance	验收要求	验收程序	验收程序包括进场验收、过程验收、阶段验收、完工验收等时间节点的具体验 收程序。
+acceptance	验收要求	验收内容	
+acceptance	验收要求	验收时间	
+acceptance	验收要求	验收人员	验收人员应包括建设、设计、施工、 监理、监测等单位相关人员,并明确验收人员姓名。
+other	其他资料	计算书	
+other	其他资料	相关施工图纸	
+other	其他资料	附图附表	
+other	其他资料	编制及审核人员情况	

+ 33 - 0
core/construction_review/component/check_completeness/config/llm_api.yaml

@@ -0,0 +1,33 @@
+MODEL_TYPE: qwen
+
+gemini:
+  GEMINI_SERVER_URL: https://generativelanguage.googleapis.com/v1beta/openai/
+  GEMINI_MODEL_ID: gemini-2.0-flash
+  GEMINI_API_KEY: YOUR_GEMINI_API_KEY_FOR_RAG_EVAL
+
+deepseek:
+  DEEPSEEK_SERVER_URL: https://api.deepseek.com
+  DEEPSEEK_MODEL_ID: deepseek-chat
+  DEEPSEEK_API_KEY: YOUR_DEEPSEEK_API_KEY_FOR_RAG_EVAL
+
+doubao:
+  DOUBAO_SERVER_URL: https://ark.cn-beijing.volces.com/api/v3/
+  DOUBAO_MODEL_ID: doubao-seed-1-6-flash-250715
+  DOUBAO_API_KEY: YOUR_DOUBAO_API_KEY_FOR_RAG_EVAL
+
+qwen:
+  # QWEN_SERVER_URL: http://192.168.91.253:8003/v1/
+  # QWEN_MODEL_ID: qwen3-30b
+  # QWEN_API_KEY: sk-123456
+  QWEN_SERVER_URL: http://192.168.91.253:9002/v1/
+  QWEN_MODEL_ID: Qwen3-8B
+  QWEN_API_KEY: sk-123456
+
+keywords:
+  timeout: 30
+  max_retries: 2
+  concurrent_workers: 20
+  stream: false
+  request_payload:
+    temperature: 0.3
+    max_tokens: 1024

+ 39 - 0
core/construction_review/component/check_completeness/config/prompt.yaml

@@ -0,0 +1,39 @@
+content_review:
+  system: |
+    你是一名工程与施工领域的专业文档审查专家,负责审查施工方案文档的内容完整性。
+    - 仔细分析待审查文本内容,判断是否包含每个审查要点要求的内容;
+    - 对于每个审查要点,如果文本中明确包含或涵盖了该要点要求的内容,返回true,否则返回false;
+    - 判断要严格但合理,如果文本内容能够满足要点的核心要求,即使表述方式不同,也应判定为true;
+    - 只输出JSON格式,不要添加任何解释性文字;
+
+    - /no_think
+  user_template: |
+    任务:审查施工方案文档内容是否包含所有必需的要点。
+
+    待审查文本内容:
+    {{ content }}
+
+    审查要点要求:
+    {{ requirements }}
+
+    输出格式:务必须严格按照以下标准json格式输出审查结果:
+    如果未发现明显的词句语法错误,请输出:无明显问题。
+    如果发现问题,请按以下格式输出:
+    location字段直接输出原字段内容,不得猜测。
+      - 必须输出一个 JSON 对象(不能是数组、列表或其他结构);
+      - JSON 对象的格式如下:
+      {
+        "issue_point": "[内容缺失]具体问题描述,如:未包含施工进度计划等内容(问题类型必须是:内容缺失)",
+        "location": "问题所在的原始条款内容及位置(如:三、施工方法 (页码: 12)),包含必要的上下文",
+        "suggestion": "基于逻辑规则的具体修改建议(必须是补全内容缺失错误,而非优化表达)",
+        "reason": "详细说明为何这是一个内容缺失错误,包括:1)内容缺失在哪里 2)为何需要补全 3)可能产生的后果",
+        "risk_level": "高风险/中风险/低风险(严格按照系统提示词中的标准判定)"
+      }
+
+
+
+
+
+
+
+

+ 112 - 0
core/construction_review/component/check_completeness/interfaces.py

@@ -0,0 +1,112 @@
+"""
+接口定义模块
+定义各个组件的接口,实现面向接口编程
+"""
+from abc import ABC, abstractmethod
+from typing import Dict, List, Any
+
+
+class IDataLoader(ABC):
+    """数据加载接口"""
+    
+    @abstractmethod
+    def load_specification(self, csv_path: str) -> Dict[str, List[Dict[str, str]]]:
+        """
+        加载规范文件(CSV)
+        
+        Args:
+            csv_path: CSV文件路径
+            
+        Returns:
+            字典,key为标签值,value为该标签下的二级目录列表
+            每个二级目录包含:二级目录名称、内容要求
+        """
+        raise NotImplementedError
+    
+    @abstractmethod
+    def load_documents(self, json_path: str) -> List[Dict[str, Any]]:
+        """
+        加载文档数据(JSON)
+        
+        Args:
+            json_path: JSON文件路径
+            
+        Returns:
+            文档块列表
+        """
+        raise NotImplementedError
+
+
+class IPromptBuilder(ABC):
+    """提示词构建接口"""
+    
+    @abstractmethod
+    def build_prompt(self, content: str, requirements: List[Dict[str, str]]) -> Dict[str, str]:
+        """
+        构建审查提示词
+        
+        Args:
+            content: 待审查文本内容
+            requirements: 审查要求列表,每个要求包含二级目录名称和内容要求
+            
+        Returns:
+            包含system和user的提示词字典
+        """
+        raise NotImplementedError
+
+
+class ILLMClient(ABC):
+    """LLM调用接口"""
+    
+    @abstractmethod
+    async def call_llm(self, prompt: Dict[str, str]) -> str:
+        """
+        异步调用LLM API
+        
+        Args:
+            prompt: 提示词字典,包含system和user
+            
+        Returns:
+            LLM返回的文本结果
+        """
+        raise NotImplementedError
+
+
+class IResultProcessor(ABC):
+    """结果处理接口"""
+    
+    @abstractmethod
+    def parse_result(self, llm_response: str, requirements: List[Dict[str, str]]) -> List[Dict[str, Any]]:
+        """
+        解析LLM返回结果
+        
+        Args:
+            llm_response: LLM返回的文本
+            requirements: 审查要求列表(目前仅用于兼容接口,可选)
+            
+        Returns:
+            问题列表,每个问题包含:issue_point, location, suggestion, reason, risk_level
+            如果没有问题,返回空列表
+        """
+        raise NotImplementedError
+
+
+class IReviewPipeline(ABC):
+    """审查流水线接口"""
+    
+    @abstractmethod
+    async def review(self, documents: List[Dict[str, Any]],
+                     specification: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, Any]]:
+        """
+        执行审查流程
+        
+        Args:
+            documents: 文档块列表
+            specification: 规范字典
+            
+        Returns:
+            审查结果列表,每个结果包含原始文档信息和审查结果
+        """
+        raise NotImplementedError
+
+

+ 135 - 0
core/construction_review/component/check_completeness/main.py

@@ -0,0 +1,135 @@
+"""
+文件要点审查模块主程序
+"""
+import asyncio
+import os
+from pathlib import Path
+from components.data_loader import CSVDataLoader
+from components.prompt_builder import PromptBuilder
+from components.llm_client import LLMClient
+from components.result_processor import ResultProcessor
+from components.review_pipeline import ReviewPipeline
+from components.result_saver import ResultSaver
+from utils.file_utils import write_json
+
+
+async def main():
+    """主函数"""
+    # 配置文件路径
+    base_dir = Path(__file__).parent
+    csv_path = base_dir / 'config' / 'Construction_Plan_Content_Specification.csv'
+    json_path = base_dir / 'data' / '文档切分预处理结果.json'
+    prompt_config_path = base_dir / 'config' / 'prompt.yaml'
+    api_config_path = base_dir / 'config' / 'llm_api.yaml'
+    
+    # 输出路径
+    output_path = base_dir / 'output' / 'review_results.json'
+    output_path.parent.mkdir(exist_ok=True)
+    
+    print("=" * 60)
+    print("文件要点审查模块")
+    print("=" * 60)
+    
+    # 1. 加载数据
+    print("\n[1/5] 加载规范文件...")
+    data_loader = CSVDataLoader()
+    specification = data_loader.load_specification(str(csv_path))
+    print(f"  加载完成,共 {len(specification)} 个标签类别")
+    
+    print("\n[2/5] 加载文档数据...")
+    documents = data_loader.load_documents(str(json_path))
+    print(f"  加载完成,共 {len(documents)} 个文档块")
+    
+    # 2. 初始化组件
+    print("\n[3/5] 初始化组件...")
+    prompt_builder = PromptBuilder(str(prompt_config_path))
+    llm_client = LLMClient(str(api_config_path))
+    result_processor = ResultProcessor()
+    
+    # 获取并发数配置
+    api_config = llm_client.config
+    concurrent_workers = api_config.get('keywords', {}).get('concurrent_workers', 20)
+    
+    review_pipeline = ReviewPipeline(
+        prompt_builder=prompt_builder,
+        llm_client=llm_client,
+        result_processor=result_processor,
+        max_concurrent=concurrent_workers
+    )
+    print("  组件初始化完成")
+    
+    # 3. 执行审查
+    print("\n[4/5] 开始执行审查...")
+    print(f"  使用模型: {llm_client.model_type}")
+    print(f"  最大并发数: {concurrent_workers}")
+    
+    results = await review_pipeline.review(documents, specification)
+    
+    # 统计结果
+    success_count = sum(1 for r in results if 'error' not in r.get('review_result', {}))
+    error_count = len(results) - success_count
+    print(f"\n  审查完成: 成功 {success_count} 个, 失败 {error_count} 个")
+    
+    # 4. 保存结果
+    print("\n[5/5] 保存审查结果...")
+    
+    # 保存JSON格式结果
+    output_data = {
+        'total_chunks': len(results),
+        'success_count': success_count,
+        'error_count': error_count,
+        'results': results
+    }
+    write_json(output_data, str(output_path))
+    print(f"  JSON结果已保存至: {output_path}")
+    
+    # 保存CSV格式结果
+    csv_output_path = base_dir / 'output' / 'review_results.csv'
+    ResultSaver.save_to_csv(results, specification, str(csv_output_path))
+    print(f"  CSV结果已保存至: {csv_output_path}")
+    
+    # 保存统计结果
+    stats_output_path = base_dir / 'output' / 'review_statistics.txt'
+    ResultSaver.save_statistics(results, specification, str(stats_output_path))
+    print(f"  统计结果已保存至: {stats_output_path}")
+    
+    # 5. 显示部分结果示例
+    print("\n" + "=" * 60)
+    print("审查结果示例(前3个):")
+    print("=" * 60)
+    for i, result in enumerate(results[:3]):
+        print(f"\n文档块 {i+1}:")
+        print(f"  chunk_id: {result.get('chunk_id', 'N/A')}")
+        print(f"  chapter_classification: {result.get('chapter_classification', 'N/A')}")
+        review_result = result.get('review_result', {})
+        # 错误情况
+        if isinstance(review_result, dict) and 'error' in review_result:
+            print(f"  错误: {review_result['error']}")
+        # 无问题或有问题列表
+        elif isinstance(review_result, list):
+            if len(review_result) == 0:
+                print("  审查结果: 无明显问题。")
+            else:
+                print("  审查结果(前3条问题):")
+                for issue in review_result[:3]:
+                    issue_point = issue.get('issue_point', '')
+                    location = issue.get('location', '')
+                    risk_level = issue.get('risk_level', '')
+                    print(f"    - 问题: {issue_point}")
+                    if location:
+                        print(f"      位置: {location}")
+                    if risk_level:
+                        print(f"      风险等级: {risk_level}")
+                if len(review_result) > 3:
+                    print(f"    ... 还有 {len(review_result) - 3} 条问题")
+        else:
+            print("  审查结果格式未知,无法显示详情。")
+    
+    print("\n" + "=" * 60)
+    print("审查完成!")
+    print("=" * 60)
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+

+ 4 - 0
core/construction_review/component/check_completeness/requirements.txt

@@ -0,0 +1,4 @@
+aiohttp>=3.9.0
+pyyaml>=6.0
+
+

+ 5 - 0
core/construction_review/component/check_completeness/utils/__init__.py

@@ -0,0 +1,5 @@
+"""
+工具类模块
+"""
+
+

+ 104 - 0
core/construction_review/component/check_completeness/utils/file_utils.py

@@ -0,0 +1,104 @@
+"""
+文件操作工具类
+"""
+import json
+import csv
+from typing import Dict, List, Any
+
+
+def read_json(file_path: str) -> Any:
+    """
+    读取JSON文件
+    
+    Args:
+        file_path: JSON文件路径
+        
+    Returns:
+        JSON数据
+    """
+    with open(file_path, 'r', encoding='utf-8') as f:
+        return json.load(f)
+
+
+def read_csv(file_path: str, delimiter: str = '\t') -> List[Dict[str, str]]:
+    """
+    读取CSV文件
+    
+    Args:
+        file_path: CSV文件路径
+        delimiter: 分隔符,默认为制表符
+        
+    Returns:
+        字典列表,每个字典代表一行数据
+    """
+    rows = []
+    # 尝试使用UTF-8-SIG编码(支持BOM),如果失败则尝试UTF-8
+    try:
+        with open(file_path, 'r', encoding='utf-8-sig') as f:
+            reader = csv.DictReader(f, delimiter=delimiter)
+            for row in reader:
+                rows.append(dict(row))
+    except UnicodeDecodeError:
+        # 如果UTF-8-SIG失败,尝试UTF-8
+        with open(file_path, 'r', encoding='utf-8') as f:
+            reader = csv.DictReader(f, delimiter=delimiter)
+            for row in reader:
+                rows.append(dict(row))
+    return rows
+
+
+def write_json(data: Any, file_path: str, indent: int = 2) -> None:
+    """
+    写入JSON文件
+    
+    Args:
+        data: 要写入的数据
+        file_path: 输出文件路径
+        indent: 缩进空格数
+    """
+    with open(file_path, 'w', encoding='utf-8') as f:
+        json.dump(data, f, ensure_ascii=False, indent=indent)
+
+
+def write_csv(rows: List[Dict[str, Any]], file_path: str, delimiter: str = '\t') -> None:
+    """
+    写入CSV文件(UTF-8-SIG编码,支持Excel打开)
+    
+    Args:
+        rows: 字典列表,每个字典代表一行数据
+        file_path: 输出文件路径
+        delimiter: 分隔符,默认为制表符
+    """
+    if not rows:
+        return
+    
+    # 获取所有字段名
+    fieldnames = list(rows[0].keys())
+    
+    with open(file_path, 'w', encoding='utf-8-sig', newline='') as f:
+        writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter=delimiter)
+        writer.writeheader()
+        for row in rows:
+            # 将值转换为字符串,处理None值
+            cleaned_row = {}
+            for key, value in row.items():
+                if value is None:
+                    cleaned_row[key] = ''
+                elif isinstance(value, bool):
+                    cleaned_row[key] = '是' if value else '否'
+                else:
+                    cleaned_row[key] = str(value)
+            writer.writerow(cleaned_row)
+
+
+def write_txt(content: str, file_path: str) -> None:
+    """
+    写入TXT文件
+    
+    Args:
+        content: 文本内容
+        file_path: 输出文件路径
+    """
+    with open(file_path, 'w', encoding='utf-8') as f:
+        f.write(content)
+

+ 21 - 0
core/construction_review/component/check_completeness/utils/yaml_utils.py

@@ -0,0 +1,21 @@
+"""
+YAML操作工具类
+"""
+import yaml
+from typing import Dict, Any
+
+
+def read_yaml(file_path: str) -> Dict[str, Any]:
+    """
+    读取YAML文件
+    
+    Args:
+        file_path: YAML文件路径
+        
+    Returns:
+        字典数据
+    """
+    with open(file_path, 'r', encoding='utf-8') as f:
+        return yaml.safe_load(f)
+
+