|
|
@@ -81,13 +81,19 @@ class OutlineReviewer:
|
|
|
# 阶段1:一级大纲完整性审查(仅在有数据时执行)
|
|
|
if overall_outline and overall_outline.strip():
|
|
|
logger.info("启动阶段1:一级大纲完整性审查...")
|
|
|
- overall_task = self._overall_completeness_review(overall_outline, trace_id, state, stage_name)
|
|
|
+ # 创建Task对象
|
|
|
+ overall_task = asyncio.create_task(
|
|
|
+ self._overall_completeness_review(overall_outline, trace_id, state, stage_name)
|
|
|
+ )
|
|
|
tasks.append(("overall", overall_task))
|
|
|
|
|
|
# 阶段2:次级大纲逐项审查
|
|
|
if detailed_outline:
|
|
|
logger.info("启动阶段2:次级大纲逐项审查...")
|
|
|
- detailed_task = self._detailed_item_review(detailed_outline, trace_id, state, stage_name)
|
|
|
+ # 创建Task对象
|
|
|
+ detailed_task = asyncio.create_task(
|
|
|
+ self._detailed_item_review(detailed_outline, trace_id, state, stage_name)
|
|
|
+ )
|
|
|
tasks.append(("detailed", detailed_task))
|
|
|
|
|
|
# 处理空数据情况
|
|
|
@@ -101,48 +107,94 @@ class OutlineReviewer:
|
|
|
detailed_review_results = []
|
|
|
logger.warning("没有可执行的审查任务")
|
|
|
else:
|
|
|
- # 等待所有阶段完成
|
|
|
- try:
|
|
|
- results = await asyncio.gather(*[task for _, task in tasks], return_exceptions=True)
|
|
|
-
|
|
|
- # 处理结果
|
|
|
- overall_review_result = None
|
|
|
- detailed_review_results = []
|
|
|
-
|
|
|
- for i, (stage_name, _) in enumerate(tasks):
|
|
|
- result = results[i]
|
|
|
-
|
|
|
- if stage_name == "overall":
|
|
|
- if isinstance(result, Exception):
|
|
|
- logger.error(f"阶段1执行异常: {str(result)}")
|
|
|
- overall_review_result = {
|
|
|
- "success": False,
|
|
|
- "error_message": f"阶段1异常: {str(result)}",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
- else:
|
|
|
- overall_review_result = result
|
|
|
- logger.info(f"阶段1完成,成功: {overall_review_result.get('success', False)}")
|
|
|
-
|
|
|
- elif stage_name == "detailed":
|
|
|
- if isinstance(result, Exception):
|
|
|
- logger.error(f"阶段2执行异常: {str(result)}")
|
|
|
- detailed_review_results = []
|
|
|
- else:
|
|
|
- detailed_review_results = result
|
|
|
- logger.info(f"阶段2完成,审查项目数: {len(detailed_review_results)}")
|
|
|
-
|
|
|
- logger.info("两阶段并发审查全部完成")
|
|
|
+ # 等待所有阶段完成 - 使用 asyncio.wait 替代 gather
|
|
|
+ # 整体超时:阶段1(60s) + 阶段2(基于任务数动态计算)
|
|
|
+ stage1_timeout = 60
|
|
|
+ stage2_timeout = (50 * len(detailed_outline) / 3) + 60 if detailed_outline else 0
|
|
|
+ total_timeout = max(stage1_timeout, stage2_timeout) + 30 # 并发执行,取最大值+缓冲
|
|
|
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"并发执行两阶段审查失败: {str(e)}")
|
|
|
- # 降级为串行处理
|
|
|
- logger.warning("降级为串行执行两阶段审查")
|
|
|
- if overall_outline and overall_outline.strip():
|
|
|
- overall_review_result = await self._overall_completeness_review(overall_outline, trace_id, state, stage_name)
|
|
|
- if detailed_outline:
|
|
|
- detailed_review_results = await self._detailed_item_review(detailed_outline, trace_id, state, stage_name)
|
|
|
+ logger.info(f"[大纲审查] 两阶段整体超时设置: {total_timeout:.0f}秒")
|
|
|
+
|
|
|
+ # 提取任务列表
|
|
|
+ task_list = [task for _, task in tasks]
|
|
|
+
|
|
|
+ done, pending = await asyncio.wait(
|
|
|
+ task_list,
|
|
|
+ timeout=total_timeout,
|
|
|
+ return_when=asyncio.ALL_COMPLETED
|
|
|
+ )
|
|
|
+
|
|
|
+ # 取消未完成的任务
|
|
|
+ for task in pending:
|
|
|
+ task.cancel()
|
|
|
+ logger.warning(f"[大纲审查] 阶段任务超时,已取消")
|
|
|
+
|
|
|
+ # 构建任务到阶段名称的映射(关键修复:asyncio.wait返回的done是无序集合)
|
|
|
+ task_to_stage = {task: stage_name for stage_name, task in tasks}
|
|
|
+
|
|
|
+ # 收集结果并按阶段名称分类
|
|
|
+ stage_results = {}
|
|
|
+ for task in done:
|
|
|
+ stage_name_key = task_to_stage.get(task)
|
|
|
+ try:
|
|
|
+ result = task.result()
|
|
|
+ logger.info(f"[大纲审查] {stage_name_key} task.result()返回, 类型: {type(result).__name__}")
|
|
|
+ if isinstance(result, dict):
|
|
|
+ logger.info(f"[大纲审查] result是字典, 包含键: {list(result.keys())}")
|
|
|
+ elif isinstance(result, list):
|
|
|
+ logger.info(f"[大纲审查] result是列表, 长度: {len(result)}")
|
|
|
+ stage_results[stage_name_key] = result
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.error(f"[大纲审查] {stage_name_key} 阶段任务被取消")
|
|
|
+ stage_results[stage_name_key] = Exception("Stage task cancelled")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[大纲审查] {stage_name_key} 阶段任务执行失败: {str(e)}", exc_info=True)
|
|
|
+ stage_results[stage_name_key] = e
|
|
|
+
|
|
|
+ # 处理pending任务
|
|
|
+ for task in pending:
|
|
|
+ stage_name_key = task_to_stage.get(task)
|
|
|
+ stage_results[stage_name_key] = Exception("Task not completed (timeout)")
|
|
|
+
|
|
|
+ # 处理结果
|
|
|
+ overall_review_result = None
|
|
|
+ detailed_review_results = []
|
|
|
+
|
|
|
+ for stage_name_key, _ in tasks:
|
|
|
+ result = stage_results.get(stage_name_key)
|
|
|
+ logger.info(f"[大纲审查] 处理阶段: stage_name={stage_name_key}, result类型={type(result).__name__ if result else 'None'}")
|
|
|
+
|
|
|
+ if stage_name_key == "overall":
|
|
|
+ if isinstance(result, Exception):
|
|
|
+ logger.error(f"阶段1执行异常: {str(result)}")
|
|
|
+ overall_review_result = {
|
|
|
+ "success": False,
|
|
|
+ "error_message": f"阶段1异常: {str(result)}",
|
|
|
+ "overall_outline": overall_outline,
|
|
|
+ "parsed_result": None
|
|
|
+ }
|
|
|
+ elif isinstance(result, dict):
|
|
|
+ overall_review_result = result
|
|
|
+ logger.info(f"阶段1完成,成功: {overall_review_result.get('success', False)}")
|
|
|
+ else:
|
|
|
+ # 处理意外的返回类型(如列表)
|
|
|
+ logger.error(f"阶段1返回了意外的类型: {type(result).__name__}")
|
|
|
+ overall_review_result = {
|
|
|
+ "success": False,
|
|
|
+ "error_message": f"阶段1返回了意外的类型: {type(result).__name__}",
|
|
|
+ "overall_outline": overall_outline,
|
|
|
+ "parsed_result": None
|
|
|
+ }
|
|
|
+
|
|
|
+ elif stage_name_key == "detailed":
|
|
|
+ if isinstance(result, Exception):
|
|
|
+ logger.error(f"阶段2执行异常: {str(result)}")
|
|
|
+ detailed_review_results = []
|
|
|
+ else:
|
|
|
+ detailed_review_results = result
|
|
|
+ logger.info(f"阶段2完成,审查项目数: {len(detailed_review_results)}")
|
|
|
+
|
|
|
+ logger.info("两阶段并发审查全部完成")
|
|
|
|
|
|
# 返回完整结果
|
|
|
return {
|
|
|
@@ -179,78 +231,94 @@ class OutlineReviewer:
|
|
|
Returns:
|
|
|
一级大纲审查结果
|
|
|
"""
|
|
|
- if not overall_outline or not overall_outline.strip():
|
|
|
- logger.warning("一级大纲为空或仅包含空白字符")
|
|
|
- return {
|
|
|
- "success": False,
|
|
|
- "error_message": "一级大纲为空,无法进行完整性审查",
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": None
|
|
|
- }
|
|
|
+ try:
|
|
|
+ if not overall_outline or not overall_outline.strip():
|
|
|
+ logger.warning("一级大纲为空或仅包含空白字符")
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "error_message": "一级大纲为空,无法进行完整性审查",
|
|
|
+ "overall_outline": overall_outline,
|
|
|
+ "parsed_result": None
|
|
|
+ }
|
|
|
|
|
|
- logger.info("执行一级大纲完整性审查...")
|
|
|
+ logger.info("执行一级大纲完整性审查...")
|
|
|
|
|
|
- # 构建提示词参数
|
|
|
- prompt_kwargs = {}
|
|
|
- prompt_kwargs["review_content"] = overall_outline
|
|
|
+ # 构建提示词参数
|
|
|
+ prompt_kwargs = {}
|
|
|
+ prompt_kwargs["review_content"] = overall_outline
|
|
|
|
|
|
- # 获取一级大纲审查提示词模板
|
|
|
- task_prompt = self.prompt_loader.get_prompt_template(
|
|
|
- self.reviewer_type,
|
|
|
- "overall_outline_completeness_review",
|
|
|
- **prompt_kwargs
|
|
|
- )
|
|
|
+ # 获取一级大纲审查提示词模板
|
|
|
+ task_prompt = self.prompt_loader.get_prompt_template(
|
|
|
+ self.reviewer_type,
|
|
|
+ "overall_outline_completeness_review",
|
|
|
+ **prompt_kwargs
|
|
|
+ )
|
|
|
|
|
|
- task_prompt_info = {
|
|
|
- "task_prompt": task_prompt,
|
|
|
- "task_name": "一级大纲完整性审查"
|
|
|
- }
|
|
|
+ task_prompt_info = {
|
|
|
+ "task_prompt": task_prompt,
|
|
|
+ "task_name": "一级大纲完整性审查"
|
|
|
+ }
|
|
|
|
|
|
- # 调用模型进行审查
|
|
|
- model_response = await self.model_client.get_model_generate_invoke(
|
|
|
- trace_id=trace_id,
|
|
|
- task_prompt_info=task_prompt_info
|
|
|
- )
|
|
|
+ # 调用模型进行审查 - 大纲审查设置90秒超时
|
|
|
+ model_response = await self.model_client.get_model_generate_invoke(
|
|
|
+ trace_id=trace_id,
|
|
|
+ task_prompt_info=task_prompt_info,
|
|
|
+ timeout=90
|
|
|
+ )
|
|
|
|
|
|
- response_text = model_response
|
|
|
- # 直接提取JSON数据,避免关键词误判
|
|
|
- json_data = self.inter_tool._extract_json_data(response_text)
|
|
|
- overall_completeness_result = []
|
|
|
+ response_text = model_response
|
|
|
+ # 直接提取JSON数据,避免关键词误判
|
|
|
+ json_data = self.inter_tool._extract_json_data(response_text)
|
|
|
+ overall_completeness_result = []
|
|
|
+
|
|
|
+ if json_data and isinstance(json_data, list):
|
|
|
+ for item in json_data:
|
|
|
+ overall_completeness_result.append(self.inter_tool._create_issue_item(item, "completeness_check"))
|
|
|
+ elif json_data and isinstance(json_data, dict):
|
|
|
+ overall_completeness_result.append(self.inter_tool._create_issue_item(json_data, "completeness_check"))
|
|
|
+ #filtered_issues = [r for r in overall_completeness_result if self._is_non_compliant_item(r)]
|
|
|
+ # 只统计exist_issue为true的项目数量
|
|
|
+ issue_count = sum(1 for item in overall_completeness_result if item.get('exist_issue', False))
|
|
|
+ message=f"一级大纲完整性审查完成,发现 {issue_count} 个问题",
|
|
|
+ if issue_count == 0:
|
|
|
+ message = "一级大纲完整性审查已通过,未发现缺失项"
|
|
|
|
|
|
- if json_data and isinstance(json_data, list):
|
|
|
- for item in json_data:
|
|
|
- overall_completeness_result.append(self.inter_tool._create_issue_item(item, "completeness_check"))
|
|
|
- elif json_data and isinstance(json_data, dict):
|
|
|
- overall_completeness_result.append(self.inter_tool._create_issue_item(json_data, "completeness_check"))
|
|
|
- #filtered_issues = [r for r in overall_completeness_result if self._is_non_compliant_item(r)]
|
|
|
- # 只统计exist_issue为true的项目数量
|
|
|
- issue_count = sum(1 for item in overall_completeness_result if item.get('exist_issue', False))
|
|
|
- message=f"一级大纲完整性审查完成,发现 {issue_count} 个问题",
|
|
|
- if issue_count == 0:
|
|
|
- message = "一级大纲完整性审查已通过,未发现缺失项"
|
|
|
-
|
|
|
- if state and state.get("progress_manager"):
|
|
|
- # 使用try-catch确保SSE推送失败不会影响主流程
|
|
|
- try:
|
|
|
- await state["progress_manager"].update_stage_progress(
|
|
|
- callback_task_id=state["callback_task_id"],
|
|
|
- stage_name=f"{stage_name} - 阶段1:一级大纲完整性审查",
|
|
|
- current=None, # 明确不更新current,保持主流程进度
|
|
|
- status="processing",
|
|
|
- message=message,
|
|
|
- issues=overall_completeness_result,
|
|
|
- event_type="processing" # 使用专门的事件类型
|
|
|
- )
|
|
|
- logger.info("SSE推送成功: 一级大纲完整性审查完成")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"SSE推送失败: 一级大纲完整性审查, 错误: {str(e)}")
|
|
|
- # 不抛出异常,避免影响主流程
|
|
|
+ if state and state.get("progress_manager"):
|
|
|
+ # 使用try-catch确保SSE推送失败不会影响主流程
|
|
|
+ try:
|
|
|
+ # 【修复】明确传递issues的副本,避免变量混淆
|
|
|
+ issues_copy = list(overall_completeness_result) if overall_completeness_result else []
|
|
|
+ await state["progress_manager"].update_stage_progress(
|
|
|
+ callback_task_id=state["callback_task_id"],
|
|
|
+ stage_name=f"{stage_name} - 阶段1:一级大纲完整性审查",
|
|
|
+ current=None, # 明确不更新current,保持主流程进度
|
|
|
+ status="processing",
|
|
|
+ message=message,
|
|
|
+ issues=issues_copy,
|
|
|
+ event_type="processing" # 使用专门的事件类型
|
|
|
+ )
|
|
|
+ logger.info("SSE推送成功: 一级大纲完整性审查完成")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"SSE推送失败: 一级大纲完整性审查, 错误: {str(e)}")
|
|
|
+ # 不抛出异常,避免影响主流程
|
|
|
|
|
|
- return {
|
|
|
- "success": True,
|
|
|
- "overall_outline": overall_outline,
|
|
|
- "parsed_result": overall_completeness_result
|
|
|
- }
|
|
|
+ # 【调试】明确返回字典,使用新的变量名避免混淆
|
|
|
+ final_result = {
|
|
|
+ "success": True,
|
|
|
+ "overall_outline": overall_outline,
|
|
|
+ "parsed_result": overall_completeness_result
|
|
|
+ }
|
|
|
+ logger.info(f"[大纲审查-阶段1] 准备返回final_result, 类型: {type(final_result).__name__}, 包含键: {list(final_result.keys())}")
|
|
|
+ return final_result
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"一级大纲完整性审查异常: {str(e)}", exc_info=True)
|
|
|
+ return {
|
|
|
+ "success": False,
|
|
|
+ "error_message": f"一级大纲完整性审查异常: {str(e)}",
|
|
|
+ "overall_outline": overall_outline,
|
|
|
+ "parsed_result": None
|
|
|
+ }
|
|
|
|
|
|
async def _detailed_item_review(self, detailed_outline: list, trace_id: str,state, stage_name) -> list:
|
|
|
"""
|
|
|
@@ -276,22 +344,46 @@ class OutlineReviewer:
|
|
|
|
|
|
logger.info(f"开始次级大纲并发审查,有效项目数量: {len(valid_items)}")
|
|
|
|
|
|
- # 创建并发审查任务
|
|
|
- semaphore = asyncio.Semaphore(20) # 限制并发数为5,避免过载
|
|
|
+ # 创建并发审查任务 - 降低并发数避免模型服务过载
|
|
|
+ semaphore = asyncio.Semaphore(3) # 限制并发数为3,避免过载
|
|
|
+
|
|
|
tasks = []
|
|
|
|
|
|
for i, outline_item in valid_items:
|
|
|
- task = self._concurrent_single_review(i, outline_item, trace_id, semaphore, state, stage_name)
|
|
|
+ # 只用信号量控制并发,不添加外层wait_for(避免双重超时控制)
|
|
|
+ task = asyncio.create_task(
|
|
|
+ self._concurrent_single_review(i, outline_item, trace_id, semaphore, state, stage_name)
|
|
|
+ )
|
|
|
tasks.append(task)
|
|
|
|
|
|
- # 等待所有任务完成
|
|
|
- try:
|
|
|
- results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
- logger.info(f"并发审查完成,总任务数: {len(tasks)}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"并发审查失败: {str(e)}")
|
|
|
- # 如果并发失败,降级为串行处理
|
|
|
- return await self._fallback_sequential_review(valid_items, trace_id,state, stage_name)
|
|
|
+ # 使用 asyncio.wait 提供超时控制
|
|
|
+ # 整体超时:每个任务预计最多48秒(15×3+0.5+1+2),乘以任务数的1/3(并发数为3)
|
|
|
+ estimated_time_per_task = 50 # 秒
|
|
|
+ total_timeout = (estimated_time_per_task * len(tasks) / 3) + 60 # 加60秒缓冲
|
|
|
+
|
|
|
+ logger.info(f"[大纲审查] 设置整体超时: {total_timeout:.0f}秒,任务数: {len(tasks)}")
|
|
|
+
|
|
|
+ done, pending = await asyncio.wait(tasks, timeout=total_timeout)
|
|
|
+
|
|
|
+ # 取消未完成的任务
|
|
|
+ for task in pending:
|
|
|
+ task.cancel()
|
|
|
+ logger.warning(f"[大纲审查] 任务超时,已取消")
|
|
|
+
|
|
|
+ # 收集结果
|
|
|
+ results = []
|
|
|
+ for task in done:
|
|
|
+ try:
|
|
|
+ result = task.result()
|
|
|
+ results.append(result)
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ logger.error(f"[大纲审查] 任务被取消")
|
|
|
+ results.append(Exception("Task cancelled"))
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"[大纲审查] 任务执行失败: {str(e)}", exc_info=True)
|
|
|
+ results.append(e)
|
|
|
+
|
|
|
+ logger.info(f"并发审查完成,总任务数: {len(tasks)}, 成功: {len(done)}, 超时: {len(pending)}")
|
|
|
|
|
|
# 处理结果
|
|
|
detailed_review_results = []
|
|
|
@@ -431,10 +523,11 @@ class OutlineReviewer:
|
|
|
"task_name": f"单项大纲完整性审查-{category}"
|
|
|
}
|
|
|
|
|
|
- # 调用模型进行审查
|
|
|
+ # 调用模型进行审查 - 大纲审查设置90秒超时
|
|
|
model_response = await self.model_client.get_model_generate_invoke(
|
|
|
trace_id=f"{trace_id}_item_{item_index}",
|
|
|
- task_prompt_info=task_prompt_info
|
|
|
+ task_prompt_info=task_prompt_info,
|
|
|
+ timeout=90
|
|
|
)
|
|
|
|
|
|
response_text = model_response
|
|
|
@@ -513,10 +606,11 @@ class OutlineReviewer:
|
|
|
"task_name": "章节目录分类器"
|
|
|
}
|
|
|
|
|
|
- # 调用模型
|
|
|
+ # 调用模型 - 大纲审查设置90秒超时
|
|
|
model_response = await self.model_client.get_model_generate_invoke(
|
|
|
trace_id=trace_id,
|
|
|
- task_prompt_info=task_prompt_info
|
|
|
+ task_prompt_info=task_prompt_info,
|
|
|
+ timeout=90
|
|
|
)
|
|
|
|
|
|
# 提取分类结果
|