{ "success": true, "content": "#!/usr/bin/env python\n# -*- coding: utf-8 -*-\n\n\"\"\"\n@Project : lq-agent-api\n@File : construction_review/ai_review_engine.py\n@IDE : VsCode\n@Author : 王旭明\n@Date : 2025-12-01 11:07:12\n@Description: AI审查引擎核心组件,负责执行各类文档审查任务,支持并发处理和多种审查模式\n\n=================================\n📋 方法总览 (Method Overview)\n=================================\n\n🏗️ 核心审查流程:\n├── basic_compliance_check() # 基础合规性检查 (语法/语义/完整性)\n├── technical_compliance_check() # 技术性合规检查 (标准/设计/参数)\n├── rag_enhanced_check() # RAG增强审查 (向量/混合检索)\n└── _calculate_overall_risk() # 计算总体风险等级\n\n🔍 专项检查方法:\n├── check_grammar() # 词句语法检查\n├── check_semantic_logic() # 语义逻辑检查\n├── check_completeness() # 完整性检查\n├── check_mandatory_standards() # 强制性标准检查\n├── check_design_values() # 设计值检查\n├── check_technical_parameters() # 技术参数检查\n└── prep_basis_review() # 编制依据审查\n\n🔍 RAG检索增强:\n├── vector_search() # 向量检索\n├── hybrid_search() # 混合检索\n├── rerank_results() # 重排序结果\n└── generate_enhanced_suggestions() # 生成增强建议\n\n🛠️ 工具辅助方法:\n├── _process_review_result() # 处理审查结果\n├── _execute_check_with_semaphore() # 信号量并发控制\n├── _calculate_basic_score() # 计算基础得分\n├── _calculate_technical_score() # 计算技术得分\n└── _aggregate_results() # 汇总审查结果\n\n⚙️ 配置管理:\n├── __init__() # 初始化引擎 (max_concurrent_reviews=8)\n└── set_review_location_label() # 设置审查位置标签\n\"\"\"\n\nimport asyncio\nimport concurrent.futures\nimport json\nimport os\nimport time\nfrom dataclasses import dataclass\nfrom enum import Enum\nfrom typing import Any, Dict, List, Optional, Sequence\nimport pandas as pd\nimport json\nimport ast # 用于安全解析字符串为Python对象\nimport pandas as pd\n\nfrom core.base.task_models import TaskFileInfo\nfrom core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager\nfrom core.construction_review.component.infrastructure.parent_tool import (\n enhance_with_parent_docs_grouped,\n extract_query_pairs_results\n)\nfrom core.construction_review.component.infrastructure.relevance import is_relevant_async\nfrom core.construction_review.component.reviewers.base_reviewer import BaseReviewer\nfrom core.construction_review.component.reviewers.utils.text_split import split_text\nfrom foundation.ai.rag.retrieval.entities_enhance import entity_enhance\nfrom foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager\nfrom foundation.infrastructure.config.config import config_handler\nfrom foundation.observability.logger.loggering import review_logger as logger\nfrom foundation.observability.cachefiles import cache, CacheBaseDir\nfrom core.construction_review.component.reviewers.utils.directory_extraction import BasisItems\n\nfrom pathlib import Path\nimport sys\nimport json\n\n\n\nfrom .reviewers.check_completeness.components.data_loader import CSVDataLoader\nfrom .reviewers.check_completeness.components.prompt_builder import PromptBuilder\nfrom .reviewers.check_completeness.components.llm_client import LLMClient\nfrom .reviewers.check_completeness.components.result_processor import ResultProcessor\nfrom .reviewers.check_completeness.components.review_pipeline import ReviewPipeline\nfrom .reviewers.check_completeness.components.result_saver import ResultSaver\nfrom .reviewers.check_completeness.components.result_analyzer import ResultAnalyzer\nfrom .reviewers.check_completeness.utils.file_utils import write_json\nfrom core.construction_review.component.reviewers.base_reviewer import ReviewResult\nfrom .reviewers.outline_check import outline_review_results_df, get_empty_list_keys\nfrom .reviewers.check_completeness.utils.redis_csv_utils import (\n get_redis_connection,\n)\nfrom .reviewers.catalogues_check.utils import get_redis_manager\nfrom .reviewers.catalogues_check.catalogues_check import CatalogCheckProcessor, remove_common_elements_between_dataframes\n\n@dataclass\nclass ReviewResult:\n \"\"\"审查结果\"\"\"\n unit_index: int\n unit_content: Dict[str, Any]\n basic_compliance: Dict[str, Any]\n technical_compliance: Dict[str, Any]\n rag_enhanced: Dict[str, Any]\n overall_risk: str\n\nclass Stage(Enum):\n \"\"\"工作流状态\"\"\"\n BASIC = {\n 'reviewer_type':'basic',\n 'grammar': 'sensitive_word_check',\n 'sensitive':'sensitive_check',\n 'semantic': 'semantic_logic_check',\n 'completeness': 'completeness_check',\n 'timeliness': 'timeliness_check',\n 'reference': 'reference_check'\n }\n TECHNICAL = {\n 'reviewer_type':'technical',\n 'non_parameter': 'non_parameter_compliance_check',\n 'parameter': 'parameter_compliance_check'\n }\n RAG = {\n 'rag': 'rag_enhanced_review',\n }\n AI = {\n 'reviewer_type':'ai',\n 'professional': 'professional_suggestion',\n 'standardization': 'standardization_suggestion',\n 'completeness': 'completeness_suggestion',\n 'readability': 'readability_suggestion'\n }\n\n\nclass AIReviewEngine(BaseReviewer):\n \"\"\"AI审查引擎 - 支持审查条目并发\"\"\"\n\n def __init__(self, task_file_info: TaskFileInfo = None, max_concurrent_reviews: int = 8):\n \"\"\"\n 初始化AI审查引擎\n\n Args:\n task_file_info: TaskFileInfo 实例,包含任务相关信息\n max_concurrent_reviews: 最大并发审查数量\n \"\"\"\n super().__init__()\n\n # ✅ 保存 TaskFileInfo 实例\n self.task_info = task_file_info\n\n self.file_id = task_file_info.file_id\n self.callback_task_id = task_file_info.callback_task_id\n self.user_id = task_file_info.user_id\n self.review_config = task_file_info.review_config\n self.project_plan_type = task_file_info.project_plan_type\n self.tendency_review_role = task_file_info.tendency_review_role\n\n self.max_concurrent_reviews = max_concurrent_reviews\n self.semaphore = asyncio.Semaphore(max_concurrent_reviews)\n self.milvus_collection = config_handler.get('milvus', 'MILVUS_COLLECTION', 'default')\n\n\n self.milvus = MilvusManager(MilvusConfig())\n self.redis_client = get_redis_connection() # 获取Redis连接\n\n def _process_review_result(self, result):\n \"\"\"\n 处理审查结果,统一转换为字典格式\n\n Args:\n result: 原始审查结果\n\n Returns:\n Dict: 处理后的审查结果字典\n \"\"\"\n if isinstance(result, Exception):\n return {\"error\": str(result), \"success\": False}\n elif hasattr(result, '__dict__'): # ReviewResult对象\n return {\n \"success\": result.success if hasattr(result, 'success') else False,\n \"details\": result.details if hasattr(result, 'details') else {},\n \"error_message\": result.error_message if hasattr(result, 'error_message') else None,\n \"execution_time\": result.execution_time if hasattr(result, 'execution_time') else None\n }\n else:\n return result # 已经是字典\n\n def _execute_check_with_semaphore(self, check_func, *args, **kwargs):\n \"\"\"\n 使用信号量执行检查任务的包装器\n\n Args:\n check_func: 检查函数\n *args: 位置参数\n **kwargs: 关键字参数\n\n Returns:\n 包装后的异步函数\n \"\"\"\n async def wrapped_check():\n async with self.semaphore:\n return await check_func(*args, **kwargs)\n return wrapped_check()\n\n def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:\n \"\"\"\n RAG增强审查\n\n 流程:\n 1. 提取查询对\n 2. 实体增强检索\n 3. 自动拼接父文档内容\n\n Args:\n unit_content: 待审查单元内容\n\n Returns:\n Dict[str, Any]: RAG增强审查结果\n \"\"\"\n logger.info(f\"[RAG增强] 开始处理, 内容长度: {len(unit_content)}\")\n\n # Step 2: 构建查询对\n query_pairs = query_rewrite_manager.query_extract(unit_content)\n\n # 检查 query_pairs 是否为 None(模型调用失败时的防护)\n if query_pairs is None:\n logger.warning(\"[RAG增强] Query提取失败,返回空结果\")\n return {\n 'vector_search': [],\n 'retrieval_status': 'query_extract_failed',\n 'file_name': '',\n 'text_content': '',\n 'metadata': {},\n 'entity_results': [], # 保持结构一致\n 'total_entities': 0 # 保持结构一致\n }\n\n logger.info(f\"[RAG增强] 提取到 {len(query_pairs)} 个查询对\")\n\n # Step 3: 根据查询对主实体、辅助实体,进行实体增强召回\n bfp_result_lists = entity_enhance.entities_enhance_retrieval(query_pairs)\n\n # Step 4: 检查检索结果\n if not bfp_result_lists:\n logger.warning(\"[RAG增强] 实体检索未返回结果\")\n return {\n 'vector_search': [],\n 'retrieval_status': 'no_results',\n 'file_name': '',\n 'text_content': '',\n 'metadata': {}\n }\n\n # Step 5: 父文档增强(使用分组增强策略)\n try:\n enhancement_result = enhance_with_parent_docs_grouped(\n self.milvus,\n bfp_result_lists,\n score_threshold=0.5, # bfp_rerank_score 阈值\n max_parents_per_pair=3, # 每个查询对最多3个父文档\n max_parent_text_length=8000 # 单个父文档最大8000字符(约5300 tokens)\n )\n enhanced_results = enhancement_result['enhanced_results']\n enhanced_count = enhancement_result['enhanced_count']\n enhanced_pairs = enhancement_result.get('enhanced_pairs', 0)\n total_pairs = enhancement_result.get('total_pairs', 0)\n\n logger.info(f\"[RAG增强] 分组增强完成: {enhanced_pairs}/{total_pairs} 个查询对进行了增强\")\n logger.info(f\"[RAG增强] 成功增强 {enhanced_count} 个结果,使用了 {len(enhancement_result['parent_docs'])} 个父文档\")\n except Exception as e:\n logger.error(f\"[RAG增强] 父文档增强失败: {e}\", exc_info=True)\n # 失败时使用原始结果\n enhanced_results = bfp_result_lists\n\n # Step 5: 提取查询对结果(只保留得分>0.5的结果)\n entity_results = extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.5)\n\n # 保存最终结果用于调试\n # with open(rf\"temp\\construction_review/ai_review_engine\\extract_query_pairs_results.json\", \"w\", encoding='utf-8') as f:\n # json.dump(entity_results, f, ensure_ascii=False, indent=4)\n\n # 如果没有结果通过阈值过滤,返回空结果\n if not entity_results:\n logger.warning(\"[RAG增强] 没有结果通过阈值过滤(得分>0.5),返回空结果\")\n return {\n 'vector_search': [],\n 'retrieval_status': 'no_results',\n 'file_name': '',\n 'text_content': '',\n 'metadata': {},\n 'entity_results': [],\n 'total_entities': 0\n }\n\n # 返回格式:返回列表形式的 entity_results\n return {\n 'retrieval_status': 'success',\n 'entity_results': entity_results,\n 'total_entities': len(entity_results)\n }\n\n async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],state:Dict[str, Any],stage_name:str) -> Dict[str, Any]:\n \"\"\"\n 基础合规性检查\n\n Args:\n trace_id_idx: 追踪ID索引\n unit_content: 待审查单元内容\n state: 状态字典\n stage_name: 阶段名称\n current_progress: 当前进度\n\n Returns:\n Dict[str, Any]: 基础合规性检查结果\n \"\"\"\n review_content = unit_content['content']\n #review_references = unit_content.get('review_references')\n\n logger.info(f\"basic开始基础合规性检查, 内容长度: {len(review_content)}\")\n\n async def check_with_semaphore(check_func, **kwargs):\n async with self.semaphore:\n return await check_func(**kwargs)\n\n TASK_TIMEOUT = 55\n\n basic_tasks = []\n\n if 'sensitive_word_check' in self.task_info.get_review_config_list():\n basic_tasks.append(\n asyncio.create_task(\n asyncio.wait_for(\n check_with_semaphore(self.sensitive_word_check, trace_id_idx=trace_id_idx, review_content=review_content, state=state, stage_name=stage_name),\n timeout=TASK_TIMEOUT\n )\n )\n )\n if 'semantic_logic_check' in self.task_info.get_review_config_list():\n basic_tasks.append(\n asyncio.create_task(\n asyncio.wait_for(\n check_with_semaphore(self.check_semantic_logic, trace_id_idx=trace_id_idx, review_content=review_content, state=state, stage_name=stage_name),\n timeout=TASK_TIMEOUT\n )\n )\n )\n if 'sensitive_check' in self.task_info.get_review_config_list():\n basic_tasks.append(\n asyncio.create_task(\n asyncio.wait_for(\n check_with_semaphore(self.check_sensitive, trace_id_idx=trace_id_idx, review_content=review_content, state=state, stage_name=stage_name),\n timeout=TASK_TIMEOUT\n )\n )\n )\n # if 'completeness_check' in self.task_info.get_review_config_list():\n # basic_tasks.append(\n # asyncio.create_task(\n # asyncio.wait_for(\n # check_with_semaphore(self.check_completeness, trace_id_idx=trace_id_idx, review_content=unit_content, state=state, stage_name=stage_name),\n # timeout=TASK_TIMEOUT\n # )\n # )\n # )\n\n # 一次性执行所有任务,避免重复协程调用\n if not basic_tasks:\n return {\n \"sensitive_word_check\": self._process_review_result(None),\n \"semantic_logic_check\": self._process_review_result(None),\n \"sensitive_check\": self._process_review_result(None),\n }\n\n # 使用 asyncio.wait 替代 gather,提供更好的超时控制\n # 整体超时时间 = 单个任务超时 + 缓冲时间\n total_timeout = TASK_TIMEOUT * len(basic_tasks) + 10\n\n done, pending = await asyncio.wait(basic_tasks, timeout=total_timeout)\n\n # 取消未完成的任务\n for task in pending:\n task.cancel()\n logger.warning(f\"[基础审查] 任务超时,已取消\")\n\n # 收集结果\n results = []\n for task in done:\n try:\n result = task.result()\n results.append(result)\n except Exception as e:\n logger.error(f\"[基础审查] 任务执行失败: {str(e)}\", exc_info=True)\n logger.error(f\"[基础审查] 异常类型: {type(e).__name__}\")\n logger.error(f\"[基础审查] 异常详情: {repr(e)}\")\n results.append(e)\n\n # 根据配置项分配结果\n grammar_result = self._process_review_result(None)\n semantic_result = self._process_review_result(None)\n sensitive_result = self._process_review_result(None)\n #completeness_result = self._process_review_result(None)\n #logger.info(f\"completeness_result: {completeness_result}\")\n result_index = 0\n\n if 'sensitive_word_check' in self.task_info.get_review_config_list():\n if result_index < len(results):\n grammar_result = self._process_review_result(results[result_index])\n result_index += 1\n cache.ai_review_engine(grammar_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)\n if 'semantic_logic_check' in self.task_info.get_review_config_list():\n if result_index < len(results):\n semantic_result = self._process_review_result(results[result_index])\n result_index += 1\n\n if 'sensitive_check' in self.task_info.get_review_config_list():\n if result_index < len(results):\n sensitive_result = self._process_review_result(results[result_index])\n result_index += 1\n return {\n 'sensitive_word_check': grammar_result,\n 'semantic_logic_check': semantic_result,\n 'sensitive_check': sensitive_result,\n #'completeness_check': completeness_result,\n }\n\n async def technical_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],\n state:Dict[str, Any],stage_name:str) -> Dict[str, Any]:\n \"\"\"\n 技术性合规检查(包含RAG增强审查)\n\n 支持基于 entity_results 列表的动态审查任务创建:\n - entity_results 列表中每个查询对都会创建独立的审查任务\n - 支持参数性和非参数性审查的动态任务创建\n\n Args:\n trace_id_idx: 追踪ID索引\n unit_content: 待审查单元内容\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n Dict[str, Any]: 技术性合规检查结果\n \"\"\"\n review_content = unit_content['content']\n logger.info(f\"开始技术性合规检查,内容长度: {len(review_content)}\")\n\n # 先执行RAG增强检索,获取相关标准文档作为参考\n logger.info(f\"检查审查项列表:{self.task_info.get_review_config_list():}\")\n entity_results = [] # 初始化 entity_results 列表\n\n if 'non_parameter_compliance_check' in self.task_info.get_review_config_list() or 'parameter_compliance_check' in self.task_info.get_review_config_list():\n logger.info(\"执行专业性审查,开始RAG增强检索\")\n rag_result = self.rag_enhanced_check(review_content)\n entity_results = rag_result.get('entity_results', [])\n logger.info(f\"[RAG增强] 获取到 {len(entity_results)} 个查询对结果\")\n\n # 如果有 entity_results,记录详细信息\n if entity_results:\n for idx, entity_item in enumerate(entity_results):\n logger.info(f\"[RAG增强] 查询对 {idx}: entity={entity_item.get('entity')}, combined_query={entity_item.get('combined_query')[:50]}...\")\n\n async def check_with_semaphore(check_func, **kwargs):\n async with self.semaphore:\n return await check_func(**kwargs)\n\n # 根据配置动态创建技术性检查任务\n technical_tasks = []\n task_mapping = []\n\n TASK_TIMEOUT = 150\n\n # 判断是否需要基于 entity_results 创建动态任务\n if entity_results and len(entity_results) > 0:\n logger.info(f\"[技术审查] 基于 entity_results 创建动态审查任务,共 {len(entity_results)} 个查询对\")\n\n # 为每个查询对创建独立的审查任务\n for idx, entity_item in enumerate(entity_results):\n combined_query = entity_item.get('combined_query', '')\n entity = entity_item.get('entity', f'entity_{idx}')\n text_content = entity_item.get('text_content', '')\n file_name = entity_item.get('file_name', '')\n\n logger.info(f\"[技术审查] 为查询对 {idx} ({entity}) 创建审查任务\")\n\n # 根据配置创建参数性或非参数性审查任务\n if 'parameter_compliance_check' in self.task_info.get_review_config_list():\n # 参数性合规检查\n task_mapping.append(f'parameter_compliance_{idx}')\n technical_tasks.append(\n asyncio.create_task(\n asyncio.wait_for(\n check_with_semaphore(\n self.check_parameter_compliance,\n trace_id_idx=trace_id_idx,\n review_content=review_content,\n review_references=text_content, # 使用查询对的检索结果作为参考\n reference_source=file_name,\n state=state,\n stage_name=stage_name,\n entity_query=combined_query # 传入组合查询条文\n ),\n timeout=TASK_TIMEOUT\n )\n )\n )\n\n if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():\n # 非参数性合规检查\n task_mapping.append(f'non_parameter_compliance_{idx}')\n technical_tasks.append(\n asyncio.create_task(\n asyncio.wait_for(\n check_with_semaphore(\n self.check_non_parameter_compliance,\n trace_id_idx=trace_id_idx,\n review_content=review_content,\n review_references=text_content, # 使用查询对的检索结果作为参考\n reference_source=file_name,\n state=state,\n stage_name=stage_name,\n entity_query=combined_query # 传入组合查询条文\n ),\n timeout=TASK_TIMEOUT\n )\n )\n )\n\n logger.info(f\"[技术审查] 总共创建了 {len(technical_tasks)} 个动态审查任务\")\n\n else:\n # 没有entity_results,直接返回None\n logger.warning(\"[技术审查] 未提取到实体,跳过技术性审查\")\n return None\n\n # 一次性执行所有任务,避免重复协程调用\n\n # 使用 asyncio.wait 替代 gather,提供更好的超时控制\n # 整体超时时间 = 单个任务超时 × 任务数量 + 缓冲时间\n total_timeout = TASK_TIMEOUT * len(technical_tasks) + 10\n\n done, pending = await asyncio.wait(technical_tasks, timeout=total_timeout)\n\n # 取消未完成的任务\n for task in pending:\n task.cancel()\n logger.warning(f\"[技术审查] 任务超时,已取消\")\n\n # 收集结果\n results = []\n for task in done:\n try:\n result = task.result()\n results.append(result)\n except Exception as e:\n logger.error(f\"[技术审查] 任务执行失败: {str(e)}\", exc_info=True)\n logger.error(f\"[技术审查] 异常类型: {type(e).__name__}\")\n logger.error(f\"[技术审查] 异常详情: {repr(e)}\")\n results.append(e)\n\n # 扁平化处理结果:直接返回每个实体的审查结果\n logger.info(f\"[技术审查] 处理审查结果,共 {len(entity_results)} 个实体\")\n\n flattened_results = {}\n result_index = 0\n\n for idx, entity_item in enumerate(entity_results):\n entity = entity_item.get('entity', f'entity_{idx}')\n\n # 提取非参数性审查结果(如果配置了)\n if 'non_parameter_compliance_check' in self.task_info.get_review_config_list():\n key = f'non_parameter_compliance_{entity}_{idx}'\n if result_index < len(results):\n flattened_results[key] = self._process_review_result(results[result_index])\n logger.info(f\"[技术审查] {key} 结果已处理\")\n else:\n logger.warning(f\"[技术审查] {key} 缺少结果\")\n result_index += 1\n\n # 提取参数性审查结果(如果配置了)\n if 'parameter_compliance_check' in self.task_info.get_review_config_list():\n key = f'parameter_compliance_{entity}_{idx}'\n if result_index < len(results):\n flattened_results[key] = self._process_review_result(results[result_index])\n logger.info(f\"[技术审查] {key} 结果已处理\")\n else:\n logger.warning(f\"[技术审查] {key} 缺少结果\")\n result_index += 1\n\n logger.info(f\"[技术审查] 成功处理 {len(flattened_results)} 个审查结果\")\n return flattened_results\n\n async def sensitive_word_check(self, trace_id_idx: str, review_content: str,\n state: str, stage_name: str) -> Dict[str, Any]:\n \"\"\"\n 词句语法检查\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n ReviewResult: 语法检查结果\n \"\"\"\n from core.construction_review.component.reviewers.sensitive_word_check import sensitive_word_check_reviewer\n \n # 构造trace_id\n prompt_name = Stage.BASIC.value['grammar']\n trace_id = prompt_name + trace_id_idx\n \n # 调用语法检查审查模块\n result = await sensitive_word_check_reviewer.check_grammar(\n trace_id=trace_id,\n review_content=review_content,\n state=state,\n stage_name=stage_name\n )\n \n return result\n\n async def check_semantic_logic(self, trace_id_idx: str, review_content: str,\n state: str, stage_name: str) -> Dict[str, Any]:\n \"\"\"\n 语义逻辑检查\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n ReviewResult: 语义逻辑检查结果\n \"\"\"\n from core.construction_review.component.reviewers.semantic_logic import semantic_logic_reviewer\n \n # 构造trace_id\n prompt_name = Stage.BASIC.value['semantic']\n trace_id = prompt_name + trace_id_idx\n \n # 调用语义逻辑审查模块\n result = await semantic_logic_reviewer.check_semantic_logic(\n trace_id=trace_id,\n review_content=review_content,\n state=state,\n stage_name=stage_name\n )\n \n return result\n \n async def check_completeness(self, trace_id_idx: str, review_content: List[Dict[str, Any]],\n state: str, stage_name: str) -> Dict[str, Any]:\n \"\"\"\n 完整性检查\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容,文档块(chunks)列表\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n Dict[str, Any]: 完整性检查结果\n \"\"\"\n\n # with open(r'temp\\structured_content.json', 'w', encoding='utf-8') as f:\n # json.dump(review_content, f, ensure_ascii=False, indent=4)\n name = \"completeness_check\"\n start_time = time.time()\n try:\n # 验证review_content格式\n if not isinstance(review_content, list):\n raise ValueError(f\"review_content必须是列表类型,当前类型: {type(review_content)}\")\n \n # # 获取文档块信息\n # doc = review_content\n # chunk_id = doc.get('chunk_id', 'unknown')\n # chapter_classification = doc.get('chapter_classification', '')\n # content = doc.get('content', '')\n doc = 'doc'\n chunk_id = 'chunk_id'\n chapter_classification = 'chunk_id'\n content = 'chunk_id'\n \n logger.info(f\"开始执行 {name} 审查,trace_id: {trace_id_idx}, chunk_id: {chunk_id}, chapter_classification: {chapter_classification}\")\n \n # 检查必要字段\n if not chapter_classification:\n raise ValueError(f\"文档块 {chunk_id} 缺少chapter_classification字段\")\n \n if not content:\n raise ValueError(f\"文档块 {chunk_id} 缺少content字段\")\n\n\n # 配置文件路径(使用基于当前文件的跨平台相对路径)\n base_dir = Path(__file__).parent / 'reviewers' / 'check_completeness'\n csv_path = base_dir / 'config' / 'Construction_Plan_Content_Specification.csv'\n # json_path = base_dir / 'data' / '文档切分预处理结果.json'\n prompt_config_path = base_dir / 'config' / 'prompt.yaml'\n api_config_path = base_dir / 'config' / 'llm_api.yaml'\n \n logger.info(\"=\" * 60)\n logger.info(\"文件要点审查模块\")\n logger.info(\"=\" * 60)\n \n # 1. 加载数据\n logger.info(\"\\n[1/5] 加载规范文件...\")\n data_loader = CSVDataLoader()\n specification = data_loader.load_specification(str(csv_path))\n logger.info(f\" 加载完成,共 {len(specification)} 个标签类别\")\n \n logger.info(\"\\n[2/5] 加载文档数据...\")\n documents = review_content\n logger.info(f\" 加载完成,共 {len(documents)} 个文档块\")\n \n # 2. 初始化组件\n logger.info(\"\\n[3/5] 初始化组件...\")\n prompt_builder = PromptBuilder(str(prompt_config_path))\n llm_client = LLMClient(str(api_config_path))\n result_processor = ResultProcessor()\n \n # 获取并发数配置\n api_config = llm_client.config\n concurrent_workers = api_config.get('keywords', {}).get('concurrent_workers', 20)\n \n review_pipeline = ReviewPipeline(\n prompt_builder=prompt_builder,\n llm_client=llm_client,\n result_processor=result_processor,\n max_concurrent=concurrent_workers\n )\n logger.info(\" 组件初始化完成\")\n\n\n # 3. 执行审查\n logger.info(\"\\n[4/5] 开始执行审查...\")\n logger.info(f\" 使用模型: {llm_client.model_type}\")\n logger.info(f\" 最大并发数: {concurrent_workers}\")\n\n review_results = await review_pipeline.review(documents, specification)\n review_results_df = pd.DataFrame(review_results)\n chapter_labels = review_results_df['section_label'].str.split('->').str[0]\n review_results_df['title'] = chapter_labels\n review_results_df.to_csv(str(Path(\"temp\") / f'{trace_id_idx}_completeness_review_results.csv'), encoding='utf-8-sig', index=False)\n # 将审查结果存储到Redis,供 outline_check 使用\n logger.info(f\"[完整性检查] 准备将大纲审查结果存储到Redis,bind_id: {trace_id_idx}\")\n from .reviewers.check_completeness.utils.redis_csv_utils import df_store_to_redis\n df_store_to_redis(self.redis_client, data=review_results_df, bind_id=trace_id_idx)\n logger.info(f\"[完整性检查] 数据已成功存储到Redis,bind_id: {trace_id_idx}\")\n\n df_filtered = review_results_df.drop_duplicates(subset='title', keep='first').reset_index(drop=True)\n unique_chapter_labels = chapter_labels.unique().tolist()\n chapter_classifications = df_filtered['chapter_classification']\n review_results_flag = chapter_classifications.unique().tolist()\n\n\n # 统计结果\n success_count = sum(1 for r in review_results if isinstance(r.get('review_result', {}), dict) and 'error' not in r.get('review_result', {}))\n error_count = len(review_results) - success_count\n logger.info(f\"\\n 审查完成: 成功 {success_count} 个, 失败 {error_count} 个\")\n\n # 6. 使用结果解析处理组件,生成规范覆盖汇总表\n logger.info(\"\\n[5/5] 生成规范要点覆盖汇总表...\")\n analyzer = ResultAnalyzer(str(csv_path))\n processed_results = analyzer.process_results(review_results)\n #spec_summary_csv_path = Path('temp') / 'document_temp' / '3_spec_review_summary.csv'\n summary_rows = analyzer.build_spec_summary(processed_results)\n # logger.info(f\" 规范覆盖汇总结果已保存至: {spec_summary_csv_path}\")\n summary_rows = pd.DataFrame(summary_rows)\n summary_rows = summary_rows[summary_rows['标签'].isin(review_results_flag)]\n # summary_rows.to_csv(str(spec_summary_csv_path), encoding='utf-8-sig', index=False)\n summary_rows = summary_rows.to_dict('records')\n # 生成缺失要点 JSON 列表,便于前端消费\n\n issues = analyzer.build_missing_issue_list(summary_rows)\n # with open(r'temp\\document_temp\\2_spec_review_missing_issues.json', 'w', encoding='utf-8') as f:\n # json.dump(issues, f, ensure_ascii=False, indent=4)\n # issues[\"response\"] += outline_review_result\n # issues[\"response\"].extend(outline_review_result)\n # 包装成外层格式化期望的结构\n execution_time = time.time() - start_time\n check_result = {\n \"details\": {\n \"name\": \"completeness_check\",\n \"response\": issues.get(\"response\", []),\n \"review_location_label\": issues.get(\"review_location_label\", \"\"),\n \"chapter_code\": issues.get(\"chapter_code\", \"\"),\n \"original_content\": issues.get(\"original_content\", \"\")\n },\n \"success\": True,\n \"execution_time\": execution_time\n } \n \n return check_result,trace_id_idx\n except Exception as e:\n execution_time = time.time() - start_time\n error_msg = f\"{name} 审查失败: {str(e)}\"\n logger.error(error_msg, exc_info=True)\n\n # 返回包含错误信息的字典,由外层统一格式化\n return {\n 'error': error_msg,\n 'exception': str(e)\n }\n\n async def check_sensitive(self, trace_id_idx: str, review_content: str,\n state: str, stage_name: str) -> Dict[str, Any]:\n \"\"\"\n 敏感信息检查\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n Dict[str, Any]: 敏感信息检查结果\n \"\"\"\n from core.construction_review.component.reviewers.utils import (\n check_sensitive_words_async,\n format_check_results\n )\n from foundation.observability.logger.loggering import review_logger as logger\n import time\n \n start_time = time.time()\n trace_id = \"sensitive_check\" + trace_id_idx\n first_results = await check_sensitive_words_async(review_content) # 先使用关键词匹配式审查\n \n # 判断是否检测到敏感词\n if first_results:\n logger.info(f\"检测到 {len(first_results)} 个敏感词,准备送入大模型二审\")\n # 有敏感词,拼接原文与敏感词列表,进入大模型二审\n # 格式化敏感词列表\n sensitive_words_info = []\n for item in first_results:\n sensitive_words_info.append(\n f\"敏感词: {item['word']}, 位置: {item['position']}-{item['end_position']}, 来源: {item['source']}\"\n )\n formatted_sensitive_words = \"\\n\".join(sensitive_words_info)\n \n logger.info(f\"格式化后的敏感词信息:\\n{formatted_sensitive_words}\")\n \n # 调用大模型得到敏感词审查结果\n return await self.review(\"sensitive_check\", trace_id, \"basic\", \"sensitive_word_check\",\n review_content, formatted_sensitive_words,\n None, state, stage_name, timeout=60, model_name=\"qwen3_30b\")\n else:\n # 没有检测到敏感词,构造返回体\n logger.info(\"没有检测到敏感词,未进入二审\")\n from core.construction_review.component.reviewers.base_reviewer import ReviewResult\n \n execution_time = time.time() - start_time\n result = ReviewResult(\n success = True,\n details = {\"name\": \"sensitive_check\", \"response\": \"无明显问题\"},\n error_message = None,\n execution_time = execution_time\n )\n \n # 推送审查完成信息\n if state and state.get(\"progress_manager\"):\n import asyncio\n review_result_data = {\n 'name': 'sensitive_check',\n 'success': result.success,\n 'details': result.details,\n 'error_message': result.error_message,\n 'execution_time': result.execution_time,\n 'timestamp': time.time()\n }\n asyncio.create_task(\n state[\"progress_manager\"].update_stage_progress(\n callback_task_id=state[\"callback_task_id\"],\n stage_name=stage_name,\n current=None,\n status=\"processing\",\n message=f\"sensitive_check 审查完成,未检测到敏感词,耗时: {result.execution_time:.2f}s\",\n issues=[review_result_data],\n event_type=\"processing\"\n )\n )\n \n logger.info(f\"sensitive_check 审查完成,未检测到敏感词,耗时: {result.execution_time:.2f}s\")\n \n return result\n\n async def check_non_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,\n reference_source: str, state: str, stage_name: str,\n entity_query: str = None) -> Dict[str, Any]:\n \"\"\"\n 非参数合规性检查 - 安全相关/强制性条文知识库\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容\n review_references: 审查参考信息\n reference_source: 参考来源\n state: 状态字典\n stage_name: 阶段名称\n entity_query: 实体组合查询条文(可选,来自 entity_results)\n\n Returns:\n Dict[str, Any]: 非参数合规性检查结果\n \"\"\"\n reviewer_type = Stage.TECHNICAL.value['reviewer_type']\n prompt_name = Stage.TECHNICAL.value['non_parameter']\n trace_id = prompt_name+trace_id_idx\n\n # 如果有 entity_query,拼接到 review_content 前面\n if entity_query:\n logger.info(f\"[非参数审查] 使用实体查询条文: {entity_query[:100]}...\")\n combined_content = f\"【实体查询条文】\\n{entity_query}\\n\\n【待审查内容】\\n{review_content}\"\n logger.info(f\"[非参数审查] 组合后内容长度: {len(combined_content)} (原内容: {len(review_content)}, 查询条文: {len(entity_query)})\")\n else:\n combined_content = review_content\n\n return await self.review(\"non_parameter_compliance_check\", trace_id, reviewer_type, prompt_name, combined_content, review_references,\n reference_source, state, stage_name, timeout=45, model_name=\"qwen3_30b\")\n\n async def check_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,\n reference_source: str, state: str, stage_name: str,\n entity_query: str = None) -> Dict[str, Any]:\n \"\"\"\n 参数合规性检查 - 实体概念/工程术语知识库\n\n Args:\n trace_id_idx: 追踪ID索引\n review_content: 审查内容\n review_references: 审查参考信息\n reference_source: 参考来源\n state: 状态字典\n stage_name: 阶段名称\n entity_query: 实体组合查询条文(可选,来自 entity_results)\n\n Returns:\n Dict[str, Any]: 参数合规性检查结果\n \"\"\"\n reviewer_type = Stage.TECHNICAL.value['reviewer_type']\n prompt_name = Stage.TECHNICAL.value['parameter']\n trace_id = prompt_name+trace_id_idx\n\n # 如果有 entity_query,拼接到 review_content 前面\n if entity_query:\n logger.info(f\"[参数审查] 使用实体查询条文: {entity_query[:100]}...\")\n combined_content = f\"【实体查询条文】\\n{entity_query}\\n\\n【待审查内容】\\n{review_content}\"\n logger.info(f\"[参数审查] 组合后内容长度: {len(combined_content)} (原内容: {len(review_content)}, 查询条文: {len(entity_query)})\")\n else:\n combined_content = review_content\n\n return await self.review(\"parameter_compliance_check\", trace_id, reviewer_type, prompt_name, combined_content, review_references,\n reference_source, state, stage_name, timeout=45, model_name=\"qwen3_30b\")\n\n async def outline_check(self, outline_content: pd.DataFrame,trace_id_idx: str,\n state:dict =None,stage_name:str =None) -> Dict[str, Any]:\n \"\"\"\n 大纲审查\n\n Args:\n trace_id_idx: 追踪ID索引\n outline_content: 大纲内容\n state: 状态\n stage_name: 阶段名称\n \"\"\"\n start_time = time.time()\n logger.info(f\"开始大纲审查,trace_id: {trace_id_idx}\")\n\n # CSV文件路径\n csv_path = Path('temp') / 'construction_review' / 'document_temp' / 'outlines_review_results.csv'\n \n # 存储所有缺失项\n missing_items = []\n metadata = {}\n try:\n # 从Redis读取并保存为新的CSV文件\n # 如果传入的 outline_content 为 None,尝试从 Redis 获取数据\n if outline_content is None:\n logger.info(f\"[大纲审查] outline_content 为 None,尝试从 Redis 获取数据 (bind_id: {trace_id_idx})\")\n from .reviewers.check_completeness.utils.redis_csv_utils import read_from_redis_and_save_csv\n df = read_from_redis_and_save_csv(self.redis_client, bind_id=trace_id_idx)\n else:\n df = outline_content\n \n # df = merge_results_by_classification(rows_df)\n # 兼容新旧字段名\n review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else ('merged_review_results' if 'merged_review_results' in df.columns else 'review_result')\n df['miss_outline'] = df[review_results_col].apply(get_empty_list_keys)\n \n # 兼容 chapter_label 字段名\n chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else ('section_label_first' if 'section_label_first' in df.columns else 'section_label')\n df['chapter_label'] = df[chapter_label_col]\n \n miss_outline_df = df[['chapter_label', 'chapter_classification', 'miss_outline']]\n \n # 从 Redis 查询目录审查结果,使用 chapter_label 作为 title 查询\n redis_manager = get_redis_manager()\n chapter_labels = miss_outline_df['chapter_label'].unique().tolist()\n # 过滤掉不需要查询Redis的特殊章节(如\"目录\")\n chapter_labels = [label for label in chapter_labels if label not in ['目录']]\n\n # 只有当存在需要查询的章节时,才进行Redis操作\n if chapter_labels:\n redis_data = redis_manager.read_catalogues_data_by_chapters(state['callback_task_id'], chapter_labels)\n # 去除两个DataFrame中相同chapter_label行的miss_outline列与missing_items列的公共元素\n miss_outline_df, redis_data, common_elements_list = remove_common_elements_between_dataframes(miss_outline_df, redis_data)\n logger.info(f\"[大纲审查] 公共元素列表: {common_elements_list}\")\n\n # 将更新后的数据写回Redis\n for index, row in redis_data.iterrows():\n chapter_label = row['chapter_label']\n # 准备要更新的数据\n update_data = {\n 'title': chapter_label,\n 'chapter_label': chapter_label,\n 'chapter_classification': row.get('chapter_classification', ''),\n 'missing_items': row.get('missing_items', []),\n 'common_elements_list': common_elements_list,\n 'miss_outline': miss_outline_df.loc[miss_outline_df['chapter_label'] == chapter_label, 'miss_outline'].values[0]\n if len(miss_outline_df.loc[miss_outline_df['chapter_label'] == chapter_label]) > 0 else [],\n 'specification_items': row.get('specification_items', []),\n }\n # 使用 update_row_by_title 方法更新Redis中的数据\n update_success = redis_manager.update_row_by_title(state['callback_task_id'], chapter_label, update_data)\n if update_success:\n logger.info(f\"[大纲审查] 成功将章节 '{chapter_label}' 的更新数据写回Redis\")\n else:\n logger.warning(f\"[大纲审查] 未能将章节 '{chapter_label}' 的数据写回Redis\")\n else:\n logger.info(f\"[大纲审查] 过滤后没有需要查询Redis的章节,跳过Redis操作\")\n \n # if os.path.exists(path_redis):\n # # 文件已存在,追加时不写表头\n # redis_data.to_csv(path_redis, mode='a', encoding='utf-8-sig', index=False, header=False)\n # # df.to_csv(csv_path, mode='a', encoding='utf-8-sig', index=False, header=False)\n\n # else:\n # # 文件不存在,首次写入时写表头\n # redis_data.to_csv(path_redis, mode='w', encoding='utf-8-sig', index=False, header=True)\n\n\n # # 判断文件是否存在,决定是否写入表头\n # if os.path.exists(csv_path):\n # # 文件已存在,追加时不写表头\n # miss_outline_df.to_csv(csv_path, mode='a', encoding='utf-8-sig', index=False, header=False)\n # # df.to_csv(csv_path, mode='a', encoding='utf-8-sig', index=False, header=False)\n\n # else:\n # # 文件不存在,首次写入时写表头\n # miss_outline_df.to_csv(csv_path, mode='w', encoding='utf-8-sig', index=False, header=True)\n # # df.to_csv(csv_path, mode='w', encoding='utf-8-sig', index=False, header=True)\n \n # df['']\n # 检查 df 是否为 None\n # if df is None:\n # logger.error(f\"[大纲审查] Redis中不存在ID '{trace_id_idx}' 的数据,无法进行大纲审查\")\n # return {\n # 'outline_review_result': {\n # \"response\": [],\n # },\n # 'error': f'Redis中不存在ID \\'{trace_id_idx}\\' 的数据'\n # }\n \n logger.info(f\"[大纲审查] 成功从Redis读取数据,共 {len(df)} 行\")\n\n # 检查 df 是否为空\n if df.empty or len(df) == 0:\n logger.warning(f\"[大纲审查] DataFrame为空,无法进行大纲审查\")\n return {\n 'outline_review_result': {\n \"response\": [],\n },\n 'success': False,\n 'execution_time': time.time() - start_time\n }\n\n # 兼容新旧字段名\n chapter_label_col = 'chapter_label' if 'chapter_label' in df.columns else ('section_label_first' if 'section_label_first' in df.columns else 'section_label')\n review_results_col = 'review_results_summary' if 'review_results_summary' in df.columns else ('merged_review_results' if 'merged_review_results' in df.columns else 'review_result')\n\n logger.info(f\"[大纲审查] 使用 chapter_label_col={chapter_label_col}, review_results_col={review_results_col}\")\n logger.info(f\"[大纲审查] DataFrame 列: {list(df.columns)}\")\n\n # 遍历每一行\n for index, row in df.iterrows():\n chapter_label = row.get(chapter_label_col, '')\n merged_results_str = row.get(review_results_col, '')\n\n logger.info(f\"第 {index} 行: chapter_label={chapter_label}, {review_results_col}={merged_results_str}\")\n\n # 解析review_results_summary字典字符串\n try:\n if pd.isna(merged_results_str) or merged_results_str == '':\n merged_results = {}\n elif isinstance(merged_results_str, dict):\n # 如果已经是字典,直接使用\n merged_results = merged_results_str\n else:\n # 尝试使用ast.literal_eval解析\n merged_results = ast.literal_eval(merged_results_str)\n except (ValueError, SyntaxError):\n try:\n # 尝试使用json.loads解析\n merged_results = json.loads(merged_results_str)\n except (json.JSONDecodeError, TypeError):\n logger.warning(f\"第 {index} 行无法解析review_results_summary: {merged_results_str}\")\n merged_results = {}\n\n # 检查字典中的每个字段\n if isinstance(merged_results, dict):\n # 检查是否有错误信息\n if 'error' in merged_results:\n logger.warning(f\"第 {index} 行审查结果包含错误: {merged_results.get('error')}\")\n continue # 跳过错误行,不进行大纲审查\n\n logger.info(f\"第 {index} 行 merged_results 键: {list(merged_results.keys())}\")\n\n # 获取chapter_label列表\n chapter_labels_list = row.get(chapter_label_col, [])\n if not isinstance(chapter_labels_list, list):\n chapter_labels_list = [str(chapter_labels_list)]\n\n # 获取review_results_summary字典的所有键,用于reason字段\n merged_results_keys = list(merged_results.keys())\n # 排除error等非审查字段\n merged_results_keys = [k for k in merged_results_keys if k not in ['error', 'chunk_id', 'page', 'section_label', 'chapter_classification', 'chapter_code', 'title']]\n\n logger.info(f\"第 {index} 行过滤后的有效字段: {merged_results_keys}\")\n\n if not merged_results_keys:\n logger.info(f\"第 {index} 行没有有效的审查字段,跳过大纲审查\")\n continue\n\n merged_results_keys_str = \"、\".join(merged_results_keys)\n\n # 只处理有效的审查字段\n for field_name, field_value in merged_results.items():\n # 跳过非审查字段\n if field_name in ['error', 'chunk_id', 'page']:\n continue\n # 检查列表是否为空\n if isinstance(field_value, list) and len(field_value) == 0:\n # 为chapter_label列表中的每个值创建单独的缺失项\n for chapter_label in chapter_labels_list:\n missing_item = {\n # \"check_item_code\": \"catalogue_completeness_check\",\n \"issue_point\": f\"{field_name}缺失\",\n \"location\": chapter_label,\n \"suggestion\": f\"在待审查目录中未找到与'{field_name}'对应的章节;当前章节仅涉及'{chapter_label}',未涵盖'{field_name}'相关内容;整改建议:建议在本章或前序章节中增设'{field_name}'相关内容,确保与审查规范要求一致。\",\n \"reason\": f\"本章应包含{merged_results_keys_str}等{len(merged_results_keys)}个方面。\",\n \"risk_level\": \"高风险\",\n # \"review_references\": '',\n \"reference_source\": '《桥梁公司危险性较大工程管理实施细则(2025版)》',\n\n }\n missing_items.append(missing_item)\n\n # if not metadata:\n # metadata = {\n # \"review_location_label\": df['chapter_label'].to_list()[-1],\n # \"chapter_code\": \"catalogue\",\n # \"original_content\": '',\n # }\n logger.info(f\"大纲审查完成,共发现 {len(missing_items)} 个缺失项\")\n execution_time = time.time() - start_time\n except FileNotFoundError:\n logger.error(f\"CSV文件不存在: {csv_path}\")\n execution_time = time.time() - start_time\n return {\n \"details\": {\n \"name\": \"completeness_check\",\n \"response\": [],\n \"review_location_label\": \"\",\n \"chapter_code\": \"catalogue\",\n \"original_content\": \"\"\n },\n \"success\": False,\n \"execution_time\": execution_time\n }\n except Exception as e:\n logger.error(f\"大纲审查失败: {str(e)}\", exc_info=True)\n execution_time = time.time() - start_time\n return {\n \"details\": {\n \"name\": \"completeness_check\",\n \"response\": [],\n \"review_location_label\": \"\",\n \"chapter_code\": \"catalogue\",\n \"original_content\": \"\"\n },\n \"success\": False,\n \"execution_time\": execution_time\n }\n logger.info(f\"大纲审查完成,耗时 {execution_time:.2f} 秒\")\n\n # 获取 review_location_label,使用兼容的字段名\n review_location_col = 'chapter_label' if 'chapter_label' in df.columns else 'section_label_first'\n if review_location_col not in df.columns or len(df) == 0:\n review_location_label = \"未知位置\"\n else:\n review_location_label = df[review_location_col].to_list()[-1]\n\n outcheck_result = {\n \"details\": {\n \"name\": \"completeness_check\",\n \"response\": missing_items,\n \"review_location_label\": review_location_label,\n \"chapter_code\": 'catalogue',\n \"original_content\": \"\"\n },\n \"success\": True,\n \"execution_time\": execution_time\n }\n logger.info(f\"大纲审查结果: {outcheck_result}\")\n return outcheck_result\n \n async def reference_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,\n state: dict = None, stage_name: str = None) -> Dict[str, Any]:\n \"\"\"\n 执行编制依据审查:调用prep_basis_reviewer中的异步审查功能\n\n Args:\n review_data: 待审查的编制依据数据,包含编制依据文本内容\n trace_id: 追踪ID\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n 审查结果字典,包含编制依据审查结果\n \"\"\"\n start_time = time.time()\n try:\n logger.info(f\"开始编制依据审查,trace_id: {trace_id}\")\n\n # 提取关键数据\n basis_items: BasisItems = review_data.get('basis_items')\n review_content_text = review_data.get('content', '')\n max_concurrent = review_data.get('max_concurrent', 10)\n\n basis_list = []\n if basis_items and getattr(basis_items, \"items\", None):\n basis_list = [item.raw for item in basis_items.items if getattr(item, \"raw\", None)]\n review_content_text = review_content_text or \"\\n\".join(basis_list)\n\n logger.info(f\"提取的编制依据条目数: {len(basis_list)}\")\n if basis_list:\n logger.info(f\"编制依据内容预览: {basis_list[0][:50]}...\")\n elif review_content_text:\n logger.info(f\"编制依据内容预览(文本): {review_content_text[:50]}...\")\n else:\n logger.warning(\"编制依据内容为空,将跳过审查\")\n\n # 检查是否有有效的编制依据内容\n if not basis_list:\n logger.warning(\"没有可执行的编制依据审查任务\")\n return {\n \"reference_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_content_text,\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": time.time() - start_time,\n \"error_message\": \"编制依据内容为空,无法进行审查\"\n }\n }\n\n # 调用prep_basis_reviewer中的异步审查方法\n logger.info(\"开始调用编制依据异步审查...\")\n\n try:\n # 使用信号量控制并发\n async with self.semaphore:\n # 从state中获取progress_manager和callback_task_id\n progress_manager = state.get('progress_manager') if state else None\n callback_task_id = state.get('callback_task_id') if state else None\n\n # 调用带有SSE推送功能的review_all方法\n from core.construction_review.component.reviewers.reference_basis_reviewer import BasisReviewService\n async with BasisReviewService(max_concurrent=max_concurrent) as service:\n reference_basis_review_results = await service.review_all(\n basis_items,\n collection_name=\"first_bfp_collection_status\",\n progress_manager=progress_manager,\n callback_task_id=callback_task_id\n )\n\n logger.info(f\"编制依据审查完成,批次数量: {len(reference_basis_review_results)}\")\n\n # 统计审查结果\n total_items = 0\n valid_items = 0\n standard_items = 0\n\n for batch in reference_basis_review_results:\n if isinstance(batch, list):\n total_items += len(batch)\n for item in batch:\n if isinstance(item, dict):\n valid_items += 1\n if item.get('is_standard', False):\n standard_items += 1\n\n logger.info(f\"审查统计 - 总编制依据: {total_items}, 有效项: {valid_items}, 标准项: {standard_items}\")\n\n except Exception as e:\n logger.error(f\"编制依据异步审查失败: {str(e)}\")\n return {\n \"reference_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_content_text,\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": time.time() - start_time,\n \"error_message\": f\"编制依据审查失败: {str(e)}\"\n }\n }\n\n # 返回完整结果\n return {\n \"reference_basis_review_results\": {\n \"review_results\": reference_basis_review_results,\n \"review_content\": review_content_text,\n \"total_basis_items\": total_items,\n \"valid_items\": valid_items,\n \"standard_items\": standard_items,\n \"execution_time\": time.time() - start_time,\n \"error_message\": None\n }\n }\n\n except Exception as e:\n execution_time = time.time() - start_time\n error_msg = f\"编制依据审查失败: {str(e)}\"\n logger.error(error_msg, exc_info=True)\n\n return {\n \"reference_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_data.get('content', ''),\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": execution_time,\n \"error_message\": error_msg\n }\n }\n \n async def timeliness_basis_reviewer(self, review_data: Dict[str, Any], trace_id: str,\n state: dict = None, stage_name: str = None) -> Dict[str, Any]:\n \"\"\"\n 执行编制依据审查:调用prep_basis_reviewer中的异步审查功能\n\n Args:\n review_data: 待审查的编制依据数据,包含编制依据文本内容\n trace_id: 追踪ID\n state: 状态字典\n stage_name: 阶段名称\n\n Returns:\n 审查结果字典,包含编制依据审查结果\n \"\"\"\n start_time = time.time()\n try:\n logger.info(f\"开始编制依据审查,trace_id: {trace_id}\")\n\n # 提取关键数据\n basis_items: BasisItems = review_data.get('basis_items')\n review_content_text = review_data.get('content', '')\n max_concurrent = review_data.get('max_concurrent', 10)\n\n # 基于BasisItems计算统计信息\n basis_list = []\n if basis_items and getattr(basis_items, \"items\", None):\n basis_list = [item.raw for item in basis_items.items if getattr(item, \"raw\", None)]\n review_content_text = review_content_text or \"\\n\".join(basis_list)\n\n logger.info(f\"提取的编制依据条目数: {len(basis_list)}\")\n if basis_list:\n logger.info(f\"编制依据内容预览: {basis_list[0][:50]}...\")\n elif review_content_text:\n logger.info(f\"编制依据内容预览(文本): {review_content_text[:50]}...\")\n else:\n logger.warning(\"编制依据内容为空,将跳过审查\")\n\n # 检查是否有有效的编制依据内容\n if not basis_list:\n logger.warning(\"没有可执行的编制依据审查任务\")\n return {\n \"timeliness_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_content_text,\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": time.time() - start_time,\n \"error_message\": \"编制依据内容为空,无法进行审查\"\n }\n }\n\n # 调用prep_basis_reviewer中的异步审查方法\n logger.info(\"开始调用编制依据异步审查...\")\n\n try:\n # 使用信号量控制并发\n async with self.semaphore:\n # 从state中获取progress_manager和callback_task_id\n progress_manager = state.get('progress_manager') if state else None\n callback_task_id = state.get('callback_task_id') if state else None\n\n # 调用带有SSE推送功能的review_all方法\n from core.construction_review.component.reviewers.timeliness_basis_reviewer import BasisReviewService\n async with BasisReviewService(max_concurrent=max_concurrent) as service:\n timeliness_basis_review_results = await service.review_all(\n basis_items,\n collection_name=\"first_bfp_collection_status\",\n progress_manager=progress_manager,\n callback_task_id=callback_task_id\n )\n\n logger.info(f\"编制依据审查完成,批次数量: {len(timeliness_basis_review_results)}\")\n\n # 统计审查结果\n total_items = 0\n valid_items = 0\n standard_items = 0\n\n for batch in timeliness_basis_review_results:\n if isinstance(batch, list):\n total_items += len(batch)\n for item in batch:\n if isinstance(item, dict):\n valid_items += 1\n if item.get('is_standard', False):\n standard_items += 1\n\n logger.info(f\"审查统计 - 总编制依据: {total_items}, 有效项: {valid_items}, 标准项: {standard_items}\")\n\n except Exception as e:\n logger.error(f\"编制依据异步审查失败: {str(e)}\")\n return {\n \"timeliness_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_content_text,\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": time.time() - start_time,\n \"error_message\": f\"编制依据审查失败: {str(e)}\"\n }\n }\n\n # 返回完整结果\n return {\n \"timeliness_basis_review_results\": {\n \"review_results\": timeliness_basis_review_results,\n \"review_content\": review_content_text,\n \"total_basis_items\": total_items,\n \"valid_items\": valid_items,\n \"standard_items\": standard_items,\n \"execution_time\": time.time() - start_time,\n \"error_message\": None\n }\n }\n\n except Exception as e:\n execution_time = time.time() - start_time\n error_msg = f\"编制依据审查失败: {str(e)}\"\n logger.error(error_msg, exc_info=True)\n\n return {\n \"timeliness_basis_review_results\": {\n \"review_results\": [],\n \"review_content\": review_data.get('content', ''),\n \"total_basis_items\": 0,\n \"valid_items\": 0,\n \"standard_items\": 0,\n \"execution_time\": execution_time,\n \"error_message\": error_msg\n }\n }", "start_line": 1, "end_line": 1486, "total_lines": 1486, "has_more": false }