|
|
@@ -1,690 +0,0 @@
|
|
|
-# -*- coding: utf-8 -*-
|
|
|
-"""
|
|
|
-精确审查器模块
|
|
|
-重构自outline_detect,保留核心处理逻辑
|
|
|
-独立实现,不继承BaseReviewer
|
|
|
-"""
|
|
|
-
|
|
|
-import json
|
|
|
-import re
|
|
|
-import asyncio
|
|
|
-import time
|
|
|
-from typing import Dict, Any
|
|
|
-from foundation.observability.logger.loggering import server_logger as logger
|
|
|
-from foundation.ai.agent.generate.model_generate import generate_model_client
|
|
|
-from core.construction_review.component.reviewers.utils.inter_tool import InterTool
|
|
|
-from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
|
|
|
-
|
|
|
-
|
|
|
-class OutlineReviewer:
|
|
|
- """
|
|
|
- 精确审查器
|
|
|
- 集成章节分类和大纲完整性审查功能
|
|
|
- """
|
|
|
-
|
|
|
- # 分类标准映射
|
|
|
- CLASSIFICATION_STANDARDS = {
|
|
|
- "一、编制依据": "本章应包含法律法规、标准规范、文件制度、编制原则、编制范围等五个方面。",
|
|
|
- "二、工程概况": "本章应包含设计概况、工程地质与水文气象、周边环境、施工平面及立面布置、施工要求和技术保证条件、风险辨识与分级、参建各方责任主体单位等七个方面。",
|
|
|
- "三、施工计划": "本章应包含施工进度计划、施工材料计划、施工设备计划、劳动力计划、安全生产费用使用计划等五个方面。",
|
|
|
- "四、施工工艺技术": "本章应包含主要施工方法概述、技术参数、工艺流程、施工准备、施工方法及操作要求、检查要求等六个方面。",
|
|
|
- "五、安全保证措施": "本章应包含安全保证体系、组织保证措施、技术保证措施、监测监控措施、应急处置措施等五个方面。",
|
|
|
- "六、质量保证措施": "本章应包含质量保证体系、质量目标、工程创优规划、质量控制程序与具体措施等四个方面。",
|
|
|
- "七、环境保证措施": "本章应包含环境保证体系、环境保护组织机构、环境保护及文明施工措施等三个方面。",
|
|
|
- "八、施工管理及作业人员配备与分工": "本章应包含施工管理人员、专职安全生产管理人员、特种作业人员、其他作业人员等四个方面。",
|
|
|
- "九、验收要求": "本章应包含验收标准、验收程序、验收内容、验收时间、验收人员等五个方面。",
|
|
|
- "十、其他资料": "本章应包含计算书、相关施工图纸、附图附表、编制及审核人员情况等四个方面。"
|
|
|
- }
|
|
|
-
|
|
|
- def __init__(self):
|
|
|
- """初始化精确审查器"""
|
|
|
- self.model_client = generate_model_client
|
|
|
- self.prompt_loader = prompt_loader
|
|
|
- # self.review_location_label = review_location_label
|
|
|
- self.inter_tool = InterTool()
|
|
|
- self.reviewer_type = "outline"
|
|
|
-
|
|
|
- async def outline_review(self, review_data: Dict[str, Any], trace_id: str, state: dict = None,stage_name: str = None) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 执行两阶段大纲审查:1.一级大纲完整性审查 2.次级大纲逐项审查
|
|
|
-
|
|
|
- Args:
|
|
|
- review_data: 待审查的大纲数据,包含outline_content、overall_outline、detailed_outline等
|
|
|
- trace_id: 追踪ID
|
|
|
- stage_name: 阶段名称
|
|
|
- state: 状态字典
|
|
|
-
|
|
|
- Returns:
|
|
|
- 审查结果字典,包含一级大纲审查结果和次级大纲逐项审查结果
|
|
|
- """
|
|
|
- start_time = time.time()
|
|
|
- try:
|
|
|
- logger.debug(f"开始两阶段大纲审查,trace_id: {trace_id}")
|
|
|
-
|
|
|
- # 提取关键数据
|
|
|
- overall_outline = review_data.get('overall_outline', '')
|
|
|
- detailed_outline = review_data.get('detailed_outline', [])
|
|
|
-
|
|
|
- # # 添加调试信息
|
|
|
- # logger.debug(f"提取的数据 - overall_outline长度: {len(overall_outline)}, detailed_outline数量: {len(detailed_outline)}")
|
|
|
- # if overall_outline:
|
|
|
- # logger.debug(f"overall_outline内容预览: {overall_outline[:100]}...")
|
|
|
- # else:
|
|
|
- # logger.warning("overall_outline为空,将跳过阶段1审查")
|
|
|
-
|
|
|
- # 并发执行阶段1和阶段2
|
|
|
- logger.debug("开始并发执行两阶段大纲审查...")
|
|
|
-
|
|
|
- # 创建并发任务
|
|
|
- tasks = []
|
|
|
-
|
|
|
- # 阶段1:一级大纲完整性审查(仅在有数据时执行)
|
|
|
- if overall_outline and overall_outline.strip():
|
|
|
- logger.debug("启动阶段1:一级大纲完整性审查...")
|
|
|
- # 创建Task对象
|
|
|
- overall_task = asyncio.create_task(
|
|
|
- self._overall_completeness_review(overall_outline, trace_id, state, stage_name)
|
|
|
- )
|
|
|
- tasks.append(("overall", overall_task))
|
|
|
-
|
|
|
- # 阶段2:次级大纲逐项审查
|
|
|
- if detailed_outline:
|
|
|
- logger.debug("启动阶段2:次级大纲逐项审查...")
|
|
|
- # 创建Task对象
|
|
|
- detailed_task = asyncio.create_task(
|
|
|
- self._detailed_item_review(detailed_outline, trace_id, state, stage_name)
|
|
|
- )
|
|
|
- tasks.append(("detailed", detailed_task))
|
|
|
-
|
|
|
- # 处理空数据情况
|
|
|
- if not tasks:
|
|
|
- overall_review_result = {
|
|
|
- "success": False,
|
|
|
- "error_message": "没有有效的审查任务",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- } if (overall_outline and overall_outline.strip()) else None
|
|
|
- detailed_review_results = []
|
|
|
- logger.warning("没有可执行的审查任务")
|
|
|
- else:
|
|
|
- # 等待所有阶段完成 - 使用 asyncio.wait 替代 gather
|
|
|
- # 整体超时:阶段1(60s) + 阶段2(基于任务数动态计算)
|
|
|
- stage1_timeout = 60
|
|
|
- stage2_timeout = (50 * len(detailed_outline) / 3) + 60 if detailed_outline else 0
|
|
|
- total_timeout = max(stage1_timeout, stage2_timeout) + 30 # 并发执行,取最大值+缓冲
|
|
|
-
|
|
|
- logger.debug(f"[大纲审查] 两阶段整体超时设置: {total_timeout:.0f}秒")
|
|
|
-
|
|
|
- # 提取任务列表
|
|
|
- task_list = [task for _, task in tasks]
|
|
|
-
|
|
|
- done, pending = await asyncio.wait(
|
|
|
- task_list,
|
|
|
- timeout=total_timeout,
|
|
|
- return_when=asyncio.ALL_COMPLETED
|
|
|
- )
|
|
|
-
|
|
|
- # 取消未完成的任务
|
|
|
- for task in pending:
|
|
|
- task.cancel()
|
|
|
- logger.warning(f"[大纲审查] 阶段任务超时,已取消")
|
|
|
-
|
|
|
- # 构建任务到阶段名称的映射(关键修复:asyncio.wait返回的done是无序集合)
|
|
|
- task_to_stage = {task: stage_name for stage_name, task in tasks}
|
|
|
-
|
|
|
- # 收集结果并按阶段名称分类
|
|
|
- stage_results = {}
|
|
|
- for task in done:
|
|
|
- stage_name_key = task_to_stage.get(task)
|
|
|
- try:
|
|
|
- result = task.result()
|
|
|
- logger.debug(f"[大纲审查] {stage_name_key} task.result()返回, 类型: {type(result).__name__}")
|
|
|
- if isinstance(result, dict):
|
|
|
- logger.debug(f"[大纲审查] result是字典, 包含键: {list(result.keys())}")
|
|
|
- elif isinstance(result, list):
|
|
|
- logger.debug(f"[大纲审查] result是列表, 长度: {len(result)}")
|
|
|
- stage_results[stage_name_key] = result
|
|
|
- except asyncio.CancelledError:
|
|
|
- logger.error(f"[大纲审查] {stage_name_key} 阶段任务被取消")
|
|
|
- stage_results[stage_name_key] = Exception("Stage task cancelled")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[大纲审查] {stage_name_key} 阶段任务执行失败: {str(e)}", exc_info=True)
|
|
|
- stage_results[stage_name_key] = e
|
|
|
-
|
|
|
- # 处理pending任务
|
|
|
- for task in pending:
|
|
|
- stage_name_key = task_to_stage.get(task)
|
|
|
- stage_results[stage_name_key] = Exception("Task not completed (timeout)")
|
|
|
-
|
|
|
- # 处理结果
|
|
|
- overall_review_result = None
|
|
|
- detailed_review_results = []
|
|
|
-
|
|
|
- for stage_name_key, _ in tasks:
|
|
|
- result = stage_results.get(stage_name_key)
|
|
|
- logger.debug(f"[大纲审查] 处理阶段: stage_name={stage_name_key}, result类型={type(result).__name__ if result else 'None'}")
|
|
|
-
|
|
|
- if stage_name_key == "overall":
|
|
|
- if isinstance(result, Exception):
|
|
|
- logger.error(f"阶段1执行异常: {str(result)}")
|
|
|
- overall_review_result = {
|
|
|
- "success": False,
|
|
|
- "error_message": f"阶段1异常: {str(result)}",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
- elif isinstance(result, dict):
|
|
|
- overall_review_result = result
|
|
|
- logger.debug(f"阶段1完成,成功: {overall_review_result.get('success', False)}")
|
|
|
- else:
|
|
|
- # 处理意外的返回类型(如列表)
|
|
|
- logger.error(f"阶段1返回了意外的类型: {type(result).__name__}")
|
|
|
- overall_review_result = {
|
|
|
- "success": False,
|
|
|
- "error_message": f"阶段1返回了意外的类型: {type(result).__name__}",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
-
|
|
|
- elif stage_name_key == "detailed":
|
|
|
- if isinstance(result, Exception):
|
|
|
- logger.error(f"阶段2执行异常: {str(result)}")
|
|
|
- detailed_review_results = []
|
|
|
- else:
|
|
|
- detailed_review_results = result
|
|
|
- logger.debug(f"阶段2完成,审查项目数: {len(detailed_review_results)}")
|
|
|
-
|
|
|
- logger.debug("两阶段并发审查全部完成")
|
|
|
-
|
|
|
- # 返回完整结果
|
|
|
- return {
|
|
|
- "success": True,
|
|
|
- "stage1_overall_review": overall_review_result,
|
|
|
- "stage2_detailed_review": detailed_review_results,
|
|
|
- "total_detailed_items": len([item for item in detailed_outline if item.strip()]),
|
|
|
- "execution_time": time.time() - start_time
|
|
|
- }
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- execution_time = time.time() - start_time
|
|
|
- error_msg = f"两阶段大纲审查失败: {str(e)}"
|
|
|
- logger.error(error_msg, exc_info=True)
|
|
|
-
|
|
|
- return {
|
|
|
- "success": False,
|
|
|
- "error_message": error_msg,
|
|
|
- "execution_time": execution_time,
|
|
|
- "stage1_overall_review": None,
|
|
|
- "stage2_detailed_review": []
|
|
|
- }
|
|
|
-
|
|
|
- async def _overall_completeness_review(self, overall_outline: str, trace_id: str, state: dict = None, stage_name: str = None) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 阶段1:一级大纲完整性审查 - 检查十大类章节是否有缺失
|
|
|
-
|
|
|
- Args:
|
|
|
- overall_outline: 一级大纲内容
|
|
|
- trace_id: 追踪ID
|
|
|
- state: 状态字典
|
|
|
- stage_name: 阶段名称
|
|
|
-
|
|
|
- Returns:
|
|
|
- 一级大纲审查结果
|
|
|
- """
|
|
|
- try:
|
|
|
- if not overall_outline or not overall_outline.strip():
|
|
|
- logger.warning("一级大纲为空或仅包含空白字符")
|
|
|
- return {
|
|
|
- "success": False,
|
|
|
- "error_message": "一级大纲为空,无法进行完整性审查",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
-
|
|
|
- logger.debug("执行一级大纲完整性审查...")
|
|
|
-
|
|
|
- # 构建提示词参数
|
|
|
- prompt_kwargs = {}
|
|
|
- prompt_kwargs["review_content"] = overall_outline
|
|
|
-
|
|
|
- # 获取一级大纲审查提示词模板
|
|
|
- task_prompt = self.prompt_loader.get_prompt_template(
|
|
|
- self.reviewer_type,
|
|
|
- "overall_outline_completeness_review",
|
|
|
- **prompt_kwargs
|
|
|
- )
|
|
|
-
|
|
|
- task_prompt_info = {
|
|
|
- "task_prompt": task_prompt,
|
|
|
- "task_name": "一级大纲完整性审查"
|
|
|
- }
|
|
|
-
|
|
|
- # 调用模型进行审查 - 大纲审查设置90秒超时
|
|
|
- model_response = await self.model_client.get_model_generate_invoke(
|
|
|
- trace_id=trace_id,
|
|
|
- task_prompt_info=task_prompt_info,
|
|
|
- timeout=90
|
|
|
- )
|
|
|
-
|
|
|
- response_text = model_response
|
|
|
- # 直接提取JSON数据,避免关键词误判
|
|
|
- json_data = self.inter_tool._extract_json_data(response_text)
|
|
|
- overall_completeness_result = []
|
|
|
-
|
|
|
- if json_data and isinstance(json_data, list):
|
|
|
- for item in json_data:
|
|
|
- overall_completeness_result.append(self.inter_tool._create_issue_item(item, "completeness_check","catalogue","catalogue_completeness_check"))
|
|
|
- elif json_data and isinstance(json_data, dict):
|
|
|
- overall_completeness_result.append(self.inter_tool._create_issue_item(json_data, "completeness_check","catalogue","catalogue_completeness_check"))
|
|
|
- #filtered_issues = [r for r in overall_completeness_result if self._is_non_compliant_item(r)]
|
|
|
- # 只统计exist_issue为true的项目数量
|
|
|
- issue_count = sum(1 for item in overall_completeness_result if item.get('exist_issue', False))
|
|
|
- message=f"一级大纲完整性审查完成,发现 {issue_count} 个问题",
|
|
|
- if issue_count == 0:
|
|
|
- message = "一级大纲完整性审查已通过,未发现缺失项"
|
|
|
-
|
|
|
- if state and state.get("progress_manager"):
|
|
|
- # 使用try-catch确保SSE推送失败不会影响主流程
|
|
|
- try:
|
|
|
- # 【修复】明确传递issues的副本,避免变量混淆
|
|
|
- issues_copy = list(overall_completeness_result) if overall_completeness_result else []
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name=f"{stage_name} - 阶段1:一级大纲完整性审查",
|
|
|
- current=None, # 明确不更新current,保持主流程进度
|
|
|
- status="processing",
|
|
|
- message=message,
|
|
|
- issues=issues_copy,
|
|
|
- event_type="processing" # 使用专门的事件类型
|
|
|
- )
|
|
|
- logger.debug("SSE推送成功: 一级大纲完整性审查完成")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"SSE推送失败: 一级大纲完整性审查, 错误: {str(e)}")
|
|
|
- # 不抛出异常,避免影响主流程
|
|
|
-
|
|
|
- # 【调试】明确返回字典,使用新的变量名避免混淆
|
|
|
- final_result = {
|
|
|
- "success": True,
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": overall_completeness_result
|
|
|
- }
|
|
|
- logger.debug(f"[大纲审查-阶段1] 准备返回final_result, 类型: {type(final_result).__name__}, 包含键: {list(final_result.keys())}")
|
|
|
- return final_result
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"一级大纲完整性审查异常: {str(e)}", exc_info=True)
|
|
|
- return {
|
|
|
- "success": False,
|
|
|
- "error_message": f"一级大纲完整性审查异常: {str(e)}",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
-
|
|
|
- async def _detailed_item_review(self, detailed_outline: list, trace_id: str,state, stage_name) -> list:
|
|
|
- """
|
|
|
- 阶段2:次级大纲并发审查 - 对detailed_outline中的所有项目进行并发审查
|
|
|
-
|
|
|
- Args:
|
|
|
- detailed_outline: 次级大纲列表
|
|
|
- trace_id: 追踪ID
|
|
|
-
|
|
|
- Returns:
|
|
|
- 次级大纲逐项审查结果列表
|
|
|
- """
|
|
|
- if not detailed_outline:
|
|
|
- logger.warning("次级大纲列表为空")
|
|
|
- return []
|
|
|
-
|
|
|
- # 过滤空项目并创建任务列表
|
|
|
- valid_items = [(i, item) for i, item in enumerate(detailed_outline) if item.strip()]
|
|
|
-
|
|
|
- if not valid_items:
|
|
|
- logger.warning("没有有效的次级大纲项目")
|
|
|
- return []
|
|
|
-
|
|
|
- logger.debug(f"开始次级大纲并发审查,有效项目数量: {len(valid_items)}")
|
|
|
-
|
|
|
- # 创建并发审查任务 - 降低并发数避免模型服务过载
|
|
|
- semaphore = asyncio.Semaphore(3) # 限制并发数为3,避免过载
|
|
|
-
|
|
|
- tasks = []
|
|
|
-
|
|
|
- for i, outline_item in valid_items:
|
|
|
- # 只用信号量控制并发,不添加外层wait_for(避免双重超时控制)
|
|
|
- task = asyncio.create_task(
|
|
|
- self._concurrent_single_review(i, outline_item, trace_id, semaphore, state, stage_name)
|
|
|
- )
|
|
|
- tasks.append(task)
|
|
|
-
|
|
|
- # 使用 asyncio.wait 提供超时控制
|
|
|
- # 整体超时:每个任务预计最多48秒(15×3+0.5+1+2),乘以任务数的1/3(并发数为3)
|
|
|
- estimated_time_per_task = 50 # 秒
|
|
|
- total_timeout = (estimated_time_per_task * len(tasks) / 3) + 60 # 加60秒缓冲
|
|
|
-
|
|
|
- logger.debug(f"[大纲审查] 设置整体超时: {total_timeout:.0f}秒,任务数: {len(tasks)}")
|
|
|
-
|
|
|
- done, pending = await asyncio.wait(tasks, timeout=total_timeout)
|
|
|
-
|
|
|
- # 取消未完成的任务
|
|
|
- for task in pending:
|
|
|
- task.cancel()
|
|
|
- logger.warning(f"[大纲审查] 任务超时,已取消")
|
|
|
-
|
|
|
- # 收集结果
|
|
|
- results = []
|
|
|
- for task in done:
|
|
|
- try:
|
|
|
- result = task.result()
|
|
|
- results.append(result)
|
|
|
- except asyncio.CancelledError:
|
|
|
- logger.error(f"[大纲审查] 任务被取消")
|
|
|
- results.append(Exception("Task cancelled"))
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"[大纲审查] 任务执行失败: {str(e)}", exc_info=True)
|
|
|
- results.append(e)
|
|
|
-
|
|
|
- logger.debug(f"并发审查完成,总任务数: {len(tasks)}, 成功: {len(done)}, 超时: {len(pending)}")
|
|
|
-
|
|
|
- # 处理结果
|
|
|
- detailed_review_results = []
|
|
|
- for i, result in enumerate(results):
|
|
|
- original_index = valid_items[i][0]
|
|
|
- outline_item = valid_items[i][1]
|
|
|
-
|
|
|
- if isinstance(result, Exception):
|
|
|
- logger.error(f"第{original_index+1}项审查异常: {str(result)}")
|
|
|
- detailed_review_results.append({
|
|
|
- "item_index": original_index,
|
|
|
- "outline_item": outline_item,
|
|
|
- "review_result": {
|
|
|
- "success": False,
|
|
|
- "error_message": f"审查异常: {str(result)}",
|
|
|
- "category": None,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
- })
|
|
|
- else:
|
|
|
- detailed_review_results.append({
|
|
|
- "item_index": original_index,
|
|
|
- "outline_item": outline_item,
|
|
|
- "review_result": result
|
|
|
- })
|
|
|
-
|
|
|
- return detailed_review_results
|
|
|
-
|
|
|
- async def _concurrent_single_review(self, item_index: int, outline_item: str, trace_id: str, semaphore: asyncio.Semaphore, state, stage_name) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 单个项目的并发审查
|
|
|
-
|
|
|
- Args:
|
|
|
- item_index: 项目索引
|
|
|
- outline_item: 大纲项目内容
|
|
|
- trace_id: 追踪ID
|
|
|
- semaphore: 并发控制信号量
|
|
|
- state: 状态字典
|
|
|
- stage_name: 阶段名称
|
|
|
-
|
|
|
- Returns:
|
|
|
- 单项审查结果
|
|
|
- """
|
|
|
- async with semaphore:
|
|
|
- try:
|
|
|
- logger.debug(f"开始审查第{item_index+1}项: {outline_item[:50]}...")
|
|
|
- result = await self._single_item_review(outline_item, trace_id, item_index, state, stage_name)
|
|
|
- logger.debug(f"完成审查第{item_index+1}项,成功: {result.get('success', False)}")
|
|
|
- return result
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"第{item_index+1}项审查失败: {str(e)}")
|
|
|
- return {
|
|
|
- "success": False,
|
|
|
- "error_message": f"审查失败: {str(e)}",
|
|
|
- "category": None,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
-
|
|
|
- async def _fallback_sequential_review(self, valid_items: list, trace_id: str, state, stage_name) -> list:
|
|
|
- """
|
|
|
- 降级串行审查(当并发失败时使用)
|
|
|
-
|
|
|
- Args:
|
|
|
- valid_items: 有效项目列表 [(index, content), ...]
|
|
|
- trace_id: 追踪ID
|
|
|
-
|
|
|
- Returns:
|
|
|
- 串行审查结果列表
|
|
|
- """
|
|
|
- logger.warning("降级为串行审查模式")
|
|
|
- detailed_review_results = []
|
|
|
-
|
|
|
- for i, outline_item in valid_items:
|
|
|
- try:
|
|
|
- logger.debug(f"串行审查第{i+1}项: {outline_item[:50]}...")
|
|
|
- item_review_result = await self._single_item_review(outline_item, trace_id, i, state, stage_name)
|
|
|
-
|
|
|
- detailed_review_results.append({
|
|
|
- "item_index": i,
|
|
|
- "outline_item": outline_item,
|
|
|
- "review_result": item_review_result
|
|
|
- })
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"串行审查第{i+1}项失败: {str(e)}")
|
|
|
- detailed_review_results.append({
|
|
|
- "item_index": i,
|
|
|
- "outline_item": outline_item,
|
|
|
- "review_result": {
|
|
|
- "success": False,
|
|
|
- "error_message": f"串行审查失败: {str(e)}",
|
|
|
- "category": None,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
- })
|
|
|
-
|
|
|
- return detailed_review_results
|
|
|
-
|
|
|
- async def _single_item_review(self, outline_item: str, trace_id: str, item_index: int, state: dict = None, stage_name: str = None) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 单项大纲审查 - 调用原有逻辑
|
|
|
-
|
|
|
- Args:
|
|
|
- outline_item: 单个大纲项目
|
|
|
- trace_id: 追踪ID
|
|
|
- item_index: 项目索引
|
|
|
- state: 状态字典
|
|
|
- stage_name: 阶段名称
|
|
|
-
|
|
|
- Returns:
|
|
|
- 单项审查结果
|
|
|
- """
|
|
|
- # 第一步:分类
|
|
|
- category = await self._classify(outline_item, f"{trace_id}_item_{item_index}")
|
|
|
- logger.debug(f"次级大纲outline_item调试: {outline_item}")
|
|
|
- logger.debug(f"第{item_index+1}项分类结果: {category}")
|
|
|
-
|
|
|
- if not category:
|
|
|
- logger.warning(f"无法分类第{item_index+1}项,使用默认分类")
|
|
|
- category = "二、工程概况" # 默认分类
|
|
|
-
|
|
|
- # 第二步:完整性审查
|
|
|
- review_standard = self.CLASSIFICATION_STANDARDS.get(category,
|
|
|
- "本章应包含设计概况、工程地质与水文气象、周边环境等基本情况。")
|
|
|
-
|
|
|
- # 构建提示词参数
|
|
|
- prompt_kwargs = {}
|
|
|
- prompt_kwargs["review_content"] = outline_item
|
|
|
-
|
|
|
- prompt_kwargs["review_references"] = review_standard
|
|
|
- logger.debug(f"第{item_index+1}项审查参考标准: {review_standard}")
|
|
|
- # 获取提示词模板
|
|
|
- task_prompt = self.prompt_loader.get_prompt_template(
|
|
|
- self.reviewer_type,
|
|
|
- "outline_completeness_review",
|
|
|
- **prompt_kwargs
|
|
|
- )
|
|
|
-
|
|
|
- task_prompt_info = {
|
|
|
- "task_prompt": task_prompt,
|
|
|
- "task_name": f"单项大纲完整性审查-{category}"
|
|
|
- }
|
|
|
-
|
|
|
- # 调用模型进行审查 - 大纲审查设置90秒超时
|
|
|
- model_response = await self.model_client.get_model_generate_invoke(
|
|
|
- trace_id=f"{trace_id}_item_{item_index}",
|
|
|
- task_prompt_info=task_prompt_info,
|
|
|
- timeout=90,
|
|
|
- model_name="qwen3_30b"
|
|
|
- )
|
|
|
-
|
|
|
- response_text = model_response
|
|
|
- # 直接提取JSON数据,避免关键词误判
|
|
|
- json_data = self.inter_tool._extract_json_data(response_text)
|
|
|
- parsed_result = []
|
|
|
-
|
|
|
- if json_data and isinstance(json_data, list):
|
|
|
- for item in json_data:
|
|
|
- parsed_result.append(self.inter_tool._create_issue_item(item, 'completeness_check','catalogue','catalogue_completeness_check'))
|
|
|
- elif json_data and isinstance(json_data, dict):
|
|
|
- parsed_result.append(self.inter_tool._create_issue_item(json_data, 'completeness_check','catalogue','catalogue_completeness_check'))
|
|
|
-
|
|
|
- # with open(f"temp\outline_result_temp\次级大纲审查中间结果.json", "a", encoding="utf-8") as f:
|
|
|
- # f.write(response_text)
|
|
|
- # # 发送单项审查完成进度
|
|
|
- # logger.debug(f"state参数检查: state存在={state is not None}")
|
|
|
- if state:
|
|
|
- logger.debug(f"state keys: {list(state.keys())}")
|
|
|
- logger.debug(f"progress_manager存在: {'progress_manager' in state}")
|
|
|
- if state and state.get("progress_manager"):
|
|
|
- # 只统计exist_issue为true的项目数量
|
|
|
- issue_count = sum(1 for item in parsed_result if item.get('exist_issue', False))
|
|
|
- message = f"第{item_index+1}项{category}审查完成,发现 {issue_count} 个问题",
|
|
|
- # 使用try-catch确保SSE推送失败不会影响主流程
|
|
|
- try:
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name=f"{stage_name} - 阶段2:次级大纲审查",
|
|
|
- current=item_index + 1, # 显示当前审查项目索引
|
|
|
- status="processing",
|
|
|
- message=message,
|
|
|
- issues=parsed_result,
|
|
|
- event_type="processing" # 使用专门的事件类型
|
|
|
- )
|
|
|
- logger.debug(f"SSE推送成功: 第{item_index+1}项{category}审查完成")
|
|
|
- logger.debug(f"发送单项审查完成进度: 第{item_index+1}项{category}审查完成")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"SSE推送失败: 第{item_index+1}项{category}, 错误: {str(e)}")
|
|
|
- # 不抛出异常,避免影响主流程
|
|
|
-
|
|
|
- return {
|
|
|
- "success": True,
|
|
|
- "category": category,
|
|
|
- "review_standard": review_standard,
|
|
|
- "result": response_text,
|
|
|
- "parsed_result": parsed_result
|
|
|
- }
|
|
|
-
|
|
|
- async def _classify(self, outline_components: str, trace_id: str) -> str:
|
|
|
- """
|
|
|
- 执行分类
|
|
|
-
|
|
|
- Args:
|
|
|
- outline_components: 待分类的章节目录文本
|
|
|
- trace_id: 追踪ID
|
|
|
-
|
|
|
- Returns:
|
|
|
- 分类结果字符串
|
|
|
- """
|
|
|
- try:
|
|
|
- # 构建提示词参数
|
|
|
- prompt_kwargs = {}
|
|
|
- prompt_kwargs["review_content"] = outline_components
|
|
|
- # review_location_label 在当前版本中不使用,注释掉相关逻辑
|
|
|
-
|
|
|
- # 获取提示词模板
|
|
|
- task_prompt = self.prompt_loader.get_prompt_template(
|
|
|
- self.reviewer_type,
|
|
|
- "outline_completeness_classifier",
|
|
|
- **prompt_kwargs
|
|
|
- )
|
|
|
-
|
|
|
- task_prompt_info = {
|
|
|
- "task_prompt": task_prompt,
|
|
|
- "task_name": "章节目录分类器"
|
|
|
- }
|
|
|
-
|
|
|
- # 调用模型 - 大纲审查设置90秒超时
|
|
|
- model_response = await self.model_client.get_model_generate_invoke(
|
|
|
- trace_id=trace_id,
|
|
|
- task_prompt_info=task_prompt_info,
|
|
|
- timeout=90
|
|
|
- )
|
|
|
-
|
|
|
- # 提取分类结果
|
|
|
- category = self._extract_category_from_response(model_response)
|
|
|
-
|
|
|
- if category and category in self.CLASSIFICATION_STANDARDS:
|
|
|
- return category
|
|
|
- else:
|
|
|
- # 尝试模糊匹配
|
|
|
- return self._fuzzy_match_category(model_response)
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"分类失败: {str(e)}")
|
|
|
- return None
|
|
|
-
|
|
|
- def _extract_category_from_response(self, response: str) -> str:
|
|
|
- """
|
|
|
- 从LLM响应中提取类别名称
|
|
|
-
|
|
|
- Args:
|
|
|
- response: LLM返回的文本
|
|
|
-
|
|
|
- Returns:
|
|
|
- 提取的类别名称,如果无法提取则返回None
|
|
|
- """
|
|
|
- if not response:
|
|
|
- return None
|
|
|
-
|
|
|
- response = response.strip()
|
|
|
-
|
|
|
- # 精确匹配完整类别名称
|
|
|
- for category in self.CLASSIFICATION_STANDARDS.keys():
|
|
|
- if category in response:
|
|
|
- return category
|
|
|
-
|
|
|
- # 使用正则表达式匹配类别格式
|
|
|
- category_patterns = [
|
|
|
- r'["""]([一二三四五六七八九十]、[^"""]+)["""]',
|
|
|
- r'类别[::]\s*([一二三四五六七八九十]、[^\n。,;]+)',
|
|
|
- r'属于[::]\s*([一二三四五六七八九十]、[^\n。,;]+)',
|
|
|
- r'(?:^|[\s\n])([一二三四五六七八九十]、[^\n。,;\s]+)(?:[\s\n]|$)',
|
|
|
- ]
|
|
|
-
|
|
|
- for pattern in category_patterns:
|
|
|
- matches = re.finditer(pattern, response)
|
|
|
- for match in matches:
|
|
|
- category = match.group(1) if match.groups() else match.group(0)
|
|
|
- category = category.strip()
|
|
|
- if category in self.CLASSIFICATION_STANDARDS:
|
|
|
- return category
|
|
|
-
|
|
|
- return None
|
|
|
-
|
|
|
- def _fuzzy_match_category(self, response: str) -> str:
|
|
|
- """
|
|
|
- 模糊匹配类别
|
|
|
-
|
|
|
- Args:
|
|
|
- response: LLM返回的文本
|
|
|
-
|
|
|
- Returns:
|
|
|
- 模糊匹配的类别名称
|
|
|
- """
|
|
|
- category_fragments = re.findall(r'[一二三四五六七八九十]、[^\n。,;\s]+', response)
|
|
|
-
|
|
|
- for fragment in category_fragments:
|
|
|
- fragment = fragment.strip()
|
|
|
- for category in self.CLASSIFICATION_STANDARDS.keys():
|
|
|
- if fragment in category or category.startswith(fragment):
|
|
|
- return category
|
|
|
-
|
|
|
- return None
|
|
|
-
|