|
|
@@ -400,6 +400,8 @@ class AIReviewCoreFun:
|
|
|
self.ai_review_engine = ai_review_engine
|
|
|
self.max_review_units = max_review_units
|
|
|
self.review_mode = review_mode
|
|
|
+ # 添加消息推送锁,确保事件顺序发送
|
|
|
+ self.message_lock = asyncio.Lock()
|
|
|
|
|
|
|
|
|
async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
|
|
|
@@ -443,8 +445,7 @@ class AIReviewCoreFun:
|
|
|
current = int(((unit_index + 1) / total_units) * 100)
|
|
|
|
|
|
# 立即发送单元审查详情(包含unit_review和processing_flag事件)
|
|
|
- issues_count = await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
|
|
|
- #await self._send_unit_overall_progress(state, unit_index, total_units, section_label, issues_count)
|
|
|
+ await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
|
|
|
|
|
|
else:
|
|
|
logger.error(f"执行单个单元审查失败: {str(result.error_message)}")
|
|
|
@@ -467,16 +468,10 @@ class AIReviewCoreFun:
|
|
|
logger.error(f"执行并发审查失败: {str(e)}")
|
|
|
return []
|
|
|
|
|
|
- async def _prepare_review_units(self, state: AIReviewState) -> tuple:
|
|
|
- """
|
|
|
- 准备审查单元数据
|
|
|
|
|
|
- Args:
|
|
|
- state: AI审查状态
|
|
|
|
|
|
- Returns:
|
|
|
- tuple: (review_chunks, total_units, total_all_units)
|
|
|
- """
|
|
|
+ async def _prepare_review_units(self, state: AIReviewState) -> tuple:
|
|
|
+ """准备审查单元数据 (增加清理旧进度缓存)"""
|
|
|
try:
|
|
|
# 筛选要审查的单元
|
|
|
all_chunks = state['structured_content']['chunks']
|
|
|
@@ -485,6 +480,17 @@ class AIReviewCoreFun:
|
|
|
total_units = len(review_chunks)
|
|
|
total_all_units = len(all_chunks)
|
|
|
|
|
|
+ # 【修复 3】: 任务开始前,清理 Redis 中的旧计数,防止进度条计算错误
|
|
|
+ try:
|
|
|
+ task_id = state.get("callback_task_id", "")
|
|
|
+ if task_id:
|
|
|
+ redis_client = await RedisConnectionFactory.get_connection()
|
|
|
+ completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
|
|
|
+ await redis_client.delete(completed_key)
|
|
|
+ logger.info(f"已清理旧进度缓存: {completed_key}")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"清理进度缓存失败 (不影响主流程): {str(e)}")
|
|
|
+
|
|
|
logger.info(f"AI审查开始: 总单元数 {total_all_units}, 实际审查 {total_units} 个单元")
|
|
|
return review_chunks, total_units, total_all_units
|
|
|
except Exception as e:
|
|
|
@@ -607,69 +613,15 @@ class AIReviewCoreFun:
|
|
|
except Exception as e:
|
|
|
logger.warning(f"发送开始进度更新失败: {str(e)}")
|
|
|
|
|
|
- # async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
|
|
|
- # total_units: int, section_label: str,
|
|
|
- # issues: List[Dict], current: int) -> None:
|
|
|
- # """
|
|
|
- # 发送单元审查详细信息
|
|
|
-
|
|
|
- # Args:
|
|
|
- # state: AI审查状态
|
|
|
- # unit_index: 单元索引
|
|
|
- # total_units: 总单元数
|
|
|
- # section_label: 章节标签
|
|
|
- # issues: 问题列表
|
|
|
- # current: 当前进度
|
|
|
- # """
|
|
|
-
|
|
|
-
|
|
|
- # try:
|
|
|
-
|
|
|
- # if state["progress_manager"]:
|
|
|
- # # 计算问题数量(可以为0)
|
|
|
- # issues_count = 0
|
|
|
- # if isinstance(issues, list) and issues:
|
|
|
- # issues_count = sum(
|
|
|
- # 1 for issue in issues
|
|
|
- # for issue_data in issue.values()
|
|
|
- # for review_item in issue_data.get("review_lists", [])
|
|
|
- # if review_item.get("exist_issue", False)
|
|
|
- # )
|
|
|
-
|
|
|
- # # 🔧 修复:unit_review事件只在有问题时发送
|
|
|
- # if issues_count > 0:
|
|
|
- # stage_name = f"AI审查:{section_label}"
|
|
|
- # await state["progress_manager"].update_stage_progress(
|
|
|
- # callback_task_id=state["callback_task_id"],
|
|
|
- # stage_name=stage_name,
|
|
|
- # current=current,
|
|
|
- # status="unit_review_update",
|
|
|
- # message=f"发现{issues_count}个问题: {section_label}",
|
|
|
- # issues=issues,
|
|
|
- # user_id=state.get("user_id", ""),
|
|
|
- # overall_task_status="processing",
|
|
|
- # event_type="unit_review"
|
|
|
- # )
|
|
|
-
|
|
|
- # # 清空当前issues
|
|
|
- # await state["progress_manager"].update_stage_progress(
|
|
|
- # callback_task_id=state["callback_task_id"],
|
|
|
- # issues=['clear']
|
|
|
- # )
|
|
|
-
|
|
|
- # return issues_count
|
|
|
- # except Exception as e:
|
|
|
- # logger.warning(f"发送单元审查详情失败: {str(e)}")
|
|
|
-
|
|
|
-
|
|
|
async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
|
|
|
total_units: int, section_label: str,
|
|
|
issues: List[Dict], current: int) -> None:
|
|
|
- """
|
|
|
- 发送单元审查详细信息
|
|
|
- """
|
|
|
+ """
|
|
|
+ 发送单元审查详细信息 - 强制串行并统一进度值
|
|
|
+ """
|
|
|
+ async with self.message_lock:
|
|
|
try:
|
|
|
- # 1. 计算问题数量 (即使 issues 为空,这里算出来是 0 也是安全的)
|
|
|
+ # 1. 计算问题数量
|
|
|
issues_count = 0
|
|
|
if isinstance(issues, list) and issues:
|
|
|
issues_count = sum(
|
|
|
@@ -679,19 +631,29 @@ class AIReviewCoreFun:
|
|
|
if review_item.get("exist_issue", False)
|
|
|
)
|
|
|
|
|
|
- # 【修复点】:无论是否有问题,都必须发送全局进度更新
|
|
|
- # 这保证了 Redis 计数正确,且进度条能走动
|
|
|
- await self._send_unit_overall_progress(state, unit_index, total_units, section_label, issues_count)
|
|
|
+ # 2. 发送全局进度 (Processing Flag)
|
|
|
+ # 【修复 1】: 获取 Redis 计算出的真实进度 (real_current)
|
|
|
+ # 即使这里传入了 current (基于 index),我们优先使用 Redis 返回的真实完成度
|
|
|
+ real_current = await self._send_unit_overall_progress(
|
|
|
+ state, unit_index, total_units, section_label, issues_count
|
|
|
+ )
|
|
|
+
|
|
|
+ # 如果 Redis 失败返回 None,则回退使用传入的 current
|
|
|
+ final_current = real_current if real_current is not None else current
|
|
|
+
|
|
|
+ # 【修复 2】: 强制休眠 50ms。
|
|
|
+ # 这解决了 "Client didn't synchronously receive" 的问题。
|
|
|
+ # 它迫使网络缓冲区刷新,并给前端 UI 渲染循环留出时间处理上一个 Flag 事件。
|
|
|
+ await asyncio.sleep(0.05)
|
|
|
|
|
|
- # 2. 只有当确实存在问题时,才发送单元详情 (unit_review 事件)
|
|
|
+ # 3. 发送单元详情 (Unit Review)
|
|
|
if isinstance(issues, list) and issues and state["progress_manager"]:
|
|
|
stage_name = f"AI审查:{section_label}"
|
|
|
-
|
|
|
- # 同步等待进度消息推送完成
|
|
|
+
|
|
|
await state["progress_manager"].update_stage_progress(
|
|
|
callback_task_id=state["callback_task_id"],
|
|
|
stage_name=stage_name,
|
|
|
- current=current, # 注意:这里的 current 是基于 index 的,可能与 overall 中的基于 count 的有差异
|
|
|
+ current=final_current, # 【关键】使用与 Flag 事件完全一致的进度值
|
|
|
status="unit_review_update",
|
|
|
message=f"发现{issues_count}个问题: {section_label}",
|
|
|
issues=issues,
|
|
|
@@ -700,82 +662,80 @@ class AIReviewCoreFun:
|
|
|
event_type="unit_review"
|
|
|
)
|
|
|
|
|
|
+ # 再次微小延迟,确保 Clear 不会吞掉 Review
|
|
|
+ await asyncio.sleep(0.02)
|
|
|
+
|
|
|
# 清空当前issues
|
|
|
await state["progress_manager"].update_stage_progress(
|
|
|
callback_task_id=state["callback_task_id"],
|
|
|
issues=['clear']
|
|
|
)
|
|
|
+
|
|
|
except Exception as e:
|
|
|
logger.warning(f"发送单元审查详情失败: {str(e)}")
|
|
|
|
|
|
async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
|
|
|
total_units: int, section_label: str,
|
|
|
- issues_count: int =None) -> None:
|
|
|
+ issues_count: int = None) -> Optional[int]:
|
|
|
"""
|
|
|
- 发送单元完成进度更新 - 基于Redis分布式计数(多任务安全)
|
|
|
-
|
|
|
- Args:
|
|
|
- state: AI审查状态
|
|
|
- unit_index: 单元索引
|
|
|
- total_units: 总单元数
|
|
|
- section_label: 章节标签
|
|
|
- issues_count: 问题数量
|
|
|
+ 发送单元完成进度更新 - 返回计算出的实时进度
|
|
|
+ Returns:
|
|
|
+ int: 基于 Redis 统计的实时进度百分比
|
|
|
"""
|
|
|
+ current_percent = None
|
|
|
try:
|
|
|
- # 获取任务ID
|
|
|
task_id = state.get("callback_task_id", "")
|
|
|
-
|
|
|
- # 尝试使用Redis进行分布式计数
|
|
|
redis_client = None
|
|
|
try:
|
|
|
redis_client = await RedisConnectionFactory.get_connection()
|
|
|
except Exception as e:
|
|
|
- logger.warning(f"Redis连接失败,使用降级方案: {str(e)}")
|
|
|
+ logger.warning(f"Redis连接失败: {str(e)}")
|
|
|
|
|
|
+ completed_count = 0
|
|
|
+
|
|
|
if redis_client and task_id:
|
|
|
- # 使用Redis分布式计数,避免多任务间的变量混淆
|
|
|
completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
|
|
|
-
|
|
|
- # 原子操作:添加单元索引到Redis集合(自动去重)
|
|
|
+ # 原子操作
|
|
|
await redis_client.sadd(completed_key, str(unit_index))
|
|
|
- await redis_client.expire(completed_key, 3600) # 1小时过期,防止内存泄漏
|
|
|
-
|
|
|
- # 获取实际完成的单元数量
|
|
|
+ await redis_client.expire(completed_key, 3600)
|
|
|
completed_count = await redis_client.scard(completed_key)
|
|
|
-
|
|
|
- # 基于实际完成数量计算进度,避免并发乱序导致的进度倒退
|
|
|
- current = int((completed_count / total_units) * 100)
|
|
|
-
|
|
|
- logger.info(f"Redis分布式进度更新: 任务{task_id} 完成数量{completed_count}/{total_units} 进度{current}%")
|
|
|
+
|
|
|
+ # 计算进度
|
|
|
+ current_percent = int((completed_count / total_units) * 100)
|
|
|
else:
|
|
|
- # Redis连接失败时的降级方案(仅在单任务模式下安全)
|
|
|
- logger.warning("Redis连接失败,使用降级进度计算方案(可能存在并发问题)")
|
|
|
- current = int(((unit_index + 1) / total_units) * 100)
|
|
|
+ # 降级方案
|
|
|
completed_count = unit_index + 1
|
|
|
+ current_percent = int((completed_count / total_units) * 100)
|
|
|
|
|
|
- # 构建完成消息
|
|
|
- if issues_count > 0:
|
|
|
+ # 构建消息
|
|
|
+ if issues_count is not None and issues_count > 0:
|
|
|
message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
|
|
|
else:
|
|
|
message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
|
|
|
|
|
|
- logger.info(f"单元审查完成,更新进度: {current}% {message}")
|
|
|
+ logger.info(f"进度更新: {current_percent}% - {message}")
|
|
|
|
|
|
- # 发送processing_flag事件
|
|
|
if state["progress_manager"]:
|
|
|
- logger.info(f"发送processing_flag事件: current={current}, message={message}")
|
|
|
await state["progress_manager"].update_stage_progress(
|
|
|
callback_task_id=state["callback_task_id"],
|
|
|
stage_name="AI审查",
|
|
|
- current=current,
|
|
|
+ current=current_percent,
|
|
|
status="processing",
|
|
|
message=message,
|
|
|
user_id=state.get("user_id", ""),
|
|
|
overall_task_status="processing",
|
|
|
event_type="processing_flag"
|
|
|
)
|
|
|
+
|
|
|
+ return current_percent
|
|
|
+
|
|
|
except Exception as e:
|
|
|
logger.warning(f"发送单元完成进度更新失败: {str(e)}")
|
|
|
+ # 发生异常时,尝试返回一个基于 index 的估算值
|
|
|
+ try:
|
|
|
+ return int(((unit_index + 1) / total_units) * 100)
|
|
|
+ except:
|
|
|
+ return 0
|
|
|
|
|
|
def _format_review_results_to_issues(self,callback_task_id: str, unit_index: int, review_location_label: str,
|
|
|
unit_content: Dict[str, Any], basic_result: Dict[str, Any],
|