Selaa lähdekoodia

v.0.0.3-修复sse结束事件重复推送问题

WangXuMing 2 kuukautta sitten
vanhempi
sitoutus
cef9b5691e

+ 27 - 22
core/base/progress_manager.py

@@ -25,6 +25,10 @@ class SSECallbackManager:
             del self._callbacks[callback_task_id]
             logger.info(f"SSE回调注销, 剩余注册数: {len(self._callbacks)}")
 
+    def is_callback_registered(self, callback_task_id: str) -> bool:
+        """检查回调是否已注册"""
+        return callback_task_id in self._callbacks
+
     async def trigger_callback(self, callback_task_id: str, current_data: dict):
         if callback_task_id in self._callbacks:
             try:
@@ -209,19 +213,19 @@ class ProgressManager:
                 if not hasattr(self, 'current_data'):
                     self.current_data = {}
                 self.current_data[callback_task_id] = task_progress
-
-            # 触发SSE推送 - 使用全局回调管理器
-            logger.debug(f"触发SSE推送: {callback_task_id}")
-            updated_progress = await self.get_progress(callback_task_id)
-            issues = task_progress.get("issues")
-            event_type = task_progress.get("event_type", "processing")
-            logger.debug(f"触发SSE回调: {callback_task_id}, event_type: {event_type}")
-
-            if updated_progress and issues and len(issues) > 0 and issues[0] != 'clear':
-                await sse_callback_manager.trigger_callback(callback_task_id, updated_progress)
-            elif updated_progress and not issues:  # 空列表时也要推送
-                await sse_callback_manager.trigger_callback(callback_task_id, updated_progress)
-
+            if task_progress.get("overall_task_status") == "completed":
+                logger.info(f"任务完成,不触发redis更新SSE推送,将完成信号交由lanch_review上层sse推送: {callback_task_id}")
+            else:
+                logger.info(f"触发SSE推送: {callback_task_id}")
+                updated_progress = await self.get_progress(callback_task_id)
+                issues = task_progress.get("issues")
+                event_type = task_progress.get("event_type", "processing")
+                logger.info(f"触发SSE回调: {callback_task_id}, event_type: {event_type}")
+
+                if updated_progress and issues and len(issues) > 0 and issues[0] != 'clear':
+                    await sse_callback_manager.trigger_callback(callback_task_id, updated_progress)
+                elif updated_progress and not issues:  # 空列表时也要推送
+                    await sse_callback_manager.trigger_callback(callback_task_id, updated_progress)
         except Exception as e:
             logger.error(f"更新阶段进度失败: {str(e)}")
             raise
@@ -285,10 +289,13 @@ class ProgressManager:
             logger.error(f"获取进度失败: {str(e)}")
             return None
 
-    async def complete_task(self, callback_task_id: str, user_id: str = None, overall_task_status: str = "completed", current_data: dict = None):
+    async def complete_task(self, callback_task_id: str, user_id: str = None, current_data: dict = None):
         """标记任务完成"""
+
         try:
-            # 使用update_stage_progress方法更新响应数据
+            logger.info(f"保存审查结果: {callback_task_id}")
+
+            # 使用update_stage_progress方法更新响应数据,但不推送SSE
             await self.update_stage_progress(
                 callback_task_id=callback_task_id,
                 user_id=user_id,
@@ -296,17 +303,15 @@ class ProgressManager:
                 stage_name="审查完成",
                 status="completed",
                 message="施工审查方案处理完成!",
-                overall_task_status=overall_task_status,
+                overall_task_status='completed',
                 issues=current_data.get("issues", []) if current_data else []
             )
 
+            logger.info(f"取消注册任务: {callback_task_id}")
+            # 取消SSE回调注册,避免重复推送
+            sse_callback_manager.unregister_callback(callback_task_id)
+
             logger.info(f"任务关闭: {callback_task_id}")
-            completed_progress = await self.get_progress(callback_task_id)
-            if completed_progress:
-                await sse_callback_manager.trigger_callback(callback_task_id, completed_progress)
-                logger.debug(f"SSE完成进度已推送: {callback_task_id}")
-            else:
-                logger.warning(f"无法获取完成进度数据: {callback_task_id}")
         except Exception as e:
             logger.error(f"标记任务完成失败: {str(e)}")
             raise

+ 2 - 2
core/base/workflow_manager.py

@@ -220,7 +220,7 @@ class WorkflowManager:
             # task_chain.results['report'] = report_result
 
             # 完成任务链
-            task_chain.status = "completed"
+            #task_chain.status = "completed"
             task_chain.completed_at = datetime.now()
 
             # 清理任务注册
@@ -240,7 +240,7 @@ class WorkflowManager:
             return task_chain.results
 
         except Exception as e:
-            task_chain.status = "failed"
+            #task_chain.status = "failed"
             logger.error(f"文档处理任务链失败: {task_chain.callback_task_id}, 错误: {str(e)}")
 
             # 清理任务注册

+ 294 - 56
core/construction_review/component/ai_review_engine.py

@@ -1,6 +1,48 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
 """
-AI审查引擎
-负责执行AI审查,支持审查条目并发处理
+@Project   : lq-agent-api
+@File      : ai_review_engine.py
+@IDE       : VsCode
+@Author    : 王旭明
+@Date      : 2025-12-01 11:07:12
+@Description: AI审查引擎核心组件,负责执行各类文档审查任务,支持并发处理和多种审查模式
+
+=================================
+📋 方法总览 (Method Overview)
+=================================
+
+🏗️ 核心审查流程:
+├── basic_compliance_check()      # 基础合规性检查 (语法/语义/完整性)
+├── technical_compliance_check()  # 技术性合规检查 (标准/设计/参数)
+├── rag_enhanced_check()          # RAG增强审查 (向量/混合检索)
+└── _calculate_overall_risk()     # 计算总体风险等级
+
+🔍 专项检查方法:
+├── check_grammar()               # 词句语法检查
+├── check_semantic_logic()        # 语义逻辑检查
+├── check_completeness()          # 完整性检查
+├── check_mandatory_standards()   # 强制性标准检查
+├── check_design_values()         # 设计值检查
+└── check_technical_parameters()  # 技术参数检查
+
+🔍 RAG检索增强:
+├── vector_search()               # 向量检索
+├── hybrid_search()               # 混合检索
+├── rerank_results()              # 重排序结果
+└── generate_enhanced_suggestions() # 生成增强建议
+
+🛠️ 工具辅助方法:
+├── _process_review_result()      # 处理审查结果
+├── _execute_check_with_semaphore() # 信号量并发控制
+├── _calculate_basic_score()      # 计算基础得分
+├── _calculate_technical_score()  # 计算技术得分
+└── _aggregate_results()          # 汇总审查结果
+
+⚙️ 配置管理:
+├── __init__()                    # 初始化引擎 (max_concurrent_reviews=8)
+└── set_review_location_label()   # 设置审查位置标签
 """
 
 import time
@@ -60,15 +102,68 @@ class AIReviewEngine(BaseReviewer):
         self.semaphore = asyncio.Semaphore(max_concurrent_reviews)
 
     def set_review_location_label(self, location_label: str):
-        """设置审查位置标签"""
+        """
+        设置审查位置标签
+
+        Args:
+            location_label: 位置标签字符串
+        """
         self.review_location_label = location_label
 
+    def _process_review_result(self, result):
+        """
+        处理审查结果,统一转换为字典格式
+
+        Args:
+            result: 原始审查结果
+
+        Returns:
+            Dict: 处理后的审查结果字典
+        """
+        if isinstance(result, Exception):
+            return {"error": str(result), "success": False}
+        elif hasattr(result, '__dict__'):  # ReviewResult对象
+            return {
+                "success": result.success if hasattr(result, 'success') else False,
+                "details": result.details if hasattr(result, 'details') else {},
+                "error_message": result.error_message if hasattr(result, 'error_message') else None,
+                "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
+            }
+        else:
+            return result  # 已经是字典
+
+    def _execute_check_with_semaphore(self, check_func, *args, **kwargs):
+        """
+        使用信号量执行检查任务的包装器
+
+        Args:
+            check_func: 检查函数
+            *args: 位置参数
+            **kwargs: 关键字参数
 
+        Returns:
+            包装后的异步函数
+        """
+        async def wrapped_check():
+            async with self.semaphore:
+                return await check_func(*args, **kwargs)
+        return wrapped_check()
 
-    
-    async def basic_compliance_check(self,trace_id_idx: str, unit_content: Dict[str, Any],
+    async def basic_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any],
                                    stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """基础合规性检查"""
+        """
+        基础合规性检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            unit_content: 待审查单元内容
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 基础合规性检查结果
+        """
         review_content = unit_content['content']
         review_references = unit_content.get('review_references')
 
@@ -87,23 +182,10 @@ class AIReviewEngine(BaseReviewer):
 
         grammar_result, semantic_result, completeness_result = await asyncio.gather(*basic_tasks, return_exceptions=True)
 
-        def process_result(result):
-            """处理审查结果,统一转换为字典格式"""
-            if isinstance(result, Exception):
-                return {"error": str(result), "success": False}
-            elif hasattr(result, '__dict__'):  # ReviewResult对象
-                return {
-                    "success": result.success if hasattr(result, 'success') else False,
-                    "details": result.details if hasattr(result, 'details') else {},
-                    "error_message": result.error_message if hasattr(result, 'error_message') else None,
-                    "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
-                }
-            else:
-                return result  # 已经是字典
-
-        grammar_result = process_result(grammar_result)
-        semantic_result = process_result(semantic_result)
-        completeness_result = process_result(completeness_result)
+        # 使用公共方法处理结果
+        grammar_result = self._process_review_result(grammar_result)
+        semantic_result = self._process_review_result(semantic_result)
+        completeness_result = self._process_review_result(completeness_result)
 
         return {
             'grammar_check': grammar_result,
@@ -114,7 +196,19 @@ class AIReviewEngine(BaseReviewer):
 
     async def technical_compliance_check(self, trace_id_idx: str, unit_content: Dict[str, Any],
                                       stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """技术性合规检查"""
+        """
+        技术性合规检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            unit_content: 待审查单元内容
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 技术性合规检查结果
+        """
         review_content = unit_content['content']
         review_references = unit_content.get('review_references')
         logger.info(f"开始技术性合规检查,内容长度: {len(review_content)}")
@@ -130,23 +224,10 @@ class AIReviewEngine(BaseReviewer):
 
         mandatory_result, design_value_result, technical_param_result = await asyncio.gather(*technical_tasks, return_exceptions=True)
 
-        def process_result(result):
-            """处理审查结果,统一转换为字典格式"""
-            if isinstance(result, Exception):
-                return {"error": str(result), "success": False}
-            elif hasattr(result, '__dict__'):  # ReviewResult对象
-                return {
-                    "success": result.success if hasattr(result, 'success') else False,
-                    "details": result.details if hasattr(result, 'details') else {},
-                    "error_message": result.error_message if hasattr(result, 'error_message') else None,
-                    "execution_time": result.execution_time if hasattr(result, 'execution_time') else None
-                }
-            else:
-                return result  # 已经是字典
-
-        mandatory_result = process_result(mandatory_result)
-        design_value_result = process_result(design_value_result)
-        technical_param_result = process_result(technical_param_result)
+        # 使用公共方法处理结果
+        mandatory_result = self._process_review_result(mandatory_result)
+        design_value_result = self._process_review_result(design_value_result)
+        technical_param_result = self._process_review_result(technical_param_result)
 
         return {
             'mandatory_standards': mandatory_result,
@@ -156,7 +237,15 @@ class AIReviewEngine(BaseReviewer):
         }
 
     async def rag_enhanced_check(self, unit_content: Dict[str, Any]) -> Dict[str, Any]:
-        """RAG增强审查"""
+        """
+        RAG增强审查
+
+        Args:
+            unit_content: 待审查单元内容
+
+        Returns:
+            Dict[str, Any]: RAG增强审查结果
+        """
         # 向量检索
         vector_results = await self.vector_search(unit_content['content'])
 
@@ -176,7 +265,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_grammar(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                           stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """词句语法检查"""
+        """
+        词句语法检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 语法检查结果
+        """
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['sensitive']
         trace_id = prompt_name+trace_id_idx
@@ -185,7 +287,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_semantic_logic(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                                  stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """语义逻辑检查"""
+        """
+        语义逻辑检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 语义逻辑检查结果
+        """
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['semantic']
         trace_id = prompt_name+trace_id_idx
@@ -194,7 +309,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_completeness(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                                stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """完整性检查"""
+        """
+        完整性检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 完整性检查结果
+        """
         reviewer_type = Stage.BASIC.value['reviewer_type']
         prompt_name = Stage.BASIC.value['completeness']
         trace_id = prompt_name+trace_id_idx
@@ -203,7 +331,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_mandatory_standards(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                                         stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """强制性标准检查"""
+        """
+        强制性标准检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 强制性标准检查结果
+        """
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['mandatory']
         trace_id = prompt_name+trace_id_idx
@@ -212,7 +353,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_design_values(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                                   stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """设计值检查"""
+        """
+        设计值检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 设计值检查结果
+        """
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['design']
         trace_id = prompt_name+trace_id_idx
@@ -221,7 +375,20 @@ class AIReviewEngine(BaseReviewer):
 
     async def check_technical_parameters(self, trace_id_idx: str, review_content: str = None, review_references: str = None,
                                          stage_name: str = None, state: dict = None, current_progress: int = None) -> Dict[str, Any]:
-        """技术参数检查"""
+        """
+        技术参数检查
+
+        Args:
+            trace_id_idx: 追踪ID索引
+            review_content: 审查内容
+            review_references: 审查参考信息
+            stage_name: 阶段名称
+            state: 状态字典
+            current_progress: 当前进度
+
+        Returns:
+            Dict[str, Any]: 技术参数检查结果
+        """
         reviewer_type = Stage.TECHNICAL.value['reviewer_type']
         prompt_name = Stage.TECHNICAL.value['technical']
         trace_id = prompt_name+trace_id_idx
@@ -230,37 +397,100 @@ class AIReviewEngine(BaseReviewer):
 
     # RAG检索增强
     async def vector_search(self, content: str) -> List[Dict[str, Any]]:
-        """向量检索"""
+        """
+        向量检索
+
+        Args:
+            content: 检索内容
+
+        Returns:
+            List[Dict[str, Any]]: 向量检索结果列表
+        """
         await asyncio.sleep(0.1)
         return [{"similarity": 0.85, "content": "相关标准1"}, {"similarity": 0.78, "content": "相关标准2"}]
 
     async def hybrid_search(self, content: str) -> List[Dict[str, Any]]:
-        """混合检索"""
+        """
+        混合检索
+
+        Args:
+            content: 检索内容
+
+        Returns:
+            List[Dict[str, Any]]: 混合检索结果列表
+        """
         await asyncio.sleep(0.2)
         return [{"score": 0.88, "content": "混合检索结果1"}, {"score": 0.82, "content": "混合检索结果2"}]
 
     async def rerank_results(self, content: str, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """重排序"""
+        """
+        重排序
+
+        Args:
+            content: 原始内容
+            results: 待重排序的结果列表
+
+        Returns:
+            List[Dict[str, Any]]: 重排序后的结果列表
+        """
         await asyncio.sleep(0.1)
         return sorted(results, key=lambda x: x.get('score', 0), reverse=True)[:5]
 
     def generate_enhanced_suggestions(self, results: List[Dict[str, Any]]) -> List[str]:
-        """生成增强建议"""
+        """
+        生成增强建议
+
+        Args:
+            results: 检索结果列表
+
+        Returns:
+            List[str]: 增强建议列表
+        """
         suggestions = []
         for result in results:
             suggestions.append(f"基于{result.get('content', '相关内容')}的建议")
         return suggestions
 
     def _calculate_basic_score(self, grammar: Dict, semantic: Dict, completeness: Dict) -> float:
-        """计算基础合规性得分"""
+        """
+        计算基础合规性得分
+
+        Args:
+            grammar: 语法检查结果
+            semantic: 语义检查结果
+            completeness: 完整性检查结果
+
+        Returns:
+            float: 基础合规性平均得分
+        """
         return (grammar.get('score', 0) + semantic.get('score', 0) + completeness.get('score', 0)) / 3
 
     def _calculate_technical_score(self, mandatory: Dict, design: Dict, technical: Dict) -> float:
-        """计算技术合规性得分"""
+        """
+        计算技术合规性得分
+
+        Args:
+            mandatory: 强制性标准检查结果
+            design: 设计值检查结果
+            technical: 技术参数检查结果
+
+        Returns:
+            float: 技术合规性平均得分
+        """
         return (mandatory.get('compliance_rate', 0) + design.get('accuracy', 0) + technical.get('precision', 0)) / 3
 
     def _calculate_overall_risk(self, basic: Dict, technical: Dict, rag: Dict) -> str:
-        """计算总体风险等级"""
+        """
+        计算总体风险等级
+
+        Args:
+            basic: 基础合规性结果
+            technical: 技术合规性结果
+            rag: RAG增强审查结果
+
+        Returns:
+            str: 风险等级 ("low", "medium", "high")
+        """
         basic_score = basic.get('overall_score', 0)
         technical_score = technical.get('overall_score', 0)
 
@@ -279,7 +509,15 @@ class AIReviewEngine(BaseReviewer):
             return "high"
 
     def _aggregate_results(self, results: List[ReviewResult]) -> Dict[str, Any]:
-        """汇总审查结果"""
+        """
+        汇总审查结果
+
+        Args:
+            results: 审查结果列表
+
+        Returns:
+            Dict[str, Any]: 汇总后的统计结果
+        """
         risk_counts = {"high": 0, "medium": 0, "low": 0}
 
         for result in results:

+ 366 - 162
core/construction_review/workflows/ai_review_workflow.py

@@ -1,20 +1,63 @@
-"""
-基于LangGraph的AI审查工作流
-负责AI审查的流程控制和业务编排,使用LangGraph进行状态管理
-"""
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+@Project   : lq-agent-api
+@File      : ai_review_workflow.py
+@IDE       : VsCode
+@Author    :
+@Date      : 2025-12-01 11:53:02
+
+=================================
+
+📋 方法总览 (Method Overview)
+
+🏗️ 工作流核心方法:
+├── _build_workflow()               # 构建LangGraph工作流图
+├── execute()                       # 执行AI审查工作流
+├── _start_node()                   # 开始节点
+├── _initialize_progress_node()     # 初始化进度节点
+├── _ai_review_node()               # AI审查核心节点
+├── _complete_node()                # 完成节点
+└── _error_handler_node()           # 错误处理节点
+
+🔍 审查处理方法:
+├── _filter_review_units()          # 筛选审查单元
+├── _calculate_overall_risk()       # 计算总体风险等级
+├── _aggregate_results()            # 汇总审查结果
+├── _format_review_results_to_issues() # 格式化审查结果为问题列表
+└── _parse_ai_review_response()     # 解析AI审查响应
+
+🛠️ 工具辅助方法:
+├── _check_ai_review_result()       # 检查AI审查结果
+├── _get_workflow_graph()           # 获取工作流图(可视化)
+└── _get_status()                   # 获取工作流状态
+
+⚙️ 配置管理:
+├── __init__()                      # 初始化工作流(支持审查模式配置)
+└── set_review_location_label()     # 设置审查位置标签
+'''
 
 import asyncio
 import json
-from dataclasses import asdict
+import random
 import time
+from dataclasses import dataclass, asdict
 from typing import Optional, Callable, Dict, Any, TypedDict, Annotated, List
-from dataclasses import dataclass
 from langgraph.graph import StateGraph, END
 from langgraph.graph.message import add_messages
 from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
 from foundation.logger.loggering import server_logger as logger
 from ..component import AIReviewEngine
 
+# 常量定义
+DEFAULT_SLICE_START_INDEX = 30
+MAX_PROGRESS_PERCENTAGE = 100
+RISK_LEVELS = {"high": "高风险", "medium": "中风险", "low": "低风险"}
+DEFAULT_RISK_LEVEL = "medium"
+REVIEW_TIMEOUT = 300  # 单个审查单元超时时间(秒)
+WORKFLOW_TIMEOUT = 1800  # 整个工作流超时时间(秒,30分钟)
+
 
 @dataclass
 class ReviewResult:
@@ -79,6 +122,9 @@ class AIReviewWorkflow:
 
         Returns:
             List[Dict[str, Any]]: 筛选后的审查单元
+
+        Note:
+            使用预设的起始索引进行切片,确保不会跳过前面的重要内容
         """
         if self.max_review_units is None or self.review_mode == "all":
             return chunks
@@ -89,7 +135,8 @@ class AIReviewWorkflow:
             return []
 
         # 安全的切片操作,考虑边界情况
-        start_index = min(30, len(chunks) - 1)  # 确保start_index不超过数组边界
+        # 使用预设的起始索引进行切片,避免跳过前面的内容
+        start_index = min(DEFAULT_SLICE_START_INDEX, len(chunks) - 1)  # 确保start_index不超过数组边界
         chunks = chunks[start_index:]
 
         # 再次验证切片后的结果
@@ -107,7 +154,6 @@ class AIReviewWorkflow:
             return chunks[:actual_review_count]
         elif self.review_mode == "random":
             # 随机取N个
-            import random
             return random.sample(chunks, actual_review_count)
         else:
             # 默认取前N个
@@ -121,7 +167,7 @@ class AIReviewWorkflow:
         workflow.add_node("ai_review", self._ai_review_node)
         workflow.add_node("complete", self._complete_node)
         workflow.add_node("error_handler", self._error_handler_node)
-        workflow.set_entry_point("start")# 设置入口节点
+        workflow.set_entry_point("start")
         workflow.add_edge("start", "initialize_progress")
         workflow.add_edge("initialize_progress", "ai_review")
         workflow.add_edge("ai_review", "complete")
@@ -160,8 +206,11 @@ class AIReviewWorkflow:
                 messages=[HumanMessage(content=f"开始AI审查: {self.file_id}")]
             )
 
-            # 执行LangGraph工作流
-            result = await self.graph.ainvoke(initial_state)
+            # 执行LangGraph工作流,添加超时控制
+            result = await asyncio.wait_for(
+                self.graph.ainvoke(initial_state),
+                timeout=WORKFLOW_TIMEOUT
+            )
 
             logger.info(f"LangGraph AI审查工作流完成,文件ID: {self.file_id}")
             review_results = {
@@ -180,6 +229,9 @@ class AIReviewWorkflow:
 
             return review_results
 
+        except asyncio.TimeoutError:
+            logger.error(f"AI审查工作流超时({WORKFLOW_TIMEOUT}秒),文件ID: {self.file_id}")
+            raise TimeoutError(f"AI审查工作流执行超时,请检查文件大小或网络连接")
         except Exception as e:
             logger.error(f"LangGraph AI审查工作流执行失败: {str(e)}")
             raise
@@ -219,31 +271,39 @@ class AIReviewWorkflow:
 
         return state
     
-    async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
-        """AI审查节点"""
-        try:
-            # logger.info(f"正在执行: {state['file_id']}")
-            # if state["progress_manager"]:
-            #     await state["progress_manager"].update_stage_progress(
-            #         callback_task_id=state["callback_task_id"],
-            #         stage_name="AI审查",
-            #         current=0,
-            #         status="processing",
-            #         message=f"正在执行: {state['file_id']}"
-            #     )
-            state["current_stage"] = "ai_review"
+    async def _prepare_review_units(self, state: AIReviewState) -> tuple:
+        """
+        准备审查单元数据
+
+        Args:
+            state: AI审查状态
 
+        Returns:
+            tuple: (review_chunks, total_units, total_all_units)
+        """
+        try:
             # 筛选要审查的单元
             all_chunks = state['structured_content']['chunks']
             review_chunks = self._filter_review_units(all_chunks)
 
             total_units = len(review_chunks)
             total_all_units = len(all_chunks)
-            completed_units = 0
 
             logger.info(f"AI审查开始: 总单元数 {total_all_units}, 实际审查 {total_units} 个单元")
+            return review_chunks, total_units, total_all_units
+        except Exception as e:
+            logger.error(f"准备审查单元失败: {str(e)}")
+            raise
+
+    async def _send_start_review_progress(self, state: AIReviewState, total_units: int) -> None:
+        """
+        发送开始审查的进度更新
 
-            # 开始AI审查进度
+        Args:
+            state: AI审查状态
+            total_units: 总审查单元数
+        """
+        try:
             if state["progress_manager"]:
                 await state["progress_manager"].update_stage_progress(
                     callback_task_id=state["callback_task_id"],
@@ -253,148 +313,269 @@ class AIReviewWorkflow:
                     message=f"开始AI审查,共 {total_units} 个审查单元",
                     event_type="processing"
                 )
+        except Exception as e:
+            logger.warning(f"发送开始进度更新失败: {str(e)}")
 
-            
-            # 基本审查单元
-            async def review_single_unit(unit_content: Dict[str, Any], unit_index: int,callback_task_id) -> ReviewResult:
-                """使用LangGraph编排的原子化组件方法审查单个单元"""
-                try:
-                        # 构建Trace ID
-                        trace_id_idx = "("+str(callback_task_id)+'-'+str(unit_index)+")"
-
-                        # 获取section_label用于stage_name
-                        section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
-                        page = unit_content.get('page', '')
-                        review_location_label = f"第{page}页:{section_label}"
-
-                        # 设置review_location_label到AIReviewEngine
-                        self.ai_review_engine.set_review_location_label(review_location_label)
-
-                        stage_name = f"AI审查:{section_label}"
-
-                        # 方法内部进度计算(基于当前处理的单元)
-                        current_progress = int((unit_index / total_units) * 100)
-
-                        # 构建进度消息
-                        message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
-                        logger.info(f"开始处理单元: {unit_index + 1}/{total_units} - {section_label}")
-
-                        # 并发执行各种原子化审查方法
-                        review_tasks = [
-                            self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
-                            self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
-                            # self.ai_review_engine.rag_enhanced_check(unit_content, trace_id_idx)
-                        ]
-
-                        # 等待所有审查完成
-                        review_results = await asyncio.gather(*review_tasks, return_exceptions=True)
-
-                        # 处理异常结果
-                        basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
-                        technical_result = review_results[1] if len(review_results) > 1 and not isinstance(review_results[1], Exception) else {"error": str(review_results[1]) if len(review_results) > 1 else "No result"}
-
-                        # RAG检查已注释,提供空结果
-                        rag_result = {"error": "RAG check disabled"}
-
-                        # 计算总体风险等级
-                        overall_risk = self._calculate_overall_risk(basic_result, technical_result, rag_result)
-
-                        issues = self._format_review_results_to_issues(
-                            state["callback_task_id"],
-                            unit_index,
-                            review_location_label,
-                            unit_content,
-                            basic_result,
-                            technical_result
-                        )
-                        #logger.info(f"issues: {issues}")
-
-                        # 单元审查完成,更新进度
-                        nonlocal completed_units
-                        completed_units += 1
-                        current = int((completed_units / total_units) * 100)  # 修正进度计算
-
-                        # 统计发现问题数量
-                        issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
-
-                        # 构建完成消息
-                        if issues_count > 0:
-                            message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
-                        else:
-                            message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
-
-                        logger.info(f"单元审查完成,更新进度: {current}% {message}")
-
-                        # 发送processing_flag事件(在单元完成时)
-                        if state["progress_manager"]:
-                            logger.info(f"发送processing_flag事件: current={current}, message={message}")
-                            await state["progress_manager"].update_stage_progress(
-                                callback_task_id=state["callback_task_id"],
-                                stage_name="AI审查",
-                                current=current,
-                                status="processing",
-                                message=message,
-                                user_id=state.get("user_id", ""),
-                                overall_task_status="processing",
-                                event_type="processing_flag"
-                            )
-
-                        # 如果有issues,额外推送详细信息
-                        if issues:
-                            asyncio.create_task(
-                                state["progress_manager"].update_stage_progress(
-                                    callback_task_id=state["callback_task_id"],
-                                    stage_name=stage_name,  # 保留详细的stage_name
-                                    current=current,
-                                    status="unit_review_update",
-                                    message=f"发现{issues_count}个问题: {section_label}",
-                                    issues=issues,
-                                    user_id=state.get("user_id", ""),
-                                    overall_task_status="processing",
-                                    event_type="unit_review"  # 明确设置事件类型
-                                )
-                            )
-
-                        
-                        # 清空当前issues
-                        await state["progress_manager"].update_stage_progress(
-                            callback_task_id=state["callback_task_id"],
-                            issues=['clear']
-                            )
-
-                        return ReviewResult(
-                        unit_index=unit_index,
-                        unit_content=unit_content,
-                        basic_compliance=basic_result,
-                        technical_compliance=technical_result,
-                        rag_enhanced=rag_result,
-                        overall_risk=overall_risk
-                    )
+    async def _review_single_unit(self, unit_content: Dict[str, Any], unit_index: int,
+                                  total_units: int, state: AIReviewState) -> ReviewResult:
+        """
+        审查单个单元的核心业务逻辑
+
+        Args:
+            unit_content: 单元内容
+            unit_index: 单元索引
+            total_units: 总单元数
+            state: AI审查状态
+
+        Returns:
+            ReviewResult: 单元审查结果
+        """
+        try:
+            # 构建Trace ID
+            trace_id_idx = f"({state['callback_task_id']}-{unit_index})"
+
+            # 获取section_label用于stage_name
+            section_label = unit_content.get('section_label', f'第{unit_index + 1}部分')
+            page = unit_content.get('page', '')
+            review_location_label = f"第{page}页:{section_label}"
 
-                except Exception as e:
-                    logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
-                    return ReviewResult(
-                        unit_index=unit_index,
-                        unit_content=unit_content,
-                        basic_compliance={"error": str(e)},
-                        technical_compliance={"error": str(e)},
-                        rag_enhanced={"error": str(e)},
-                        overall_risk="error"
+            # 设置review_location_label到AIReviewEngine
+            self.ai_review_engine.set_review_location_label(review_location_label)
+
+            stage_name = f"AI审查:{section_label}"
+
+            # 方法内部进度计算(基于当前处理的单元)
+            current_progress = int((unit_index / total_units) * 100)
+
+            # 构建进度消息
+            message = f"正在处理第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+            logger.info(f"开始处理单元: {unit_index + 1}/{total_units} - {section_label}")
+
+            # 并发执行各种原子化审查方法,添加超时控制
+            review_tasks = [
+                asyncio.wait_for(
+                    self.ai_review_engine.basic_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
+                    timeout=REVIEW_TIMEOUT
+                ),
+                asyncio.wait_for(
+                    self.ai_review_engine.technical_compliance_check(trace_id_idx, unit_content, stage_name, state, current_progress),
+                    timeout=REVIEW_TIMEOUT
+                ),
+            ]
+
+            # 等待所有审查完成
+            review_results = await asyncio.gather(*review_tasks, return_exceptions=True)
+
+            # 处理异常结果
+            basic_result = review_results[0] if not isinstance(review_results[0], Exception) else {"error": str(review_results[0])}
+            technical_result = review_results[1] if len(review_results) > 1 and not isinstance(review_results[1], Exception) else {"error": str(review_results[1]) if len(review_results) > 1 else "No result"}
+
+            # RAG检查已注释,提供空结果
+            rag_result = {"error": "RAG check disabled"}
+
+            # 计算总体风险等级
+            overall_risk = self._calculate_overall_risk(basic_result, technical_result, rag_result)
+
+            # 格式化审查结果
+            issues = self._format_review_results_to_issues(
+                state["callback_task_id"],
+                unit_index,
+                review_location_label,
+                unit_content,
+                basic_result,
+                technical_result
+            )
+
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance=basic_result,
+                technical_compliance=technical_result,
+                rag_enhanced=rag_result,
+                overall_risk=overall_risk
+            )
+
+        except asyncio.TimeoutError:
+            logger.error(f"审查单元 {unit_index} 超时")
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                technical_compliance={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                rag_enhanced={"error": f"审查超时({REVIEW_TIMEOUT}秒)"},
+                overall_risk="error"
+            )
+        except Exception as e:
+            logger.error(f"审查单元 {unit_index} 失败: {str(e)}")
+            return ReviewResult(
+                unit_index=unit_index,
+                unit_content=unit_content,
+                basic_compliance={"error": str(e)},
+                technical_compliance={"error": str(e)},
+                rag_enhanced={"error": str(e)},
+                overall_risk="error"
+            )
+
+    async def _send_unit_complete_progress(self, state: AIReviewState, unit_index: int,
+                                           total_units: int, section_label: str,
+                                           issues_count: int) -> None:
+        """
+        发送单元完成进度更新
+
+        Args:
+            state: AI审查状态
+            unit_index: 单元索引
+            total_units: 总单元数
+            section_label: 章节标签
+            issues_count: 问题数量
+        """
+        try:
+            current = int(((unit_index + 1) / total_units) * 100)
+
+            # 构建完成消息
+            if issues_count > 0:
+                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+            else:
+                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+
+            logger.info(f"单元审查完成,更新进度: {current}% {message}")
+
+            # 发送processing_flag事件
+            if state["progress_manager"]:
+                logger.info(f"发送processing_flag事件: current={current}, message={message}")
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    stage_name="AI审查",
+                    current=current,
+                    status="processing",
+                    message=message,
+                    user_id=state.get("user_id", ""),
+                    overall_task_status="processing",
+                    event_type="processing_flag"
+                )
+        except Exception as e:
+            logger.warning(f"发送单元完成进度更新失败: {str(e)}")
+
+    async def _send_unit_review_details(self, state: AIReviewState, unit_index: int,
+                                        total_units: int, section_label: str,
+                                        issues: List[Dict], current: int) -> None:
+        """
+        发送单元审查详细信息
+
+        Args:
+            state: AI审查状态
+            unit_index: 单元索引
+            total_units: 总单元数
+            section_label: 章节标签
+            issues: 问题列表
+            current: 当前进度
+        """
+        try:
+            if issues and state["progress_manager"]:
+                stage_name = f"AI审查:{section_label}"
+                issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
+
+                asyncio.create_task(
+                    state["progress_manager"].update_stage_progress(
+                        callback_task_id=state["callback_task_id"],
+                        stage_name=stage_name,
+                        current=current,
+                        status="unit_review_update",
+                        message=f"发现{issues_count}个问题: {section_label}",
+                        issues=issues,
+                        user_id=state.get("user_id", ""),
+                        overall_task_status="processing",
+                        event_type="unit_review"
                     )
+                )
+
+                # 清空当前issues
+                await state["progress_manager"].update_stage_progress(
+                    callback_task_id=state["callback_task_id"],
+                    issues=['clear']
+                )
+        except Exception as e:
+            logger.warning(f"发送单元审查详情失败: {str(e)}")
+
+    async def _execute_concurrent_reviews(self, review_chunks: List[Dict[str, Any]],
+                                          total_units: int, state: AIReviewState) -> List[ReviewResult]:
+        """
+        执行并发审查
+
+        Args:
+            review_chunks: 审查单元列表
+            total_units: 总单元数
+            state: AI审查状态
 
+        Returns:
+            List[ReviewResult]: 审查结果列表
+        """
+        try:
             # 实现异步并发
             review_tasks = [
-                asyncio.create_task(review_single_unit(content, i,state["callback_task_id"]))
+                asyncio.create_task(self._review_single_unit(content, i, total_units, state))
                 for i, content in enumerate(review_chunks)
             ]
 
             # 等待所有审查完成
             all_results = await asyncio.gather(*review_tasks)
 
+            # 处理进度更新和问题详情推送
+            for i, result in enumerate(all_results):
+                if result.overall_risk != "error":
+                    unit_content = review_chunks[i]
+                    section_label = unit_content.get('section_label', f'第{i + 1}部分')
+
+                    # 格式化issues以获取问题数量
+                    issues = self._format_review_results_to_issues(
+                        state["callback_task_id"],
+                        i,
+                        f"第{unit_content.get('page', '')}页:{section_label}",
+                        unit_content,
+                        result.basic_compliance,
+                        result.technical_compliance
+                    )
+
+                    issues_count = sum(len(issue.get("review_lists", [])) for issue in issues)
+                    current = int(((i + 1) / total_units) * 100)
+
+                    # 发送进度更新
+                    await self._send_unit_complete_progress(state, i, total_units, section_label, issues_count)
+
+                    # 发送问题详情
+                    await self._send_unit_review_details(state, i, total_units, section_label, issues, current)
+
             # 过滤成功结果
             successful_results = [result for result in all_results if result.overall_risk != "error"]
+            return successful_results
+
+        except Exception as e:
+            logger.error(f"执行并发审查失败: {str(e)}")
+            return []
+
+    async def _ai_review_node(self, state: AIReviewState) -> AIReviewState:
+        """
+        AI审查节点 - 重构后的简化版本
 
-            # 汇总结果
+        Args:
+            state: AI审查状态
+
+        Returns:
+            AIReviewState: 更新后的审查状态
+        """
+        try:
+            state["current_stage"] = "ai_review"
+
+            # 1. 准备审查单元数据
+            review_chunks, total_units, total_all_units = await self._prepare_review_units(state)
+
+            # 2. 发送开始审查进度
+            await self._send_start_review_progress(state, total_units)
+
+            # 3. 执行并发审查
+            successful_results = await self._execute_concurrent_reviews(review_chunks, total_units, state)
+
+            # 4. 汇总结果
             summary = self._aggregate_results(successful_results)
 
             review_results = {
@@ -471,9 +652,20 @@ class AIReviewWorkflow:
 
 
     def _calculate_overall_risk(self, basic_result: Dict, technical_result: Dict, rag_result: Dict) -> str:
-        """计算总体风险等级"""
+        """
+        计算总体风险等级
+
+        Args:
+            basic_result: 基础合规性审查结果
+            technical_result: 技术性审查结果
+            rag_result: RAG增强审查结果
+
+        Returns:
+            str: 风险等级 ("low", "medium", "high")
+        """
         try:
             # 基于各种审查结果计算风险等级
+            # 风险等级计算逻辑:基础和技术审查都达到90分以上为低风险,70分以上为中风险,否则为高风险
             basic_score = basic_result.get('overall_score', 0)
             technical_score = technical_result.get('overall_score', 0)
 
@@ -483,16 +675,25 @@ class AIReviewWorkflow:
                 return "medium"
             else:
                 return "high"
-        except:
-            return "medium"
+        except (KeyError, TypeError, ValueError) as e:
+            logger.warning(f"风险等级计算异常: {str(e)},使用默认风险等级")
+            return DEFAULT_RISK_LEVEL
 
     def _aggregate_results(self, successful_results: List[ReviewResult]) -> Dict[str, Any]:
-        """汇总审查结果"""
+        """
+        汇总审查结果
+
+        Args:
+            successful_results: 成功的审查结果列表
+
+        Returns:
+            Dict[str, Any]: 汇总后的统计信息,包括风险统计、平均得分等
+        """
         try:
             if not successful_results:
                 return {}
 
-            # 计算统计数据
+            # 计算风险等级统计
             risk_stats = {"low": 0, "medium": 0, "high": 0, "error": 0}
             for result in successful_results:
                 risk_stats[result.overall_risk] += 1
@@ -510,7 +711,7 @@ class AIReviewWorkflow:
                 'avg_technical_score': avg_technical_score,
                 'total_reviewed': len(successful_results)
             }
-        except Exception as e:
+        except (ZeroDivisionError, KeyError, TypeError) as e:
             logger.error(f"结果汇总失败: {str(e)}")
             return {}
 
@@ -549,7 +750,7 @@ class AIReviewWorkflow:
             technical_result: 技术性审查结果
 
         Returns:
-            List[Dict]: 格式化后的issues列表
+            List[Dict]: 格式化后的issues列表,包含issue_id、metadata、risk_summary、review_lists等字段
         """
         issues = []
         review_lists = []
@@ -620,11 +821,14 @@ class AIReviewWorkflow:
         解析AI审查的Markdown格式响应
 
         Args:
-            response: AI审查响应
+            response: AI审查响应内容
             check_name: 检查项名称(如"词句语法检查")
 
         Returns:
-            List[Dict]: 解析后的审查结果列表
+            List[Dict]: 解析后的审查结果列表,包含check_item、check_result、exist_issue、risk_info等字段
+
+        Note:
+            支持识别"无明显问题"等关键词,自动设置风险等级
         """
         review_lists = []
 

+ 1 - 1
foundation/logger/loggering.py

@@ -30,7 +30,7 @@ class CompatibleLogger(logging.Logger):
                  log_format=None, datefmt=None):
         # 初始化父类
         super().__init__(name)
-        self.setLevel(logging.DEBUG)  # 设置logger自身为最低级别
+        self.setLevel(logging.INFO)  # 设置logger自身为最低级别
 
         # 存储配置
         self.log_dir = log_dir

+ 0 - 2
server/app.py

@@ -24,7 +24,6 @@ from views.test_views import test_router
 from views.construction_review.file_upload import file_upload_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
-from views.construction_review.task_progress import task_progress_router
 
 
 # 创建 FastAPI 应用
@@ -51,7 +50,6 @@ def create_app() -> FastAPI:
     app.include_router(file_upload_router)
     app.include_router(review_results_router)
     app.include_router(launch_review_router)
-    app.include_router(task_progress_router)
 
     # 全局异常处理
     @app.exception_handler(HTTPException)

Tiedoston diff-näkymää rajattu, sillä se on liian suuri
+ 1 - 1
temp/AI审查结果.json


+ 6 - 0
test/time.py

@@ -0,0 +1,6 @@
+from datetime import datetime
+
+if __name__ == "__main__":
+    now = datetime.now()
+    modification_time = now.strftime("%Y-%m-%d %H:%M:%S")
+    print(f"当前时间: {modification_time}")

+ 19 - 20
views/construction_review/launch_review.py

@@ -68,8 +68,8 @@ class SimpleSSEManager:
         if queue:
             # 优先使用progress_manager传递的event_type,如果没有则使用默认逻辑
             event_type = current_data.get("event_type", "processing")
-            # 保持向后兼容性
-            if event_type == "processing" and current_data.get("status") == "unit_review_update":
+            # 处理特殊的单元审查事件
+            if event_type == "unit_review" or (event_type == "processing" and current_data.get("status") == "unit_review_update"):
                 event_type = "unit_review_update"
 
             await queue.put({
@@ -349,24 +349,23 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                             unified_data_json = json.dumps(unified_data, ensure_ascii=False)
                             yield format_sse_event(sse_event_type, unified_data_json)
 
-                        # 统一检查任务完成状态
-                        if current_data:
-                            overall_task_status = current_data.get("overall_task_status")
-                            if overall_task_status in ["completed", "failed"]:
-                                completion_data = {
-                                    "callback_task_id": callback_task_id,
-                                    "user_id": user_id,
-                                    "current": current_data.get("current", 100),
-                                    "stage_name": "审查完成",
-                                    "status": "completed",
-                                    "message": "施工审查方案处理完成!",
-                                    "overall_task_status": overall_task_status,
-                                    "updated_at": int(time.time()),
-                                    "issues": current_data.get("issues", []),
-                                }
-                                completion_json = json.dumps(completion_data, ensure_ascii=False)
-                                yield format_sse_event("completed", completion_json)
-                                break
+                        # 检查SSE回调是否已被注销(作为任务结束信号)
+                        if not sse_callback_manager.is_callback_registered(callback_task_id):
+                            logger.info(f"检测到SSE回调已注销,任务结束: {callback_task_id}")
+                            # 推送最终的completed事件
+                            completion_data = {
+                                "callback_task_id": callback_task_id,
+                                "user_id": user_id,
+                                "current": 100,
+                                "stage_name": "审查完成",
+                                "status": "completed",
+                                "message": "施工审查方案处理完成!",
+                                "overall_task_status":  "completed",
+                                "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
+                            }
+                            completion_json = json.dumps(completion_data, ensure_ascii=False)
+                            yield format_sse_event("completed", completion_json)
+                            break
 
                     except Exception as e:
                         logger.error(f"队列消息处理异常: {callback_task_id}")

+ 0 - 195
views/construction_review/task_progress.py

@@ -1,195 +0,0 @@
-"""
-审查进度SSE实时推送接口
-"""
-
-import json
-import asyncio
-from typing import Dict
-from datetime import datetime
-from pydantic import BaseModel
-from fastapi import APIRouter, Query
-from .schemas.error_schemas import LaunchReviewErrors
-from fastapi.responses import StreamingResponse
-from foundation.logger.loggering import server_logger as logger
-from foundation.trace.trace_context import TraceContext, auto_trace
-from core.base.progress_manager import ProgressManager, sse_callback_manager
-
-progress_manager = ProgressManager()
-
-task_progress_router = APIRouter(prefix="/sgsc", tags=["进度推送"])
-
-async def sse_progress_callback(callback_task_id: str, current_data: dict):
-    """SSE推送回调函数 - 接收进度更新并推送到客户端"""
-    await sse_manager.send_progress(callback_task_id, current_data)
-
-class TaskProgressResponse(BaseModel):
-    code: int
-    data: dict
-
-
-class SimpleSSEManager:
-    """SSE连接管理器 - 管理客户端SSE连接和消息推送"""
-
-
-    def __init__(self):
-        self.connections: Dict[str, asyncio.Queue] = {}
-
-
-    async def connect(self, callback_task_id: str):
-        """建立SSE连接 - 创建消息队列并发送连接确认"""
-        queue = asyncio.Queue()
-        self.connections[callback_task_id] = queue
-
-        await queue.put({
-            "type": "connection_established",
-            "callback_task_id": callback_task_id,
-            "timestamp": datetime.now().isoformat()
-        })
-
-        logger.info(f"SSE连接: {callback_task_id}")
-        return queue
-
-
-    async def disconnect(self, callback_task_id: str):
-        """断开SSE连接 - 清理连接队列"""
-        if callback_task_id in self.connections:
-            del self.connections[callback_task_id]
-        logger.info(f"SSE连接已断开: {callback_task_id}")
-
-
-    async def send_progress(self, callback_task_id: str, current_data: dict):
-        """发送进度更新 - 将进度数据放入队列推送给客户端"""
-        queue = self.connections.get(callback_task_id)
-        if queue:
-            await queue.put({
-                "type": "progress_update",
-                "data": current_data,
-                "timestamp": datetime.now().isoformat()
-            })
-            logger.debug(f"SSE进度已推送: {callback_task_id}")
-
-sse_manager = SimpleSSEManager()
-
-def format_sse_event(event_type: str, data: str) -> str:
-    """格式化SSE事件 - 按照SSE协议格式化事件数据"""
-    lines = [
-        f"event: {event_type}",
-        f"data: {data}",
-        "",
-        ""
-    ]
-    return "\n".join(lines) + "\n" 
-
-
-@task_progress_router.get("/sse/progress/{callback_task_id}")
-@auto_trace("callback_task_id")
-async def sse_progress_stream(
-    callback_task_id: str,
-    user: str = Query(..., description="用户标识")
-):
-    """SSE实时进度推送接口 - 建立SSE连接并实时推送任务进度"""
-    try:
-        valid_users = {"user-001", "user-002", "user-003"}
-        if user not in valid_users:
-            raise LaunchReviewErrors.invalid_user()
-        sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
-
-        queue = await sse_manager.connect(callback_task_id)
-
-        async def generate_events():
-            """生成SSE事件流 - 处理连接确认、进度推送和任务完成检测"""
-            try:
-                logger.info(f"开始SSE事件流: {callback_task_id}")
-
-                connected_data = json.dumps({
-                    "callback_task_id": callback_task_id,
-                    "message": "SSE连接已建立,等待进度更新...",
-                    "timestamp": datetime.now().isoformat()
-                }, ensure_ascii=False)
-                yield format_sse_event("connected", connected_data)
-
-                current_progress = await progress_manager.get_progress(callback_task_id)
-                if current_progress:
-                    progress_json = json.dumps(current_progress, ensure_ascii=False)
-                    yield format_sse_event("current", progress_json)
-
-                logger.debug(f"开始监听队列中的进度更新: {callback_task_id}")
-
-                while True:
-                    try:
-                        message = await queue.get()
-
-                        if message.get("type") == "progress_update":
-                            current_data = message.get("data")
-                            if current_data:
-                                logger.info(f"总流程处理进度: {current_data.get("message")}")
-
-                                progress_json = json.dumps(current_data, ensure_ascii=False)
-                                yield format_sse_event("current", progress_json)
-
-                                overall_task_status = current_data.get("overall_task_status")
-
-                                if overall_task_status in ["completed", "failed"]:
-                                    completion_data = {
-                                        "callback_task_id": callback_task_id,
-                                        "task_status": overall_task_status,
-                                        "overall_progress": current_data.get("current", 100),
-                                        "timestamp": datetime.now().isoformat(),
-                                        "message": "全部任务完成!"
-                                    }
-                                    completion_json = json.dumps(completion_data, ensure_ascii=False)
-                                    yield format_sse_event("completed", completion_json)
-
-                                    #sse_callback_manager.unregister_callback(callback_task_id)
-                                    await sse_manager.disconnect(callback_task_id)
-                                    logger.info(f"全部任务完成,SSE连接已关闭: {callback_task_id}, 状态: {overall_task_status}")
-                                    break
-
-                        elif message.get("type") == "connection_established":
-                            pass
-
-                    except Exception as e:
-                        logger.error(f"队列消息处理异常: {callback_task_id}, {e}")
-                        break
-
-            except Exception as e:
-                logger.error(f"SSE事件流异常: {callback_task_id}, {e}")
-                error_data = json.dumps({
-                    "error": f"SSE异常: {str(e)}",
-                    "timestamp": datetime.now().isoformat()
-                }, ensure_ascii=False)
-                yield format_sse_event("error", error_data)
-
-            finally:
-                sse_callback_manager.unregister_callback(callback_task_id)
-                await sse_manager.disconnect(callback_task_id)
-                logger.debug(f"SSE流已结束: {callback_task_id}")
-
-        return StreamingResponse(
-            generate_events(),
-            media_type="text/event-stream",
-            headers={
-                "Cache-Control": "no-cache, no-store, must-revalidate",
-                "Connection": "keep-alive",
-                "Access-Control-Allow-Origin": "*",
-                "Access-Control-Allow-Headers": "Cache-Control, EventSource",
-                "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
-                "X-Accel-Buffering": "no",  
-                "X-Content-Type-Options": "nosniff"
-            }
-        )
-
-    except Exception as e:
-        logger.error(f"SSE连接失败: {callback_task_id}, {e}")
-        raise LaunchReviewErrors.internal_error(e)
-
-
-@task_progress_router.get("/sse/status")
-async def get_sse_status():
-    """获取SSE连接状态 - 返回当前活跃的SSE连接信息"""
-    return {
-        "active_connections": len(sse_manager.connections),
-        "connections": list(sse_manager.connections.keys()),
-        "timestamp": datetime.now().isoformat()
-    }
-

Kaikkia tiedostoja ei voida näyttää, sillä liian monta tiedostoa muuttui tässä diffissä