Просмотр исходного кода

dev:大纲审查模块接入整体框架,完成字段对齐;

ChenJiSheng 1 месяц назад
Родитель
Commit
708f5332b5

+ 87 - 43
core/construction_review/component/ai_review_engine.py

@@ -92,13 +92,7 @@ from .reviewers.check_completeness.utils.file_utils import write_json
 from core.construction_review.component.reviewers.base_reviewer import ReviewResult
 from .reviewers.outline_check import outline_review_results_df
 from .reviewers.check_completeness.utils.redis_csv_utils import (
-    df_store_to_redis,
     get_redis_connection,
-    main,
-    display_redis_data,
-    store_header_to_redis,
-    read_from_redis_and_save_csv,
-    store_row_to_redis,
 )
 @dataclass
 class ReviewResult:
@@ -770,15 +764,20 @@ class AIReviewEngine(BaseReviewer):
             review_results_df['title'] = chapter_labels
             # review_results_df.to_csv(Path('temp') / 'document_temp' / '2_spec_review_results.csv', encoding='utf-8-sig', index=False)
             # csv_file = rf'temp\document_temp\2_spec_review_results.csv'
-            path2 = rf'temp\document_temp\outlines_review_results.csv'
+            # path2 = rf'temp\document_temp\outlines_review_results.csv'
             # data_df = pd.read_csv(csv_file, encoding='utf-8-sig')
             # data_df = review_results_df
-            outline_review_results = outline_review_results_df(data=review_results_df, path=path2)
-            
+            outline_review_results = outline_review_results_df(data=review_results_df)
+  
             logger.info(f"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}")
             logger.info(f"[完整性检查] 大纲审查结果行数: {len(outline_review_results) if outline_review_results is not None else 'None'}")
-
-            df_store_to_redis(self.redis_client, data=outline_review_results, bind_id=trace_id_idx)
+            outline_review_result = await self.outline_check(
+                trace_id_idx = state["callback_task_id"],
+                outline_content = outline_review_results,
+                state = state,
+                stage_name = state.get("stage_name", "大纲审查")
+            )
+            # df_store_to_redis(self.redis_client, data=outline_review_results, bind_id=trace_id_idx)
             
             logger.info(f"[完整性检查] 数据已成功存储到Redis,bind_id: {trace_id_idx}")
             
@@ -799,7 +798,7 @@ class AIReviewEngine(BaseReviewer):
             analyzer = ResultAnalyzer(str(csv_path))
             processed_results = analyzer.process_results(review_results)
             spec_summary_csv_path = Path('temp') / 'document_temp' / '3_spec_review_summary.csv'
-            summary_rows = analyzer.build_spec_summary(processed_results)
+            summary_rows = analyzer.build_spec_summary(processed_results, spec_summary_csv_path)
             # logger.info(f"  规范覆盖汇总结果已保存至: {spec_summary_csv_path}")
             summary_rows = pd.DataFrame(summary_rows)
             summary_rows = summary_rows[summary_rows['标签'].isin(review_results_flag)]
@@ -808,10 +807,11 @@ class AIReviewEngine(BaseReviewer):
             # 生成缺失要点 JSON 列表,便于前端消费
 
             issues = analyzer.build_missing_issue_list(summary_rows)
-
+            # issues["response"] += outline_review_result
+            # issues["response"].extend(outline_review_result)
             # 包装成外层格式化期望的结构
             execution_time = time.time() - start_time
-            return {
+            check_result = {
                 "details": {
                     "name": "completeness_check",
                     "response": issues.get("response", []),
@@ -822,6 +822,8 @@ class AIReviewEngine(BaseReviewer):
                 "success": True,
                 "execution_time": execution_time
             } 
+            
+            return check_result,trace_id_idx
         except Exception as e:
             execution_time = time.time() - start_time
             error_msg = f"{name} 审查失败: {str(e)}"
@@ -982,8 +984,8 @@ class AIReviewEngine(BaseReviewer):
         return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
                                reference_source, state, stage_name, timeout=45, model_name="qwen3_30b")
 
-    async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
-                                   state:dict,stage_name:str) -> Dict[str, Any]:
+    async def outline_check(self,  outline_content: pd.DataFrame,trace_id_idx: str,
+                                   state:dict =None,stage_name:str =None) -> Dict[str, Any]:
         """
         大纲审查
 
@@ -993,6 +995,7 @@ class AIReviewEngine(BaseReviewer):
             state: 状态
             stage_name: 阶段名称
         """
+        start_time = time.time()
         logger.info(f"开始大纲审查,trace_id: {trace_id_idx}")
 
         # CSV文件路径
@@ -1000,12 +1003,13 @@ class AIReviewEngine(BaseReviewer):
         
         # 存储所有缺失项
         missing_items = []
-        
+        metadata = {}
         try:
             # 从Redis读取并保存为新的CSV文件
-            rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
-            df = rows_df
-            
+            # rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
+            df = outline_content
+            # df = merge_results_by_classification(rows_df)
+            df.to_csv(csv_path, encoding='utf-8-sig', index=False)
             # 检查 df 是否为 None
             if df is None:
                 logger.error(f"[大纲审查] Redis中不存在ID '{trace_id_idx}' 的数据,无法进行大纲审查")
@@ -1048,44 +1052,84 @@ class AIReviewEngine(BaseReviewer):
                 # 检查字典中的每个字段
                 if isinstance(merged_results, dict):
                     logger.info(f"开始大纲审查")
+                    
+                    # 获取chapter_label列表
+                    chapter_labels_list = row.get(chapter_label_col, [])
+                    if not isinstance(chapter_labels_list, list):
+                        chapter_labels_list = [str(chapter_labels_list)]
+                    
+                    # 获取review_results_summary字典的所有键,用于reason字段
+                    merged_results_keys = list(merged_results.keys())
+                    merged_results_keys_str = "、".join(merged_results_keys)
+                    
                     for field_name, field_value in merged_results.items():
                         # 检查列表是否为空
                         if isinstance(field_value, list) and len(field_value) == 0:
-                            # 生成缺失项
-                            missing_item = {
-                                "check_item_code": "catalogue_completeness_check",
-                                "check_result": {
+                            # 为chapter_label列表中的每个值创建单独的缺失项
+                            for chapter_label in chapter_labels_list:
+                                missing_item = {
+                                    # "check_item_code": "catalogue_completeness_check",
                                     "issue_point": f"{field_name}缺失",
-                                    "location": "",
-                                    "suggestion": "",
-                                    "reason": "",
-                                    "risk_level": ""
-                                }
-                            }
-                            missing_items.append(missing_item)
+                                    "location": chapter_label,
+                                    "suggestion": f"在待审查目录中未找到与'{field_name}'对应的章节;当前章节仅涉及'{chapter_label}',未涵盖'{field_name}'相关内容;整改建议:建议在本章或前序章节中增设'{field_name}'相关内容,确保与审查规范要求一致。",
+                                    "reason": f"本章应包含{merged_results_keys_str}等{len(merged_results_keys)}个方面。",
+                                    "risk_level": "高风险",
+                                    "review_references": '',
+                                    "reference_source": '',
 
-            
+                                }
+                                missing_items.append(missing_item)
+
+            # if not metadata:
+            #     metadata = {
+            #         "review_location_label": df['chapter_label'].to_list()[-1],
+            #         "chapter_code": "catalogue",
+            #         "original_content": '',
+            #     }
             logger.info(f"大纲审查完成,共发现 {len(missing_items)} 个缺失项")
-            
+            execution_time = time.time() - start_time
         except FileNotFoundError:
             logger.error(f"CSV文件不存在: {csv_path}")
+            execution_time = time.time() - start_time
             return {
-                'outline_review_result': [],
-                'error': f'CSV文件不存在: {csv_path}'
+                "details": {
+                    "name": "outline_check",
+                    "response": [],
+                    "review_location_label": "",
+                    "chapter_code": "catalogue",
+                    "original_content": ""
+                },
+                "success": False,
+                "execution_time": execution_time
             }
         except Exception as e:
             logger.error(f"大纲审查失败: {str(e)}", exc_info=True)
+            execution_time = time.time() - start_time
             return {
-                'outline_review_result': [],
-                'error': f'大纲审查失败: {str(e)}'
+                "details": {
+                    "name": "outline_check",
+                    "response": [],
+                    "review_location_label": "",
+                    "chapter_code": "catalogue",
+                    "original_content": ""
+                },
+                "success": False,
+                "execution_time": execution_time
             }
-        
-        return {
-            'outline_review_result':
-                {
+        logger.info(f"大纲审查完成,耗时 {execution_time:.2f} 秒")
+        outcheck_result = {
+                "details": {
+                    "name": "completeness_check",
                     "response": missing_items,
-                }
-        }
+                    "review_location_label": df['chapter_label'].to_list()[-1],
+                    "chapter_code": 'catalogue',
+                    "original_content": ""
+                },
+                "success": True,
+                "execution_time": execution_time
+            }
+        logger.info(f"大纲审查结果: {outcheck_result}")
+        return outcheck_result
     
     async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,
                                 state: dict = None, stage_name: str = None) -> Dict[str, Any]:

+ 5 - 1
core/construction_review/component/reviewers/check_completeness/components/result_analyzer.py

@@ -1,6 +1,7 @@
 """
 结果汇总与规范覆盖分析组件
 """
+import json
 from typing import Dict, List, Any, Set
 import ast
 import sys
@@ -274,7 +275,8 @@ class ResultAnalyzer(IResultAnalyzer):
                 "reference_source": reference_source
             }
             all_issues.append(issue_item)
-
+            with open("temp/document_temp/missing_points.json", "w", encoding="utf-8") as f:
+                json.dump(all_issues, f, ensure_ascii=False, indent=4)
             # 收集元数据(从第一行获取)
             if not metadata:
                 metadata = {
@@ -282,6 +284,8 @@ class ResultAnalyzer(IResultAnalyzer):
                     "chapter_code": row.get("标签", ""),
                     "original_content": row.get("content", "")
                 }
+            with open("temp/document_temp/missing_points_metadata.json", "w", encoding="utf-8") as f:
+                json.dump(metadata, f, ensure_ascii=False, indent=4)
         logger.debug(f"build_missing_issue_list_all_issues:{len(all_issues)}")
         # 返回包含问题和元数据的字典,由外层统一格式化
         return {

+ 77 - 23
core/construction_review/component/reviewers/outline_check.py

@@ -29,28 +29,6 @@ def parse_review_result(review_result_str):
         except (ValueError, SyntaxError):
             return {}
 
-def merge_dict_fields_and_deduplicate(group):
-    """
-    合并字典字段的值列表并去重
-    """
-    merged = {}
-    for item in group:
-        if isinstance(item, dict):
-            for key, value in item.items():
-                if key not in merged:
-                    merged[key] = []
-                # 如果值是列表,则合并;否则添加为单个元素
-                if isinstance(value, list):
-                    merged[key].extend(value)
-                else:
-                    merged[key].append(value)
-    # 对每个字段的值去重
-    for key in merged:
-        # 使用dict.fromkeys去重同时保持顺序
-        merged[key] = list(dict.fromkeys(merged[key]))
-    return merged
-
-
 def outline_review_results_df(data, path=None):
     """
     处理大纲审查结果DataFrame,生成合并后的审查结果
@@ -90,7 +68,7 @@ def outline_review_results_df(data, path=None):
             
             # 使用字典推导式合并相同title的字典的字段的值列表并去重
             merged_dict = {
-                title: merge_dict_fields_and_deduplicate(group)
+                title: merge_dict_lists_and_deduplicate(group.tolist())
                 for title, group in grouped_data
             }
             
@@ -123,6 +101,82 @@ def outline_review_results_df(data, path=None):
     except Exception as e:
         logger.error(f"处理大纲审查结果时发生错误: {e}")
 
+
+def merge_dict_lists_and_deduplicate(dict_list):
+    """
+    合并多个字典,其中每个字典的值都是列表,进行顺序去重合并
+    
+    Args:
+        dict_list: 字典列表
+        
+    Returns:
+        合并后的字典,每个键对应的值是去重后的列表
+    """
+    merged = {}
+    for d in dict_list:
+        if isinstance(d, dict):
+            for key, value in d.items():
+                if key not in merged:
+                    merged[key] = []
+                if isinstance(value, list):
+                    merged[key].extend(value)
+                else:
+                    merged[key].append(value)
+    # 对每个字段的值去重(保持顺序)
+    for key in merged:
+        merged[key] = list(dict.fromkeys(merged[key]))
+    return merged
+
+
+def merge_results_by_classification(df):
+    """
+    根据chapter_classification列的标签将review_results_summary列的字典合并,
+    chapter_label列的值合并为列表
+    
+    Args:
+        df: outline_review_results_df返回的DataFrame,包含以下列:
+            - chapter_label: 章节标签
+            - review_results_summary: 审查结果字典(字符串或字典)
+            - chapter_classification: 章节分类
+            
+    Returns:
+        合并后的DataFrame,chapter_classification列值唯一,其他列对应合并
+    """
+    logger.info(f"开始按chapter_classification合并结果,数据行数: {len(df)}")
+    try:
+        # 解析review_results_summary列(如果是字符串)
+        df['parsed_summary'] = df['review_results_summary'].apply(parse_review_result)
+        
+        # 按chapter_classification分组
+        grouped = df.groupby('chapter_classification')
+        
+        # 合并数据
+        result_data = []
+        for classification, group in grouped:
+            # 合并chapter_label为列表
+            chapter_labels = group['chapter_label'].tolist()
+            
+            # 合并review_results_summary字典
+            dict_list = group['parsed_summary'].tolist()
+            merged_dict = merge_dict_lists_and_deduplicate(dict_list)
+            
+            result_data.append({
+                'chapter_classification': classification,
+                'chapter_label': chapter_labels,
+                'review_results_summary': merged_dict
+            })
+        
+        # 创建新的DataFrame
+        result_df = pd.DataFrame(result_data)
+        
+        logger.info(f"合并完成,输出 {len(result_df)} 条记录")
+        return result_df
+        
+    except Exception as e:
+        logger.error(f"按chapter_classification合并结果时发生错误: {e}")
+        raise
+
+
 if __name__ == '__main__':
     csv_file = rf'temp\document_temp\2_spec_review_results.csv'
     path2 = rf'temp\document_temp\outlines_review_results.csv'

+ 7 - 7
core/construction_review/workflows/ai_review_workflow.py

@@ -294,7 +294,7 @@ class AIReviewWorkflow:
             review_func_mapping: Dict[str, Union[str, List[str]]] = {
                 'sensitive_word_check': 'sensitive_word_check',
                 'semantic_logic_check': 'check_semantic_logic',
-                'completeness_check': 'check_completeness',
+                'completeness_check': ['check_completeness','outline_check'],
                 'timeliness_check': 'timeliness_basis_reviewer',
                 'reference_check': 'reference_basis_reviewer',
                 'sensitive_check': 'check_sensitive',
@@ -506,12 +506,12 @@ class AIReviewWorkflow:
                     state = state,
                     stage_name = state.get("stage_name", "完整性审查")
                 )
-                # outline_review_result = {} 
-                outline_review_result = await self.ai_review_engine.outline_check(
-                    trace_id_idx = state["callback_task_id"],
-                    outline_content = state["structured_content"],
-                    state = state,
-                    stage_name = state.get("stage_name", "大纲审查"))
+                outline_review_result = {} 
+                # outline_review_result = await self.ai_review_engine.outline_check(
+                #     trace_id_idx = state["callback_task_id"],
+                #     outline_content = state["structured_content"],
+                #     state = state,
+                #     stage_name = state.get("stage_name", "大纲审查"))
                 # with open(r"temp\document_temp\4_check_completeness_result.json", "w", encoding="utf-8") as f:
                 #     json.dump(check_completeness_result, f, ensure_ascii=False, indent=4)
 

+ 56 - 6
core/construction_review/workflows/core_functions/ai_review_core_fun.py

@@ -43,6 +43,8 @@ from foundation.observability.logger.loggering import server_logger as logger
 from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
 from core.base.task_models import TaskFileInfo
 from ...component.reviewers.utils.inter_tool import InterTool
+# from ...component.reviewers.outline_check import outline_review_results_df, merge_results_by_classification
+from ...component.reviewers.check_completeness.utils.redis_csv_utils import   read_from_redis_and_save_csv
 from ..types import AIReviewState
 
 # 常量定义
@@ -59,6 +61,7 @@ class UnitReviewResult():
     rag_enhanced: Dict[str, Any]
     overall_risk: str
     is_sse_push: bool = True  # 是否成功执行并推送SSE,默认为True
+    trace_id_idx:Optional[str] = None
 
 
 class AIReviewCoreFun:
@@ -122,10 +125,11 @@ class AIReviewCoreFun:
         """
         logger.info(f"📝 处理章节: {chapter_code}, 共 {len(chapter_content)} 个chunk")
         # chapter_total_chunks = len(chapter_content)
-
         # 按块串行遍历(所有章节统一流程)
+        trace_id_idx = None  # 初始化 trace_id_idx
         for chunk_index, chunk in enumerate(chapter_content):
             # 计算全局块索引,避免不同章节间索引重复导致Redis计数错误
+            
             global_chunk_index = completed_chunks + chunk_index
             chunk_label = chunk.get("section_label", f"chunk_{global_chunk_index}")
             logger.info(f"  📄 处理块 {global_chunk_index+1}/{total_chunks}: {chunk_label}")
@@ -135,13 +139,19 @@ class AIReviewCoreFun:
                 logger.warning("块审查检测到终止信号")
                 return chunk_index, all_issues  # 返回已处理的块数和issues
 
+                # continue  # 跳过未成功执行的块
             # 并发执行当前块的所有审查方法
             chunk_results = await self._execute_chunk_methods(
-                chapter_code, chunk, global_chunk_index, func_names, state
+                chapter_code, chunk, global_chunk_index, func_names, state,trace_id_idx
             )
             if not chunk_results.get('is_sse_push', False):
                 logger.info(f"跳过当前未成功审查块 {chunk_index} 处理完成")                
                 continue  # 跳过未成功执行的块
+            
+            if  chunk_results.get('trace_id_idx', None):
+                trace_id_idx = chunk_results.get('trace_id_idx', None)
+            else:
+                trace_id_idx = None
             # 格式化当前块的结果为issues
             chunk_page = chunk.get('page', '')
             review_location_label = f"第{chunk_page}页:{chunk_label}"
@@ -228,7 +238,7 @@ class AIReviewCoreFun:
             chapter_names = list(chapter_map.keys())
         return chapter_map, chapter_names
 
-    async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState) -> Dict[str, Any]:
+    async def _execute_chunk_methods(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_names: List[str], state: AIReviewState,trace_id_idx: str =None) -> Dict[str, Any]:
         """
         并发执行单个块的所有审查方法
 
@@ -245,6 +255,7 @@ class AIReviewCoreFun:
         semaphore = asyncio.Semaphore(5)  # 单个块内限制并发数为5
         rag_enhanced_content = None  # 初始化变量,避免作用域错误
         basis_content = None  # 初始化变量,避免作用域错误
+        rows_df = None
         is_complete_field = chunk.get('is_complete_field', False)
         logger.info(f"检查is_complete_field值是否正常: {is_complete_field}")
         # 只有非完整性审查的chunk才执行RAG检索(注意括号位置,确保运算符优先级正确)
@@ -261,11 +272,16 @@ class AIReviewCoreFun:
                 chunk.get('content', ''),
                 chapter_code
             )
+        if 'completeness_check' in func_names and trace_id_idx is not None:
+                logger.debug("开始提取大纲审查数据")
+                rows_df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)
+
+        
         async def execute_with_semaphore(func_name):
             async with semaphore:
                 try:
                     # 创建并执行单个审查任务
-                    result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content, basis_content)
+                    result = await self._execute_single_review(chapter_code, chunk, chunk_index, func_name, state,rag_enhanced_content, basis_content,rows_df,trace_id_idx)
                     return func_name, result
                 except Exception as e:
                     logger.error(f"审查任务执行失败 [{chapter_code}.chunk{chunk_index}.{func_name}]: {str(e)}")
@@ -277,6 +293,8 @@ class AIReviewCoreFun:
                         rag_enhanced={},
                         overall_risk="error"
                     )
+                
+
 
         # 创建并发任务
         async_tasks = [execute_with_semaphore(func_name) for func_name in func_names]
@@ -284,6 +302,8 @@ class AIReviewCoreFun:
         # 等待当前块所有方法完成
         completed_results = await asyncio.gather(*async_tasks, return_exceptions=True)
 
+
+
         # 合并所有 UnitReviewResult 对象的 basic_compliance 和 technical_compliance
         merged_basic = {}
         merged_technical = {}
@@ -307,6 +327,7 @@ class AIReviewCoreFun:
                     merged_technical.update(review_result.technical_compliance)
                     # 合并 rag_enhanced
                     merged_rag.update(review_result.rag_enhanced)
+                    trace_id_idx = review_result.trace_id_idx    
 
         return {
             'basic_compliance': merged_basic,
@@ -315,7 +336,7 @@ class AIReviewCoreFun:
             'is_sse_push': has_success  # 添加 is_sse_push 字段
         }
 
-    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None) -> UnitReviewResult:
+    async def _execute_single_review(self, chapter_code: str, chunk: Dict[str, Any], chunk_index: int, func_name: str, state: AIReviewState,rag_enhanced_content :dict = None, basis_content: dict = None,rows_df = None,trace_id_idx = None) -> UnitReviewResult:
         """
         执行单个块的审查任务
 
@@ -397,7 +418,22 @@ class AIReviewCoreFun:
 
         elif func_name == "check_completeness" and is_complete_field:
             # check_completeness 需要列表类型,将单个 chunk 包装成列表
-            raw_result = await method(trace_id, [chunk], state, stage_name)
+            raw_result,trace_id_idx = await method(trace_id, [chunk], state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True,
+                trace_id_idx = trace_id_idx
+            )
+        
+        elif func_name == "outline_check" and  trace_id_idx :
+            # check_completeness 需要列表类型,将单个 chunk 包装成列表
+            raw_result = await method(trace_id_idx, rows_df, state, stage_name)
             # 基础审查方法,放入 basic_compliance
             return UnitReviewResult(
                 unit_index=chunk_index,
@@ -408,7 +444,21 @@ class AIReviewCoreFun:
                 overall_risk=self._calculate_single_result_risk(raw_result),
                 is_sse_push=True
             )
+        
 
+        elif func_name == "check_completeness" and is_complete_field:
+            # check_completeness 需要列表类型,将单个 chunk 包装成列表
+            raw_result = await method(trace_id, [chunk], state, stage_name)
+            # 基础审查方法,放入 basic_compliance
+            return UnitReviewResult(
+                unit_index=chunk_index,
+                unit_content=chunk,
+                basic_compliance={func_name: raw_result},
+                technical_compliance={},
+                rag_enhanced={},
+                overall_risk=self._calculate_single_result_risk(raw_result),
+                is_sse_push=True
+            )
         elif func_name == "check_non_parameter_compliance" and not is_complete_field:
             # 技术审查方法需要从 RAG 检索结果中获取 references
             raw_result = await self._execute_technical_review(