ソースを参照

v0.0.5-功能优化-完整性、编制依据
- 优化条文完整性审查问题数量
- 解决进度百分比错误问题
- 部分优化编制依据问题

WangXuMing 1 ヶ月 前
コミット
c8ad020633

+ 6 - 6
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -13,7 +13,7 @@ if str(_root) not in sys.path:
 
 from interfaces import IResultAnalyzer
 from utils.file_utils import read_csv, write_csv
-
+from foundation.observability.logger.loggering import server_logger as logger
 class ResultAnalyzer(IResultAnalyzer):
     """审查结果汇总分析器"""
 
@@ -196,7 +196,7 @@ class ResultAnalyzer(IResultAnalyzer):
                 content = chapter_content_map.get(tag, "")
             if not section_label:
                 section_label = chapter_section_label_map.get(tag, "")
-
+            
             # 组装输出行(在原规范行基础上增加三列)
             new_row = dict(row)
             new_row["审查到的要点"] = str(found_points)
@@ -238,8 +238,8 @@ class ResultAnalyzer(IResultAnalyzer):
                 continue
 
             sources_raw = row.get("要点来源", "")
-            sources = self._parse_list_field(sources_raw)
-            location = "; ".join(map(str, sources)) if sources else ""
+            #sources = self._parse_list_field(sources_raw)
+            #location = "; ".join(map(str, sources)) if sources else ""
 
             requirement_list = requirement.split(':')[-1].split(';')
             requirement_text = ';'.join([requirement_list[i-1] for i in missing_points])
@@ -252,7 +252,7 @@ class ResultAnalyzer(IResultAnalyzer):
             # 构建问题项并添加到列表
             issue_item = {
                 "issue_point": issue_point,
-                "location": location,
+                "location": row.get("section_label", ""),
                 "suggestion": suggestion,
                 "reason": requirement,
                 "risk_level": risk_level,
@@ -266,7 +266,7 @@ class ResultAnalyzer(IResultAnalyzer):
                     "chapter_code": row.get("标签", ""),
                     "original_content": row.get("content", "")
                 }
-
+        logger.debug(f"build_missing_issue_list_all_issues:{len(all_issues)}")
         # 返回包含问题和元数据的字典,由外层统一格式化
         return {
             "response": all_issues,

+ 20 - 32
core/construction_review/workflows/ai_review_workflow.py

@@ -275,8 +275,7 @@ class AIReviewWorkflow:
         执行流程:
         1. 终止信号检查
         2. 解析审查项配置
-        3. 优先处理大纲审查
-        4. 按章节处理(basis章节 vs 普通章节)
+        3. 开始审查
         5. 汇总结果并构建响应
         """
         try:
@@ -303,56 +302,46 @@ class AIReviewWorkflow:
                 'parameter_compliance_check': 'check_parameter_compliance'
             }
 
-            
+            # 获取审查项配置
             review_item_config_raw = self.task_info.get_review_item_config_list()
-            review_item_config = self.core_fun._replace_review_suffix(review_item_config_raw, review_func_mapping)
-
-            review_item_dict = {}
-            for item in review_item_config:
-                key, value = item.split("_", 1)
-                review_item_dict.setdefault(key, []).append(value)
             
-            # 依据方案标准章节顺序进行排序
-            sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
-    "management", "acceptance", "other"]
+            # 将审查项后缀替换为对应的审查函数名,得到形如 "basis_sensitive_word_check"("章节code_函数名")的字符串列表
+            review_item_config = self.core_fun._replace_review_suffix(review_item_config_raw, review_func_mapping)
             
-            all_keys = review_item_dict.keys()
-            sorted_keys = sorted(
-                all_keys,
-                key=lambda x :sgfa_chapter_index_order.index(x)
-            )
-            review_item_dict_sorted = {}
-            for key in sorted_keys:
-                review_item_dict_sorted[key] = review_item_dict[key]
+            # 按章节分组为 {chapter_code: [func_name, ...]} 并依据标准章节顺序排序,如 {'basis': ["sensitive_word_check", "timeliness_basis_reviewer"]}
+            review_item_dict_sorted = self.core_fun._check_item_mapping_order(review_item_config)
             logger.info(f"审查项配置解析完成: {review_item_dict_sorted}")
 
             # 3️ 获取结构化内容
-            structured_content = state.get("structured_content", {})
+            original_chunks = state.get("structured_content", {}).get("chunks", [])
 
             # 预处理:根据 review_item_dict_sorted 中的 key 对 structured_content 进行筛选
-            original_chunks = structured_content.get("chunks", [])
+            # original_chunks = structured_content.get("chunks", [])
             filtered_chunks = [
                 chunk for chunk in original_chunks
                 if chunk.get("chapter_classification") in review_item_dict_sorted.keys()
             ]
-
-            # 更新 chunks 和 structured_content
-            chunks = filtered_chunks
-            structured_content["chunks"] = chunks
+            # with open("temp/filtered_chunks/filtered_chunks.json", "w", encoding="utf-8") as f:
+            #     json.dump(filtered_chunks, f, ensure_ascii=False, indent=4)
+            # # 更新 chunks 和 structured_content
+            # chunks = filtered_chunks
+            # structured_content["chunks"] = chunks
 
             total_chapters = len(review_item_dict_sorted)
-            total_chunks = len(chunks)
+            total_chunks = len(filtered_chunks)
 
             # 初始化issues列表
             all_issues = []
             completed_chunks = 0
-            chapter_chunks_map = self.core_fun._group_chunks_by_chapter(chunks)
+            chapter_chunks_map, chapter_names = self.core_fun._group_chunks_by_chapter(filtered_chunks)
+            with open("temp/filtered_chunks/chapter_chunks_map.json", "w", encoding="utf-8") as f:
+                 json.dump(chapter_chunks_map, f, ensure_ascii=False, indent=4)
             logger.info(f"内容分组完成,共 {len(chapter_chunks_map)} 个章节")
-
+            await self.core_fun._send_start_review_progress(state,total_chunks, chapter_names)
             # 6️ 按章节处理
-            for chapter_idx, (chapter_code, func_names) in enumerate(review_item_dict.items()):
+            for chapter_idx, (chapter_code, func_names) in enumerate(review_item_dict_sorted.items()):
                 logger.info(f" 处理章节 [{chapter_idx+1}/{total_chapters}]: {chapter_code},包含 {len(func_names)} 个审查任务")
-                logger.info(f"🔍 章节处理前: all_issues数量={len(all_issues)}")  # 调试日志
+                # logger.info(f"🔍 章节处理前: all_issues数量={len(all_issues)}")  # 调试日志
 
                 # 终止信号检查(章节级别)
                 if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
@@ -368,7 +357,6 @@ class AIReviewWorkflow:
                 chunks_completed, all_issues = await self.core_fun._process_chapter_item(
                     chapter_code, chapter_content, func_names, state, all_issues, completed_chunks, total_chunks
                 )
-                logger.info(f"🔍 章节{chapter_code}处理后: all_issues数量={len(all_issues) if all_issues else 0}, chunks_completed={chunks_completed}")  # 调试日志
                 # 更新已完成块数
                 completed_chunks += chunks_completed
 

+ 56 - 53
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -121,12 +121,14 @@ class AIReviewCoreFun:
             Tuple[int, List[Dict]]: (处理的块数量, 更新后的issues列表)
         """
         logger.info(f"📝 处理章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
-        chapter_total_chunks = len(chapter_content)
+        # chapter_total_chunks = len(chapter_content)
 
         # 按块串行遍历(所有章节统一流程)
         for chunk_index, chunk in enumerate(chapter_content):
-            chunk_label = chunk.get("section_label", f"chunk_{chunk_index}")
-            logger.info(f"  📄 处理块 {chunk_index+1}/{chapter_total_chunks}: {chunk_label}")
+            # 计算全局块索引,避免不同章节间索引重复导致Redis计数错误
+            global_chunk_index = completed_chunks + chunk_index
+            chunk_label = chunk.get("section_label", f"chunk_{global_chunk_index}")
+            logger.info(f"  📄 处理块 {global_chunk_index+1}/{total_chunks}: {chunk_label}")
 
             # 终止信号检查(块级别)
             if await self.workflow_manager.check_terminate_signal(state["callback_task_id"]):
@@ -135,7 +137,7 @@ class AIReviewCoreFun:
 
             # 并发执行当前块的所有审查方法
             chunk_results = await self._execute_chunk_methods(
-                chapter_code, chunk, chunk_index, func_names, state
+                chapter_code, chunk, global_chunk_index, func_names, state
             )
 
             # 格式化当前块的结果为issues
@@ -143,7 +145,7 @@ class AIReviewCoreFun:
             review_location_label = f"第{chunk_page}页:{chunk_label}"
             issues = self.inter_tool._format_review_results_to_issues(
                 callback_task_id=state["callback_task_id"],
-                unit_index=chunk_index,
+                unit_index=global_chunk_index,
                 review_location_label=review_location_label,
                 chapter_code=chapter_code,
                 unit_content=chunk,
@@ -153,9 +155,9 @@ class AIReviewCoreFun:
             )
 
             # 推送当前块的进度
-            current = int(((completed_chunks + chunk_index + 1) / total_chunks) * 100)
+            current = int(((global_chunk_index + 1) / total_chunks) * 100)
             await self._send_unit_review_progress(
-                state, chunk_index, chapter_total_chunks, chunk_label, issues, current
+                state, global_chunk_index, total_chunks, chunk_label, issues, current
             )
 
             # 累积issues
@@ -165,8 +167,8 @@ class AIReviewCoreFun:
             else:
                 logger.warning(f"⚠️ 块{chunk_index}: issues为空,未添加到all_issues")
 
-        logger.info(f"🔍 章节{chapter_code}完成: 总共处理{chapter_total_chunks}个块, all_issues最终数量={len(all_issues)}")
-        return chapter_total_chunks, all_issues
+        logger.info(f"🔍 章节{chapter_code}完成: 总共处理{total_chunks}个块, all_issues最终数量={len(all_issues)}")
+        return total_chunks, all_issues
 
     def _extract_issues_from_result(self, result: Any) -> List[Dict]:
         """
@@ -258,8 +260,8 @@ class AIReviewCoreFun:
                 chapter_map[chapter_code] = []
 
             chapter_map[chapter_code].append(chunk)
-
-        return chapter_map
+            chapter_names = list(chapter_map.keys())
+        return chapter_map, chapter_names
 
     async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState) -> Dict[str, Any]:
         """
@@ -951,7 +953,7 @@ class AIReviewCoreFun:
                 overall_risk="error"
             )
 
-    async def _send_start_review_progress(self, state: AIReviewState, total_units: int = None, review_type : str =None) -> None:
+    async def _send_start_review_progress(self, state: AIReviewState, total_units: int, chapter_names: list) -> None:
         """
         发送开始审查的进度更新
 
@@ -959,45 +961,24 @@ class AIReviewCoreFun:
             state: AI审查状态
             total_units: 总审查单元数
         """
-
-
+        #logger.info(f"发送开始审查的进度更新 {chapter_names}")
+        chapter_count = len(chapter_names)
+        chapter_names_str = "、".join(chapter_names)
         try:
-            
-
             if state["progress_manager"]:
-                if  review_type == "outline":
-                    await state["progress_manager"].update_stage_progress(
-                        callback_task_id=state["callback_task_id"],
-                        stage_name="AI审查",
-                        current=0,
-                        status="processing",
-                        message=f"开始大纲审查",
-                        event_type="processing"
-                    )
-                # elif  review_type is "prpe_basis":
-                #     await state["progress_manager"].update_stage_progress(
-                #         callback_task_id=state["callback_task_id"],
-                #         stage_name="AI审查",
-                #         current=0,
-                #         total=total_units,
-                #         status="processing",
-                #         message=f"开始编制依据审查",
-                #         event_type="processing"
-                #     )
-                else:
-                    await state["progress_manager"].update_stage_progress(
-                        callback_task_id=state["callback_task_id"],
-                        stage_name="AI审查",
-                        current=0,
-                        status="processing",
-                        message=f"开始核心审查,共 {total_units} 个审查单元",
-                        event_type="processing"
-                    )
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name="AI审查",
+                    current=0,
+                    status="processing",
+                    message=f"开始核心审查,共计{chapter_count}章,{chapter_names_str}, {total_units} 个审查单元",
+                    event_type="processing"
+                )
         except Exception as e:
             logger.warning(f"发送开始进度更新失败: {str(e)}")
 
     async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
-                                            total_units: int, section_label: str,
+                                            total_chunks: int, section_label: str,
                                             issues: List[Dict], current: int) -> None:
         """
         发送单元审查详细信息 - 强制串行并统一进度值
@@ -1015,7 +996,7 @@ class AIReviewCoreFun:
                     )
 
                 real_current = await self._send_unit_overall_progress(
-                    state, unit_index, total_units, section_label, issues_count
+                    state, unit_index, total_chunks, section_label, issues_count
                 )
                 
 
@@ -1052,7 +1033,7 @@ class AIReviewCoreFun:
                 logger.error(f"发送单元审查详情失败: {str(e)}")
 
     async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
-                                           total_units: int, section_label: str,
+                                           total_chunks: int, section_label: str,
                                            issues_count: int = None) -> Optional[int]:
         """
         发送单元完成进度更新 - 返回计算出的实时进度
@@ -1078,17 +1059,16 @@ class AIReviewCoreFun:
                 completed_count = await redis_client.scard(completed_key)
                 
                 # 计算进度
-                current_percent = int((completed_count / total_units) * 100)
+                current_percent = int((completed_count / total_chunks) * 100)
             else:
                 # 降级方案
                 completed_count = unit_index + 1
-                current_percent = int((completed_count / total_units) * 100)
-
+                current_percent = int((completed_count / total_chunks) * 100)
             # 构建消息
             if issues_count is not None and issues_count > 0:
-                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+                message = f"已完成第 {completed_count}/{total_chunks} 个单元: {section_label}(已发现{issues_count}个问题)"
             else:
-                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
+                message = f"已完成第 {completed_count}/{total_chunks} 个单元: {section_label}"
 
             logger.info(f"进度更新: {current_percent}% - {message}")
 
@@ -1167,4 +1147,27 @@ class AIReviewCoreFun:
                 for val in mapped_value:
                     result.append(f"{prefix}_{val}")
         
-        return result
+        return result
+    
+    def _check_item_mapping_order(self, review_item_config: List[str]) -> Dict[str, List[str]]:
+        """
+        将 "章节code_审查函数名" 形式的配置项按章节分组,并依据标准章节顺序排序后返回字典
+        """
+        review_item_dict = {}
+        for item in review_item_config:
+            key, value = item.split("_", 1)
+            review_item_dict.setdefault(key, []).append(value)
+        
+        # 依据方案标准章节顺序进行排序
+        sgfa_chapter_index_order = ["catalogue", "basis", "overview", "plan","technology", "safety", "quality", "environment", 
+"management", "acceptance", "other"]
+        
+        all_keys = review_item_dict.keys()
+        sorted_keys = sorted(
+            all_keys,
+            key=lambda x :sgfa_chapter_index_order.index(x)
+        )
+        review_item_dict_sorted = {}
+        for key in sorted_keys:
+            review_item_dict_sorted[key] = review_item_dict[key]
+        return review_item_dict_sorted