Răsfoiți Sursa

v0.0.3-解决第一个processing_flag丢失问题2.0

WangXuMing 2 luni în urmă
părinte
comite
132bb9c184

+ 1 - 1
config/config.ini

@@ -1,7 +1,7 @@
 
 
 [model]
-MODEL_TYPE=lq_qwen3_8b
+MODEL_TYPE=gemini
 
 
 

+ 3 - 1
core/base/progress_manager.py

@@ -1,4 +1,5 @@
 import json
+import time
 import asyncio
 from typing import Dict, Any, Optional
 from datetime import datetime
@@ -289,7 +290,8 @@ class ProgressManager:
 
     async def complete_task(self, callback_task_id: str, user_id: str = None, current_data: dict = None):
         """标记任务完成 - 使用单一同步强制关闭逻辑"""
-
+        # logger.info(f"审查任务已完成,任务准备关闭...: {callback_task_id}")
+        # time.sleep(1)
         try:
             logger.info(f"取消注册任务: {callback_task_id}")
             await unified_sse_manager.close_connection(callback_task_id)

+ 77 - 26
core/construction_review/workflows/ai_review_workflow.py

@@ -415,6 +415,7 @@ class AIReviewCoreFun:
         Returns:
             List[ReviewResult]: 审查结果列表
         """
+        
         try:
             # 简化方案:并发执行,每个单元完成时立即推送消息
             semaphore = asyncio.Semaphore(3)  # 允许3个并发审查
@@ -442,7 +443,9 @@ class AIReviewCoreFun:
                         current = int(((unit_index + 1) / total_units) * 100)
 
                         # 立即发送单元审查详情(包含unit_review和processing_flag事件)
-                        await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
+                        issues_count =  await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
+                        #await self._send_unit_overall_progress(state, unit_index, total_units, section_label, issues_count)
+
                     else:
                         logger.error(f"执行单个单元审查失败: {str(result.error_message)}")
                     return result
@@ -552,6 +555,7 @@ class AIReviewCoreFun:
             # 格式化审查结果(注意:实际进度推送现在在 _execute_concurrent_reviews 中按顺序处理)
             # 这里保存格式化结果到ReviewResult中,供后续推送使用
 
+            
             return ReviewResult(
                 unit_index=unit_index,
                 unit_content=unit_content,
@@ -603,24 +607,69 @@ class AIReviewCoreFun:
         except Exception as e:
             logger.warning(f"发送开始进度更新失败: {str(e)}")
 
-    async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
-                                        total_units: int, section_label: str,
-                                        issues: List[Dict], current: int) -> None:
-        """
-        发送单元审查详细信息
+    # async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
+    #                                     total_units: int, section_label: str,
+    #                                     issues: List[Dict], current: int) -> None:
+    #     """
+    #     发送单元审查详细信息
+
+    #     Args:
+    #         state: AI审查状态
+    #         unit_index: 单元索引
+    #         total_units: 总单元数
+    #         section_label: 章节标签
+    #         issues: 问题列表
+    #         current: 当前进度
+    #     """
+
+
+    #     try:
+
+    #         if state["progress_manager"]:
+    #             # 计算问题数量(可以为0)
+    #             issues_count = 0
+    #             if isinstance(issues, list) and issues:
+    #                 issues_count = sum(
+    #                     1 for issue in issues
+    #                     for issue_data in issue.values()
+    #                     for review_item in issue_data.get("review_lists", [])
+    #                     if review_item.get("exist_issue", False)
+    #                 )
+
+    #             # 🔧 修复:unit_review事件只在有问题时发送
+    #             if issues_count > 0:
+    #                 stage_name = f"AI审查:{section_label}"
+    #                 await state["progress_manager"].update_stage_progress(
+    #                     callback_task_id=state["callback_task_id"],
+    #                     stage_name=stage_name,
+    #                     current=current,
+    #                     status="unit_review_update",
+    #                     message=f"发现{issues_count}个问题: {section_label}",
+    #                     issues=issues,
+    #                     user_id=state.get("user_id", ""),
+    #                     overall_task_status="processing",
+    #                     event_type="unit_review"
+    #                 )
+
+    #                 # 清空当前issues
+    #                 await state["progress_manager"].update_stage_progress(
+    #                     callback_task_id=state["callback_task_id"],
+    #                     issues=['clear']
+    #                 )
+
+    #         return  issues_count
+    #     except Exception as e:
+    #         logger.warning(f"发送单元审查详情失败: {str(e)}")
 
-        Args:
-            state: AI审查状态
-            unit_index: 单元索引
-            total_units: 总单元数
-            section_label: 章节标签
-            issues: 问题列表
-            current: 当前进度
-        """
-        try:
-            # 🔧 修复:全局进度更新应该总是执行,与是否存在问题无关
-            if state["progress_manager"]:
-                # 计算问题数量(可以为0)
+
+    async def _send_unit_review_progress(self, state: AIReviewState, unit_index: int,
+                                            total_units: int, section_label: str,
+                                            issues: List[Dict], current: int) -> None:
+            """
+            发送单元审查详细信息
+            """
+            try:
+                # 1. 计算问题数量 (即使 issues 为空,这里算出来是 0 也是安全的)
                 issues_count = 0
                 if isinstance(issues, list) and issues:
                     issues_count = sum(
@@ -630,16 +679,19 @@ class AIReviewCoreFun:
                         if review_item.get("exist_issue", False)
                     )
 
-                # 全局进度更新移到外层,总是执行
+                # 【修复点】:无论是否有问题,都必须发送全局进度更新
+                # 这保证了 Redis 计数正确,且进度条能走动
                 await self._send_unit_overall_progress(state, unit_index, total_units, section_label, issues_count)
 
-                # 🔧 修复:unit_review事件只在有问题时发送
-                if issues_count > 0:
+                # 2. 只有当确实存在问题时,才发送单元详情 (unit_review 事件)
+                if isinstance(issues, list) and issues and state["progress_manager"]:
                     stage_name = f"AI审查:{section_label}"
+                    
+                    # 同步等待进度消息推送完成
                     await state["progress_manager"].update_stage_progress(
                         callback_task_id=state["callback_task_id"],
                         stage_name=stage_name,
-                        current=current,
+                        current=current, # 注意:这里的 current 是基于 index 的,可能与 overall 中的基于 count 的有差异
                         status="unit_review_update",
                         message=f"发现{issues_count}个问题: {section_label}",
                         issues=issues,
@@ -653,13 +705,12 @@ class AIReviewCoreFun:
                         callback_task_id=state["callback_task_id"],
                         issues=['clear']
                     )
-
-        except Exception as e:
-            logger.warning(f"发送单元审查详情失败: {str(e)}")
+            except Exception as e:
+                logger.warning(f"发送单元审查详情失败: {str(e)}")
 
     async def _send_unit_overall_progress(self, state: AIReviewState, unit_index: int,
                                            total_units: int, section_label: str,
-                                           issues_count: int) -> None:
+                                           issues_count: int =None) -> None:
         """
         发送单元完成进度更新 - 基于Redis分布式计数(多任务安全)