Prechádzať zdrojové kódy

v0.0.5-功能优化-条文完整性审查
- 按章审查

WangXuMing 1 mesiac pred
rodič
commit
5b8f0650a5

+ 1 - 1
core/construction_review/component/ai_review_engine.py

@@ -753,7 +753,7 @@ class AIReviewEngine(BaseReviewer):
             analyzer = ResultAnalyzer(str(csv_path))
             processed_results = analyzer.process_results(review_results)
             spec_summary_csv_path = Path('temp') / 'document_temp' / '3_spec_review_summary.csv'
-            summary_rows = analyzer.build_spec_summary(processed_results)
+            summary_rows = analyzer.build_spec_summary(processed_results,spec_summary_csv_path)
             logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
             summary_rows = pd.DataFrame(summary_rows)
             summary_rows = summary_rows[summary_rows['标签'].isin(review_results_flag)]

+ 8 - 4
core/construction_review/workflows/ai_review_workflow.py

@@ -294,7 +294,7 @@ class AIReviewWorkflow:
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
                 'semantic_logic_check': 'check_semantic_logic',
-                'completeness_check': ['check_completeness', 'outline_check'],
+                'completeness_check': 'check_completeness',
                 'timeliness_check': 'timeliness_basis_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
@@ -316,11 +316,15 @@ class AIReviewWorkflow:
             original_chunks = state.get("structured_content", {}).get("chunks", [])
 
             # 预处理:根据 review_item_dict_sorted 中的 key 对 structured_content 进行筛选
-            # original_chunks = structured_content.get("chunks", [])
             filtered_chunks = [
                 chunk for chunk in original_chunks
                 if chunk.get("chapter_classification") in review_item_dict_sorted.keys()
             ]
+            # 筛选存在完整性审查的分类,将其整章进行合并
+            filtered_chunks = self.core_fun._merge_chunks_for_completeness_check(
+                filtered_chunks, review_item_dict_sorted
+            )
+
             # with open("temp/filtered_chunks/filtered_chunks.json", "w", encoding="utf-8") as f:
             #     json.dump(filtered_chunks, f, ensure_ascii=False, indent=4)
             # # 更新 chunks 和 structured_content
@@ -334,8 +338,8 @@ class AIReviewWorkflow:
             all_issues = []
             completed_chunks = 0
             chapter_chunks_map, chapter_names = self.core_fun._group_chunks_by_chapter(filtered_chunks)
-            # with open("temp/filtered_chunks/chapter_chunks_map.json", "w", encoding="utf-8") as f:
-            #      json.dump(chapter_chunks_map, f, ensure_ascii=False, indent=4)
+            with open("temp/filtered_chunks/chapter_chunks_map.json", "w", encoding="utf-8") as f:
+                 json.dump(chapter_chunks_map, f, ensure_ascii=False, indent=4)
             logger.info(f"内容分组完成,共 {len(chapter_chunks_map)} 个章节")
             await self.core_fun._send_start_review_progress(state,total_chunks, chapter_names)
             # 6️ 按章节处理

+ 185 - 38
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -59,6 +59,7 @@ class UnitReviewResult():
     technical_compliance: Dict[str, Any]
     rag_enhanced: Dict[str, Any]
     overall_risk: str
+    is_sse_push: bool = True  # 是否成功执行并推送SSE,默认为True
 
 
 class AIReviewCoreFun:
@@ -139,7 +140,9 @@ class AIReviewCoreFun:
             chunk_results = await self._execute_chunk_methods(
                 chapter_code, chunk, global_chunk_index, func_names, state
             )
-
+            if not chunk_results.get('is_sse_push', False):
+                logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")                
+                continue  # 跳过未成功执行的块
             # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
@@ -320,6 +323,7 @@ class AIReviewCoreFun:
         merged_basic = {}
         merged_technical = {}
         merged_rag = {}
+        has_success = False  # 标记是否有成功执行的任务
 
         for result in completed_results:
             if isinstance(result, Exception):
@@ -329,6 +333,9 @@ class AIReviewCoreFun:
             if result and len(result) == 2:
                 func_name, review_result = result
                 if isinstance(review_result, UnitReviewResult):
+                    # 检查是否有成功的任务
+                    if review_result.is_sse_push:
+                        has_success = True
                     # 合并 basic_compliance
                     merged_basic.update(review_result.basic_compliance)
                     # 合并 technical_compliance
@@ -339,7 +346,8 @@ class AIReviewCoreFun:
         return {
             'basic_compliance': merged_basic,
             'technical_compliance': merged_technical,
-            'rag_enhanced': merged_rag
+            'rag_enhanced': merged_rag,
+            'is_sse_push': has_success  # 添加 is_sse_push 字段
         }
 
     async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None) -> UnitReviewResult:
@@ -356,6 +364,7 @@ class AIReviewCoreFun:
         Returns:
             UnitReviewResult: 单个审查方法的UnitReviewResult对象,包含 basic_compliance 或 technical_compliance
         """
+      
         # 从ai_review_engine获取对应的方法
         if not hasattr(self.ai_review_engine, func_name):
             logger.warning(f"AIReviewEngine中未找到方法: {func_name}")
@@ -366,7 +375,8 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: {"error": f"未找到方法: {func_name}"}},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk="error"
+                overall_risk="error",
+                is_sse_push=True
             )
 
         method = getattr(self.ai_review_engine, func_name)
@@ -377,11 +387,11 @@ class AIReviewCoreFun:
 
         # 获取块内容
         review_content = chunk.get("content", "")
-
+        is_complete_field = chunk.get("is_complete_field", False)
         logger.debug(f"执行审查: {trace_id} -> {func_name}")
 
         # 根据func_name构建对应的参数并调用
-        if func_name == "sensitive_word_check":
+        if func_name == "sensitive_word_check" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -390,10 +400,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_semantic_logic":
+        elif func_name == "check_semantic_logic" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -402,10 +413,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_sensitive":
+        elif func_name == "check_sensitive" and not is_complete_field:
             raw_result = await method(trace_id, review_content, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
@@ -414,10 +426,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_completeness":
+        elif func_name == "check_completeness" and is_complete_field:
             # check_completeness 需要列表类型,将单个 chunk 包装成列表
             raw_result = await method(trace_id, [chunk], state, stage_name)
             # 基础审查方法,放入 basic_compliance
@@ -427,10 +440,11 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_non_parameter_compliance":
+        elif func_name == "check_non_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
@@ -442,10 +456,11 @@ class AIReviewCoreFun:
                 basic_compliance={},
                 technical_compliance={func_name: raw_result},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        elif func_name == "check_parameter_compliance":
+        elif func_name == "check_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(
                 method, trace_id, review_content, chunk, state, stage_name, rag_enhanced_content, func_name
@@ -457,23 +472,14 @@ class AIReviewCoreFun:
                 basic_compliance={},
                 technical_compliance={func_name: raw_result},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
-        # # outline_check 仍在章节级别处理
-        # elif func_name == "outline_check":
-        #     logger.warning(f"方法 {func_name} 不应在块级别调用,已在主流程中处理")
-        #     return UnitReviewResult(
-        #         unit_index=chunk_index,
-        #         unit_content=chunk,
-        #         basic_compliance={},
-        #         technical_compliance={},
-        #         rag_enhanced={},
-        #         overall_risk="low"
-        #     )
+
 
         # reference_basis_reviewer:编制依据审查(逐块处理)
-        elif func_name == "reference_basis_reviewer":
+        elif func_name == "reference_basis_reviewer" and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
                 "basis_items": basis_content,  # 提取的 BasisItems 对象
@@ -492,11 +498,12 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
         # timeliness_basis_reviewer:时效性审查(逐块处理)
-        elif func_name == "timeliness_basis_reviewer":
+        elif func_name == "timeliness_basis_reviewer" and not is_complete_field:
             review_data = {
                 "content": review_content,  # 原始文本内容
                 "basis_items": basis_content,  # 提取的 BasisItems 对象
@@ -515,18 +522,21 @@ class AIReviewCoreFun:
                 basic_compliance={func_name: raw_result},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk=self._calculate_single_result_risk(raw_result)
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
             )
 
         else:
-            logger.warning(f"未知的审查方法: {func_name},使用默认调用方式")
+            logger.warning(f"未知的审查方法: {func_name}")
+            logger.warning(f"is_complete_field: {is_complete_field}")
             return UnitReviewResult(
                 unit_index=chunk_index,
                 unit_content=chunk,
                 basic_compliance={func_name: {"error": f"未知的审查方法: {func_name}"}},
                 technical_compliance={},
                 rag_enhanced={},
-                overall_risk="error"
+                overall_risk="error",
+                is_sse_push=False
             )
 
     def _calculate_single_result_risk(self, raw_result: Any) -> str:
@@ -1090,7 +1100,7 @@ class AIReviewCoreFun:
             logger.warning(f"发送单元完成进度更新失败: {str(e)}")
             # 发生异常时,尝试返回一个基于 index 的估算值
             try:
-                return int(((unit_index + 1) / total_units) * 100)
+                return int(((unit_index + 1) / total_chunks) * 100)
             except:
                 return 0
 
@@ -1157,11 +1167,11 @@ class AIReviewCoreFun:
         for item in review_item_config:
             key, value = item.split("_", 1)
             review_item_dict.setdefault(key, []).append(value)
-        
+
         # 依据方案标准章节顺序进行排序
-        sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
+        sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment",
 "management", "acceptance", "other"]
-        
+
         all_keys = review_item_dict.keys()
         sorted_keys = sorted(
             all_keys,
@@ -1170,4 +1180,141 @@ class AIReviewCoreFun:
         review_item_dict_sorted = {}
         for key in sorted_keys:
             review_item_dict_sorted[key] = review_item_dict[key]
-        return review_item_dict_sorted
+        return review_item_dict_sorted
+
+    def _merge_chunks_for_completeness_check(
+        self,
+        chunks: List[Dict[str, Any]],
+        review_item_dict: Dict[str, List[str]]
+    ) -> List[Dict[str, Any]]:
+        """
+        Build one merged, chapter-level chunk for every chapter whose classification requires a completeness review.
+
+        Args:
+            chunks: The already-filtered list of chunk dicts.
+            review_item_dict: Review-item mapping {chapter_code: [func_names]}.
+
+        Returns:
+            List[Dict[str, Any]]: Copies of the input chunks plus the merged chunks, sorted by page; the input list itself is returned when nothing needs merging or when an exception is caught.
+
+        Note:
+            Merge rules:
+            1. Select the chapter classifications whose review items contain 'check_completeness' or 'outline_check'.
+            2. Chapter name: the "chapter" field (falling back to "section_label") with "->" and everything after it removed.
+            3. Within a chapter, chunks are ordered by page ascending; content and original_content are joined with blank lines.
+            4. The merged chunk takes the page of the first chunk after sorting (the smallest page).
+            5. Merged chunks are appended to the result; the original chunks are kept (as shallow copies), not removed.
+            6. is_complete_field is True on merged chunks (even when built from a single chunk) and False on the copied originals.
+            7. The final list is sorted by page (the code sorts by page, not by chapter_classification).
+        """
+        try:
+            # 1. Collect the chapter classifications that include a completeness review
+            completeness_chapters = set()
+            for chapter_code, func_names in review_item_dict.items():
+                if 'check_completeness' in func_names or 'outline_check' in func_names:
+                    completeness_chapters.add(chapter_code)
+
+            if not completeness_chapters:
+                logger.info("没有包含完整性审查的章节,无需合并")
+                return chunks
+
+            logger.info(f"包含完整性审查的章节分类: {completeness_chapters}")
+
+            # 2. Pick the chunks that need merging (those belonging to a completeness-review classification)
+            chunks_to_merge = []
+            for chunk in chunks:
+                chapter_code = chunk.get("chapter_classification", "")
+                if chapter_code in completeness_chapters:
+                    chunks_to_merge.append(chunk)
+
+            if not chunks_to_merge:
+                logger.info("没有找到需要合并的chunks")
+                return chunks
+
+            # 3. Group by chapter (chapter name = the text before "->")
+            chapter_groups = {}
+            for chunk in chunks_to_merge:
+                chapter_full = chunk.get("chapter", chunk.get("section_label", ""))
+                # Chapter name: drop "->" and everything after it
+                chapter_name = chapter_full.split("->")[0].strip() if "->" in chapter_full else chapter_full
+
+                if chapter_name not in chapter_groups:
+                    chapter_groups[chapter_name] = []
+                chapter_groups[chapter_name].append(chunk)
+
+            logger.info(f"按章节分组完成,共 {len(chapter_groups)} 个章节需要合并")
+
+            # 4. Merge the chunks of each chapter
+            # First tag a shallow copy of every original chunk with is_complete_field: False
+            result_chunks = []
+            for chunk in chunks:
+                chunk_copy = chunk.copy()
+                chunk_copy["is_complete_field"] = False
+                result_chunks.append(chunk_copy)
+
+            for chapter_name, chapter_chunk_list in chapter_groups.items():
+                # Sort by page ascending (digit-like pages become int, others are used as-is) — NOTE(review): mixed int/non-int pages may not be comparable; confirm
+                chapter_chunk_list.sort(key=lambda x: int(x.get("page", 0)) if str(x.get("page", 0)).isdigit() else x.get("page", 0))
+
+                # Smallest page = page of the first chunk after sorting
+                min_page = chapter_chunk_list[0].get("page", 0)
+
+                # Join content and original_content with blank lines
+                merged_content = "\n\n".join([
+                    chunk.get("content", "") for chunk in chapter_chunk_list
+                ])
+                merged_original_content = "\n\n".join([
+                    chunk.get("original_content", "") for chunk in chapter_chunk_list
+                ])
+
+                # Build the merged chunk from a shallow copy of the first chunk, keeping all of its fields
+                merged_chunk = chapter_chunk_list[0].copy()
+
+                # Update the core fields
+                # chunk_id: strip "->" and everything after it, then add the "_merged" suffix
+                original_chunk_id = merged_chunk.get('chunk_id', '')
+                clean_chunk_id = original_chunk_id.split("->")[0].strip() if "->" in original_chunk_id else original_chunk_id
+                merged_chunk["chunk_id"] = f"{clean_chunk_id}_merged"
+
+                merged_chunk["chapter"] = chapter_name  # use the chapter name for the merged chunk
+                merged_chunk["content"] = merged_content
+                merged_chunk["original_content"] = merged_original_content
+                merged_chunk["page"] = min_page
+                merged_chunk["is_complete_field"] = True  # mark as a merged chunk (even when built from a single chunk)
+
+                # Update section_label and title
+                merged_chunk["section_label"] = chapter_name
+                merged_chunk["title"] = chapter_name
+
+                # serial_number is set to an empty string
+                merged_chunk["serial_number"] = ""
+
+                # All other fields (e.g. element_tag, project_plan_type) are kept from the first chunk
+                # element_tag: only the first chunk's tag is kept
+                if "element_tag" in merged_chunk:
+                    merged_chunk["element_tag"] = chapter_chunk_list[0]["element_tag"].copy()
+                    # The chunk_id inside element_tag also has "->" and everything after it removed
+                    original_element_chunk_id = merged_chunk["element_tag"].get('chunk_id', '')
+                    clean_element_chunk_id = original_element_chunk_id.split("->")[0].strip() if "->" in original_element_chunk_id else original_element_chunk_id
+                    merged_chunk["element_tag"]["chunk_id"] = f"{clean_element_chunk_id}_merged"
+                    # The serial_number inside element_tag is also set to an empty string
+                    merged_chunk["element_tag"]["serial_number"] = ""
+
+                # Append to the result list
+                result_chunks.append(merged_chunk)
+
+                logger.info(f"合并章节 '{chapter_name}': {len(chapter_chunk_list)} 个chunk -> 1 个合并chunk (page={min_page})")
+
+            # 5. Sort the whole result by page number
+            result_chunks.sort(
+                key=lambda x: int(x.get("page", 0)) if str(x.get("page", 0)).isdigit() else x.get("page", 0)
+            )
+
+            logger.info(f"合并完成并按页码排序: 原始 {len(chunks)} 个chunk -> 最终 {len(result_chunks)} 个chunk(包含 {len(result_chunks) - len(chunks)} 个合并chunk)")
+
+            return result_chunks
+
+        except Exception as e:
+            logger.error(f"合并chunks失败: {str(e)}", exc_info=True)
+            # On failure, return the original list unchanged
+            return chunks