|
|
@@ -54,6 +54,10 @@ class TextProcessor:
|
|
|
if not text:
|
|
|
return []
|
|
|
|
|
|
+ # 检查是否是"无明显问题"
|
|
|
+ if "无明显问题" in text or "无明显" in text:
|
|
|
+ return []
|
|
|
+
|
|
|
try:
|
|
|
obj = json.loads(text.strip())
|
|
|
if isinstance(obj, list):
|
|
|
@@ -81,7 +85,7 @@ class ResultFormatter:
|
|
|
name = obj.get("name", "")
|
|
|
is_standard = obj.get("is_standard", False)
|
|
|
status = obj.get("status", "")
|
|
|
- meg = obj.get("meg", "")
|
|
|
+ meg = obj.get("meg", "") # 注意:这里应该是meg字段
|
|
|
|
|
|
if isinstance(is_standard, str):
|
|
|
is_standard = is_standard.strip().lower() in ("true", "1", "yes")
|
|
|
@@ -121,13 +125,16 @@ class MessageBuilder:
|
|
|
])
|
|
|
|
|
|
try:
|
|
|
+ # 强制重新加载提示词,避免缓存问题
|
|
|
template = self.prompt_loader.get_prompt_template(
|
|
|
reviewer_type="prep_basis",
|
|
|
- prompt_name="basis_status_check"
|
|
|
+ prompt_name="basis_status_check",
|
|
|
+ force_reload=True # 强制重新加载
|
|
|
)
|
|
|
|
|
|
# 验证返回的是ChatPromptTemplate对象
|
|
|
if hasattr(template, 'format_messages'):
|
|
|
+ logger.info(f"成功加载编制依据审查提示词")
|
|
|
return template
|
|
|
else:
|
|
|
logger.warning(f" PromptLoader返回了意外类型: {type(template)}")
|
|
|
@@ -311,8 +318,6 @@ class LLMReviewClient:
|
|
|
|
|
|
if not trace_id:
|
|
|
trace_id = f"prep_basis_review_{int(time.time())}"
|
|
|
-
|
|
|
- # ==================== 修复开始 ====================
|
|
|
try:
|
|
|
from langchain_core.prompts import ChatPromptTemplate
|
|
|
|
|
|
@@ -329,10 +334,7 @@ class LLMReviewClient:
|
|
|
|
|
|
# 情况2: 如果传入的是 List (消息列表)
|
|
|
elif isinstance(prompt_template, list):
|
|
|
- # 将 List 包装回 ChatPromptTemplate 对象
|
|
|
- # 这样底层调用 .format_messages() 时就不会报错了
|
|
|
final_prompt_obj = ChatPromptTemplate.from_messages(prompt_template)
|
|
|
- # 如果列表中包含未格式化的变量(罕见情况),尝试 partial,否则忽略
|
|
|
try:
|
|
|
final_prompt_obj = final_prompt_obj.partial(review_content=user_content)
|
|
|
except Exception:
|
|
|
@@ -505,68 +507,100 @@ class BasisReviewService:
|
|
|
batch = items[i:i + 3]
|
|
|
batches.append(batch)
|
|
|
|
|
|
- # 逐个批次执行,每个批次完成后立即推送SSE
|
|
|
- processed_results = []
|
|
|
- total_items = 0
|
|
|
- standard_items = 0
|
|
|
- successful_batches = 0
|
|
|
-
|
|
|
- for i, batch in enumerate(batches):
|
|
|
+ # 异步并发执行所有批次,使用回调处理SSE推送
|
|
|
+ async def process_batch_with_callback(batch_index: int, batch: List[str]) -> List[Dict[str, Any]]:
|
|
|
+ """处理单个批次并执行SSE回调"""
|
|
|
try:
|
|
|
# 执行单个批次审查
|
|
|
result = await self.review_batch(batch, collection_name)
|
|
|
- processed_results.append(result)
|
|
|
- successful_batches += 1
|
|
|
|
|
|
# 统计当前批次结果
|
|
|
batch_standard_count = 0
|
|
|
for item in result:
|
|
|
- total_items += 1
|
|
|
if isinstance(item, dict) and item.get('is_standard', False):
|
|
|
- standard_items += 1
|
|
|
batch_standard_count += 1
|
|
|
|
|
|
# 立即推送当前批次完成的SSE消息
|
|
|
- logger.info("应触发SSE消息")
|
|
|
+ logger.info(f"批次{batch_index + 1}完成,准备推送SSE")
|
|
|
if progress_manager and callback_task_id:
|
|
|
- logger.info("已触发SSE消息")
|
|
|
try:
|
|
|
- progress_percent = int((i + 1) / total_batches * 100)
|
|
|
+ progress_percent = int((batch_index + 1) / total_batches * 100)
|
|
|
await progress_manager.update_stage_progress(
|
|
|
callback_task_id=callback_task_id,
|
|
|
- stage_name=f"编制依据审查-批次{i + 1}",
|
|
|
+ stage_name=f"编制依据审查-批次{batch_index + 1}",
|
|
|
current=progress_percent,
|
|
|
status="processing",
|
|
|
- message=f"完成第{i + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
|
|
|
+ message=f"完成第{batch_index + 1}/{total_batches}批次编制依据审查,{len(batch)}项,其中{batch_standard_count}项为标准",
|
|
|
overall_task_status="processing",
|
|
|
event_type="processing",
|
|
|
issues=result # 推送该批次的审查结果
|
|
|
)
|
|
|
+ logger.info(f"批次{batch_index + 1} SSE推送成功")
|
|
|
except Exception as e:
|
|
|
- logger.error(f"SSE推送批次{i + 1}结果失败: {e}")
|
|
|
+ logger.error(f"SSE推送批次{batch_index + 1}结果失败: {e}")
|
|
|
+
|
|
|
+ return result
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f" 批次 {i} 处理失败: {e}")
|
|
|
+ logger.error(f" 批次 {batch_index} 处理失败: {e}")
|
|
|
error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次处理失败: {str(e)}"}
|
|
|
for name in batch]
|
|
|
- processed_results.append(error_result)
|
|
|
|
|
|
# 即使失败也要推送结果
|
|
|
if progress_manager and callback_task_id:
|
|
|
try:
|
|
|
- progress_percent = int((i + 1) / total_batches * 100)
|
|
|
+ progress_percent = int((batch_index + 1) / total_batches * 100)
|
|
|
await progress_manager.update_stage_progress(
|
|
|
callback_task_id=callback_task_id,
|
|
|
- stage_name=f"编制依据审查-批次{i + 1}",
|
|
|
+ stage_name=f"编制依据审查-批次{batch_index + 1}",
|
|
|
current=progress_percent,
|
|
|
status="processing",
|
|
|
- message=f"第{i + 1}/{total_batches}批次处理失败",
|
|
|
+ message=f"第{batch_index + 1}/{total_batches}批次处理失败",
|
|
|
overall_task_status="processing",
|
|
|
event_type="processing",
|
|
|
issues=error_result
|
|
|
)
|
|
|
except Exception as push_e:
|
|
|
- logger.error(f"SSE推送失败批次{i + 1}结果失败: {push_e}")
|
|
|
+ logger.error(f"SSE推送失败批次{batch_index + 1}结果失败: {push_e}")
|
|
|
+
|
|
|
+ return error_result
|
|
|
+
|
|
|
+ # 创建所有批次的异步任务
|
|
|
+ batch_tasks = []
|
|
|
+ for i, batch in enumerate(batches):
|
|
|
+ task = process_batch_with_callback(i, batch)
|
|
|
+ batch_tasks.append(task)
|
|
|
+
|
|
|
+ # 并发执行所有批次
|
|
|
+ logger.info(f"开始并发执行{total_batches}个批次编制依据审查")
|
|
|
+ processed_results = await asyncio.gather(*batch_tasks, return_exceptions=True)
|
|
|
+
|
|
|
+ # 处理异常结果并统计
|
|
|
+ total_items = 0
|
|
|
+ standard_items = 0
|
|
|
+ successful_batches = 0
|
|
|
+
|
|
|
+ # 重新构建结果列表,过滤异常
|
|
|
+ final_results = []
|
|
|
+ for i, result in enumerate(processed_results):
|
|
|
+ if isinstance(result, Exception):
|
|
|
+ logger.error(f" 批次 {i} 返回异常: {result}")
|
|
|
+ error_batch = batches[i] if i < len(batches) else []
|
|
|
+ error_result = [{"name": name, "is_standard": False, "status": "", "meg": f"批次异常: {str(result)}"}
|
|
|
+ for name in error_batch]
|
|
|
+ final_results.append(error_result)
|
|
|
+ else:
|
|
|
+ final_results.append(result)
|
|
|
+ successful_batches += 1
|
|
|
+
|
|
|
+ # 统计总结果
|
|
|
+ for result in final_results:
|
|
|
+ for item in result:
|
|
|
+ total_items += 1
|
|
|
+ if isinstance(item, dict) and item.get('is_standard', False):
|
|
|
+ standard_items += 1
|
|
|
+
|
|
|
+ logger.info(f"并发执行完成,成功批次: {successful_batches}/{total_batches}")
|
|
|
|
|
|
|
|
|
# 发送完成审查的SSE推送
|
|
|
@@ -587,7 +621,7 @@ class BasisReviewService:
|
|
|
|
|
|
logger.info(f" 异步审查完成,耗时: {elapsed_time:.4f} 秒")
|
|
|
logger.info(f" 总编制依据: {total_items}, 标准项: {standard_items}, 成功批次: {successful_batches}/{total_batches}")
|
|
|
- return processed_results
|
|
|
+ return final_results
|
|
|
|
|
|
|
|
|
# 便捷函数
|