Просмотр исходного кода

v0.0.3-增加启动审查sgsc/sse/launch_review接口

WangXuMing 3 месяца назад
Родитель
Commit
9633b877d9

+ 2 - 2
config/config.ini

@@ -1,7 +1,7 @@
 
 
 [model]
-MODEL_TYPE=gemini
+MODEL_TYPE=qwen_local_1.5b
 
 
 
@@ -29,7 +29,7 @@ QWEN_API_KEY=ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 [ai_review]
 # 调试模式配置
-MAX_REVIEW_UNITS=3
+MAX_REVIEW_UNITS=1
 REVIEW_MODE=random
 # REVIEW_MODE=all/random/first
 

+ 1 - 1
core/base/doc_worker/config.yaml

@@ -5,7 +5,7 @@ llm:
   # 模型API地址
   model_url: "http://172.16.35.50:8000/v1/chat/completions"
   # 模型名称
-  model_name: "Qwen2.5-7B-Instruct"
+  model_name: "Qwen2.5-1.5B-Instruct"
   # 温度参数(越低越确定)
   temperature: 0.1
   # 请求超时时间(秒)

+ 2 - 2
foundation/base/redis_connection.py

@@ -252,10 +252,10 @@ class RedisAdapter(RedisConnection):
     async def close(self) -> None:
         if self._redis:
             await self._redis.close()
-            await self._redis.wait_closed()
+            #await self._redis.wait_closed() #该方法已弃用
         if self._langchain_redis_client:
             await self._langchain_redis_client.close()
-            await self._langchain_redis_client.wait_closed()
+            #await self._langchain_redis_client.wait_closed()
 
 
 

+ 51 - 178
foundation/utils/redis_utils.py

@@ -65,120 +65,12 @@ async def get_redis_result_cache_data_and_delete_key(data_type: str , trace_id:
     return data
 
 
-@track_execution_time
-async def _store_file_chunked(file_id: str, file_content: bytes, chunk_size: int = 1024*1024, expire_seconds: int = 3600) -> bool:
-    """
-    分块存储大文件内容(内部方法)
-    """
-    try:
-        redis_store = await RedisConnectionFactory.get_redis_store()
-        file_size = len(file_content)
-
-        server_logger.info(f"开始分块存储文件: {file_id}, 大小: {file_size/1024/1024:.2f}MB, 分块大小: {chunk_size/1024/1024:.2f}MB")
-
-        # 计算分块数量
-        chunk_count = (file_size + chunk_size - 1) // chunk_size
-
-        # 创建分块索引信息
-        chunk_index = {
-            'file_id': file_id,
-            'file_size': file_size,
-            'chunk_size': chunk_size,
-            'chunk_count': chunk_count,
-            'created_at': int(time.time())
-        }
-
-        # 存储分块索引
-        await redis_store.setex(f"chunks:{file_id}", expire_seconds, json.dumps(chunk_index))
-
-        # 分块存储文件内容
-        tasks = []
-        for i in range(chunk_count):
-            start = i * chunk_size
-            end = min(start + chunk_size, file_size)
-            chunk_data = file_content[start:end]
-            chunk_key = f"chunk:{file_id}:{i}"
-            task = redis_store.setex(chunk_key, expire_seconds, chunk_data)
-            tasks.append(task)
-
-        # 并行执行所有分块存储
-        await asyncio.gather(*tasks)
-
-        server_logger.info(f"文件分块存储完成: {file_id}, {chunk_count}个块")
-        return True
-
-    except Exception as e:
-        server_logger.error(f"分块存储文件失败: {file_id}, {str(e)}")
-        return False
-
-
-async def _get_file_chunked(file_id: str) -> bytes:
-    """
-    获取大文件内容(从分块中组装)(内部方法)
-    """
-    try:
-        redis_store = await RedisConnectionFactory.get_redis_store()
-
-        # 获取分块索引
-        chunk_index_json = await redis_store.get(f"chunks:{file_id}")
-        if not chunk_index_json:
-            server_logger.warning(f"文件分块索引不存在: {file_id}")
-            return None
-
-        chunk_index = json.loads(chunk_index_json.decode('utf-8'))
-        chunk_count = chunk_index['chunk_count']
-
-        # 并行获取所有分块
-        tasks = []
-        for i in range(chunk_count):
-            chunk_key = f"chunk:{file_id}:{i}"
-            task = redis_store.get(chunk_key)
-            tasks.append(task)
-
-        # 并行执行获取
-        chunks = await asyncio.gather(*tasks)
-
-        # 组装文件内容
-        file_content = b''.join(chunks)
-        return file_content
-
-    except Exception as e:
-        server_logger.error(f"获取分块文件失败: {file_id}, {str(e)}")
-        return None
-
-
-async def _delete_file_chunks(file_id: str) -> bool:
-    """
-    删除大文件分块(内部方法)
-    """
-    try:
-        redis_store = await RedisConnectionFactory.get_redis_store()
-
-        # 获取分块索引
-        chunk_index_json = await redis_store.get(f"chunks:{file_id}")
-        if not chunk_index_json:
-            return True  # 可能已经删除了
-
-        chunk_index = json.loads(chunk_index_json.decode('utf-8'))
-        chunk_count = chunk_index['chunk_count']
-
-        # 构造要删除的所有键
-        keys_to_delete = [f"chunks:{file_id}"]
-        keys_to_delete.extend([f"chunk:{file_id}:{i}" for i in range(chunk_count)])
-
-        # 批量删除
-        await redis_store.delete(*keys_to_delete)
-        return True
-
-    except Exception as e:
-        server_logger.error(f"删除文件分块失败: {file_id}, {str(e)}")
-        return False
 
 
 @track_execution_time
 async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_seconds: int = 3600) -> bool:
     """
-    存储文件信息(自动优化:小文件直接存储,大文件分块存储
+    存储文件信息(直接存储模式)
 
     Args:
         file_id: 文件ID
@@ -188,8 +80,6 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
     Returns:
         bool: 存储是否成功
     """
-    # 直接存储开关,True表示使用直接存储,False表示使用分块存储
-    direct_storage = True  
     try:
         redis_store = await RedisConnectionFactory.get_redis_store()
 
@@ -204,43 +94,21 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
 
         if file_content:
             file_size = len(file_content)
-            chunk_threshold = 50 * 1024 * 1024  # 50MB阈值
-
-            # 根据文件大小和强制参数选择存储策略
-            if direct_storage or file_size <= chunk_threshold:
-                storage_method = "直接存储" if direct_storage else "直接存储"
-                server_logger.info(f"使用{storage_method}策略: {file_id}, {file_size/1024/1024:.2f}MB")
-
-                # 直接存储
-                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
-                metadata['storage_type'] = 'direct_test' if direct_storage else 'direct'
-                metadata['file_size'] = file_size
-
-                # 并行执行元数据和内容存储以提高性能
-                tasks = [
-                    redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
-                    redis_store.setex(f"content:{file_id}", expire_seconds, file_content)
-                ]
-                await asyncio.gather(*tasks)
-
-            else:
-                server_logger.info(f"使用分块存储策略: {file_id}, {file_size/1024/1024:.2f}MB > 50MB")
-
-                # 分块存储文件内容
-                chunk_success = await _store_file_chunked(file_id, file_content, expire_seconds=expire_seconds)
-                if not chunk_success:
-                    return False
-
-                # 存储元数据(不含文件内容)
-                metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
-                metadata['storage_type'] = 'chunked'
-                metadata['file_size'] = file_size
-
-                await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
+            server_logger.info(f"使用直接存储策略: {file_id}, {file_size/1024/1024:.2f}MB")
+
+            # 直接存储
+            metadata = {k: v for k, v in file_info.items() if k != 'file_content'}
+            metadata['file_size'] = file_size
+
+            # 并行执行元数据和内容存储以提高性能
+            tasks = [
+                redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata)),
+                redis_store.setex(f"content:{file_id}", expire_seconds, file_content)
+            ]
+            await asyncio.gather(*tasks)
         else:
             # 没有文件内容,只存元数据
             metadata = file_info.copy()
-            metadata['storage_type'] = 'metadata_only'
             await redis_store.setex(f"meta:{file_id}", expire_seconds, json.dumps(metadata))
 
         server_logger.info(f"文件信息已存储到Redis: {file_id}")
@@ -253,7 +121,7 @@ async def store_file_info(file_id: str, file_info: Dict[str, Any], expire_second
 @track_execution_time
 async def get_file_info(file_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
     """
-    根据file_id获取文件信息(自动适配分块和直接存储)
+    根据file_id获取文件信息
 
     Args:
         file_id: 文件ID
@@ -275,27 +143,19 @@ async def get_file_info(file_id: str, include_content: bool = True) -> Optional[
 
         # 解析元数据
         file_info = json.loads(meta_bytes.decode('utf-8'))
-        storage_type = file_info.get('storage_type', 'direct')
 
         # 根据存储类型获取文件内容
         if include_content and 'file_size' in file_info:
-            if storage_type == 'chunked':
-                # 从分块中获取文件内容
-                file_content = await _get_file_chunked(file_id)
-                if file_content:
-                    file_info['file_content'] = file_content
-                else:
-                    server_logger.warning(f"分块文件内容获取失败: {file_id}")
-            elif storage_type == 'direct':
-                # 直接获取文件内容
-                content_key = f"content:{file_id}"
-                file_content = await redis_store.get(content_key)
-                if file_content:
-                    file_info['file_content'] = file_content
-                else:
-                    server_logger.warning(f"文件内容不存在: {content_key}")
-
-        server_logger.info(f"从Redis获取到文件信息: {meta_key}, 存储类型: {storage_type}")
+            # 直接获取文件内容
+            content_key = f"content:{file_id}"
+            file_content = await redis_store.get(content_key)
+            if file_content:
+                file_info['file_content'] = file_content
+            else:
+                server_logger.warning(f"文件内容不存在: {content_key}")
+                return None  # 文件内容缺失,返回None
+
+        server_logger.info(f"从Redis获取到文件信息: {meta_key}")
         return file_info
 
     except json.JSONDecodeError as e:
@@ -305,10 +165,10 @@ async def get_file_info(file_id: str, include_content: bool = True) -> Optional[
         server_logger.error(f"获取文件信息失败: {str(e)}")
         return None
 
-@track_execution_time
+
 async def delete_file_info(file_id: str) -> bool:
     """
-    删除文件信息(自动适配分块和直接存储)
+    删除文件信息
 
     Args:
         file_id: 文件ID
@@ -317,7 +177,14 @@ async def delete_file_info(file_id: str) -> bool:
         bool: 删除是否成功
     """
     try:
-        redis_store = await RedisConnectionFactory.get_redis_store()
+        # 为了避免事件循环冲突,直接创建新的Redis连接
+        from foundation.base.redis_config import load_config_from_env
+        from foundation.base.redis_connection import RedisAdapter
+
+        redis_config = load_config_from_env()
+        adapter = RedisAdapter(redis_config)
+        await adapter.connect()
+        redis_store = adapter.get_langchain_redis_client()
 
         # 获取元数据以确定存储类型
         meta_key = f"meta:{file_id}"
@@ -325,41 +192,47 @@ async def delete_file_info(file_id: str) -> bool:
 
         if not meta_bytes:
             server_logger.warning(f"文件元数据不存在: {meta_key}")
+            # 清理连接
+            await adapter.close()
             return True  # 可能已经删除了
 
         # 解析元数据
         file_info = json.loads(meta_bytes.decode('utf-8'))
-        storage_type = file_info.get('storage_type', 'direct')
 
-        # 根据存储类型删除相应的内容
+        # 删除相应的内容
         deleted_count = 0
 
         # 删除元数据
         deleted_count += await redis_store.delete(meta_key)
 
-        if storage_type == 'chunked':
-            # 删除分块内容
-            chunk_success = await _delete_file_chunks(file_id)
-            if chunk_success:
-                server_logger.info(f"已删除分块文件内容: {file_id}")
-        elif storage_type == 'direct':
-            # 删除直接存储的内容
+        # 如果有文件大小信息,说明有文件内容,需要删除
+        if 'file_size' in file_info:
+            # 删除文件内容
             content_key = f"content:{file_id}"
             deleted_count += await redis_store.delete(content_key)
 
         if deleted_count > 0:
             server_logger.info(f"已删除文件信息: {file_id}, {deleted_count}个键")
-            return True
         else:
             server_logger.warning(f"Redis缓存不存在,无法删除: {file_id}")
-            return False
+
+        # 清理连接
+        await adapter.close()
+        return True if deleted_count > 0 else False
 
     except json.JSONDecodeError as e:
         server_logger.error(f"解析文件元数据JSON失败: {str(e)}")
+        # 清理连接
+        await adapter.close()
         return False
     except Exception as e:
         server_logger.error(f"删除文件信息失败: {str(e)}")
+        # 清理连接
+        await adapter.close()
         return False
+    finally:
+        # 确保连接被关闭
+        await adapter.close()
 
 #asyncio.run(delete_file_info('e385049cde7d21a48c7de216182f0f23'))
 

Разница между файлами не показана из-за большого размера
+ 3 - 13
temp/AI审查结果.json


+ 0 - 8
views/construction_review/app.py

@@ -20,7 +20,6 @@ from foundation.logger.loggering import server_logger as logger
 
 
 from views.construction_review.file_upload import file_upload_router
-from views.construction_review.task_progress import task_progress_router
 from views.construction_review.review_results import review_results_router
 from views.construction_review.launch_review import launch_review_router
 
@@ -43,7 +42,6 @@ def create_app() -> FastAPI:
 
     # 添加路由
     app.include_router(file_upload_router)
-    app.include_router(task_progress_router)
     app.include_router(review_results_router)
     app.include_router(launch_review_router)
 
@@ -92,12 +90,6 @@ def create_app() -> FastAPI:
                     "method": "POST",
                     "description": "启动AI审查工作流"
                 },
-                {
-                    "name": "进度查询",
-                    "path": "/sgsc/task_progress/{callback_task_id}",
-                    "method": "GET",
-                    "description": "查询审查任务进度"
-                },
                 {
                     "name": "结果获取",
                     "path": "/sgsc/review_results",

+ 282 - 63
views/construction_review/launch_review.py

@@ -5,15 +5,20 @@
 
 import uuid
 import time
+import json
+import asyncio
+import traceback
 from datetime import datetime
 from typing import List, Optional, Dict, Any
 from pydantic import BaseModel, Field
-from fastapi import APIRouter, HTTPException
+from fastapi import APIRouter, HTTPException, Query
+from fastapi.responses import StreamingResponse
 from core.base.redis_duplicate_checker import RedisDuplicateChecker
 from foundation.logger.loggering import server_logger as logger
 from foundation.trace.trace_context import TraceContext, auto_trace
 from foundation.utils.redis_utils import get_file_info, delete_file_info
 from core.base.workflow_manager import WorkflowManager
+from core.base.progress_manager import ProgressManager, sse_callback_manager
 from views.construction_review.file_upload import validate_upload_parameters
 from .schemas.error_schemas import LaunchReviewErrors
 
@@ -24,6 +29,61 @@ workflow_manager = WorkflowManager(
     max_concurrent_docs=3,
     max_concurrent_reviews=5
 )
+# 初始化进度管理器
+progress_manager = ProgressManager()
+
+async def sse_progress_callback(callback_task_id: str, current_data: dict):
+    """SSE推送回调函数 - 接收进度更新并推送到客户端"""
+    await sse_manager.send_progress(callback_task_id, current_data)
+
+class SimpleSSEManager:
+    """SSE连接管理器 - 管理客户端SSE连接和消息推送"""
+
+    def __init__(self):
+        self.connections: Dict[str, asyncio.Queue] = {}
+
+    async def connect(self, callback_task_id: str):
+        """建立SSE连接 - 创建消息队列并发送连接确认"""
+        queue = asyncio.Queue()
+        self.connections[callback_task_id] = queue
+
+        await queue.put({
+            "type": "connection_established",
+            "callback_task_id": callback_task_id,
+            "timestamp": datetime.now().isoformat()
+        })
+
+        logger.info(f"SSE连接: {callback_task_id}")
+        return queue
+
+    async def disconnect(self, callback_task_id: str):
+        """断开SSE连接 - 清理连接队列"""
+        if callback_task_id in self.connections:
+            del self.connections[callback_task_id]
+        logger.info(f"SSE连接已断开: {callback_task_id}")
+
+    async def send_progress(self, callback_task_id: str, current_data: dict):
+        """发送进度更新 - 将进度数据放入队列推送给客户端"""
+        queue = self.connections.get(callback_task_id)
+        if queue:
+            await queue.put({
+                "type": "progress_update",
+                "data": current_data,
+                "timestamp": datetime.now().isoformat()
+            })
+            logger.debug(f"SSE进度已推送: {callback_task_id}")
+
+sse_manager = SimpleSSEManager()
+
+def format_sse_event(event_type: str, data: str) -> str:
+    """格式化SSE事件 - 按照SSE协议格式化事件数据"""
+    lines = [
+        f"event: {event_type}",
+        f"data: {data}",
+        "",
+        ""
+    ]
+    return "\n".join(lines) + "\n"
 
 
 class LaunchReviewRequest(BaseModel):
@@ -81,79 +141,238 @@ def validate_project_plan_type(project_plan_type: str) -> None:
         raise LaunchReviewErrors.project_plan_type_invalid()
 
 
-@launch_review_router.post("/sse/launch_review", response_model=LaunchReviewResponse)
+@launch_review_router.post("/sse/launch_review")
 @auto_trace(generate_if_missing=True)
-async def launch_review(request_data: LaunchReviewRequest):
+async def launch_review_sse(request_data: LaunchReviewRequest):
     """
-    启动施工方案审查
+    启动施工方案审查并返回SSE进度流
 
     Args:
         request_data: 启动审查请求参数
 
     Returns:
-        LaunchReviewResponse: 包含任务ID的响应
+        StreamingResponse: SSE事件流,包含任务启动状态和进度
     """
-    try:
-
-        callback_task_id = request_data.callback_task_id
-        review_config = request_data.review_config
-        project_plan_type = request_data.project_plan_type
-
-        logger.info(f"收到审查启动请求: callback_task_id={callback_task_id}")
-
-        # 验证审查配置
-        validate_review_config(review_config)
-
-        # 验证工程方案类型
-        validate_project_plan_type(project_plan_type)
-
-        try:          
-
-            # 从callback_task_id中提取file_id (格式: file_id-timestamp)
-            file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
-
-            # 检查重复任务
-            if await duplicatechecker.is_duplicate_task(file_id):
-                raise LaunchReviewErrors.task_already_exists()
-
-            # 获取文件信息(确保包含文件内容)
-            file_info = await get_file_info(file_id, include_content=True)
-
-            if not file_info:
-                raise LaunchReviewErrors.task_not_found()
-
-            # 验证必要的字段是否存在
-            if 'file_content' not in file_info:
-                logger.error(f"文件信息中缺少file_content字段,可用字段: {list(file_info.keys())}")
-                raise LaunchReviewErrors.task_not_found()
-
-            # 添加审查配置到文件信息
-            file_info.update({
-                'review_config': review_config,
-                'project_plan_type': project_plan_type,
-                'launched_at': int(time.time())
-            })
-
-            logger.info(f"获取到文件信息: file_id={file_id}, 包含字段: {list(file_info.keys())}")
-            logger.info(f"文件内容大小: {len(file_info.get('file_content', b''))} bytes")
-
-            # 注意:暂不删除Redis缓存,让工作流处理完成后再清理
-            # await delete_file_info(file_id)
-            logger.info(f"保留Redis缓存供工作流使用: file_info:{file_id}")
+    callback_task_id = request_data.callback_task_id
+    TraceContext.set_trace_id(callback_task_id)
+    review_config = request_data.review_config
+    project_plan_type = request_data.project_plan_type
+
+    logger.info(f"收到审查启动SSE请求: callback_task_id={callback_task_id}")
+
+    # 验证审查配置
+    validate_review_config(review_config)
+
+    # 验证工程方案类型
+    validate_project_plan_type(project_plan_type)
+
+    # 注册SSE回调
+    sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
+    queue = await sse_manager.connect(callback_task_id)
+
+    async def generate_launch_review_events():
+        """生成启动审查SSE事件流"""
+        try:
+            # 发送连接确认
+            connected_data = json.dumps({
+                "callback_task_id": callback_task_id,
+                "message": "启动审查SSE连接已建立,正在处理请求...",
+                "timestamp": datetime.now().isoformat()
+            }, ensure_ascii=False)
+            yield format_sse_event("connected", connected_data)
+
+            # 处理启动审查逻辑
+            try:
+                from foundation.utils.redis_utils import get_file_info
+
+                # 从callback_task_id中提取file_id (格式: file_id-timestamp)
+                file_id = callback_task_id.rsplit('-', 1)[0] if '-' in callback_task_id else callback_task_id
+
+                # 发送处理状态
+                status_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "stage": "validation",
+                    "message": f"正在验证文件信息: {file_id}",
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("processing", status_data)
+
+                # 检查重复任务
+                if await duplicatechecker.is_duplicate_task(file_id):
+                    error_data = json.dumps({
+                        "callback_task_id": callback_task_id,
+                        "error": "task_already_exists",
+                        "message": "任务已存在,请勿重复提交",
+                        "timestamp": datetime.now().isoformat()
+                    }, ensure_ascii=False)
+                    yield format_sse_event("error", error_data)
+                    return
+
+                # 获取文件信息
+                status_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "stage": "loading",
+                    "message": "正在加载文件信息...",
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("processing", status_data)
+
+                file_info = await get_file_info(file_id, include_content=True)
+
+
+
+                if not file_info:
+                    error_data = json.dumps({
+                        "callback_task_id": callback_task_id,
+                        "error": "task_not_found",
+                        "message": "任务ID不存在或已过期",
+                        "timestamp": datetime.now().isoformat()
+                    }, ensure_ascii=False)
+                    yield format_sse_event("error", error_data)
+                    return
+
+                # 验证必要的字段
+                if 'file_content' not in file_info:
+                    error_data = json.dumps({
+                        "callback_task_id": callback_task_id,
+                        "error": "missing_content",
+                        "message": "文件内容缺失",
+                        "timestamp": datetime.now().isoformat()
+                    }, ensure_ascii=False)
+                    yield format_sse_event("error", error_data)
+                    return
+
+                # 添加审查配置到文件信息
+                file_info.update({
+                    'review_config': review_config,
+                    'project_plan_type': project_plan_type,
+                    'launched_at': int(time.time())
+                })
+
+
+
+                # 发送提交任务状态
+                status_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "stage": "submitting",
+                    "message": "正在提交AI审查任务...",
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("processing", status_data)
+
+                # 提交处理任务到工作流管理器
+                task_id = await workflow_manager.submit_task_processing(file_info)
+
+                # 发送成功启动状态
+                success_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "task_id": task_id,
+                    "file_id": file_info['file_id'],
+                    "review_config": review_config,
+                    "project_plan_type": project_plan_type,
+                    "status": "submitted",
+                    "submitted_at": file_info['launched_at'],
+                    "message": "AI审查任务已成功启动",
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("submitted", success_data)
+
+                # 继续监听工作流进度
+                logger.info(f"开始监听工作流进度: {callback_task_id}")
+                while True:
+                    try:
+                        message = await queue.get()
+
+                        if message.get("type") == "progress_update":
+                            current_data = message.get("data")
+                            if current_data:
+                                progress_json = json.dumps(current_data, ensure_ascii=False)
+                                yield format_sse_event("progress", progress_json)
+
+                                overall_task_status = current_data.get("overall_task_status")
+                                if overall_task_status in ["completed", "failed"]:
+                                    completion_data = {
+                                        "callback_task_id": callback_task_id,
+                                        "task_status": overall_task_status,
+                                        "overall_progress": current_data.get("current", 100),
+                                        "timestamp": datetime.now().isoformat(),
+                                        "message": "审查任务处理完成!"
+                                    }
+                                    completion_json = json.dumps(completion_data, ensure_ascii=False)
+                                    yield format_sse_event("completed", completion_json)
+                                    break
+
+                    except Exception as e:
+                        logger.error(f"队列消息处理异常: {callback_task_id}")
+                        logger.error(f"异常详情: {str(e)}")
+                        logger.error(f"异常堆栈: {traceback.format_exc()}")
+                        break
+
+            except HTTPException as e:
+                logger.error(f"HTTP异常: {callback_task_id}")
+                logger.error(f"异常详情: {str(e)}")
+                logger.error(f"异常堆栈: {traceback.format_exc()}")
+                error_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "error": e.detail.get("code") if hasattr(e, 'detail') and e.detail else "http_error",
+                    "message": e.detail.get("message") if hasattr(e, 'detail') and e.detail else str(e),
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("error", error_data)
+
+            except Exception as e:
+                logger.error(f"启动审查处理异常: {callback_task_id}")
+                logger.error(f"异常详情: {str(e)}")
+                logger.error(f"异常堆栈: {traceback.format_exc()}")
+                error_data = json.dumps({
+                    "callback_task_id": callback_task_id,
+                    "error": "internal_error",
+                    "message": f"服务端内部错误: {str(e)}",
+                    "timestamp": datetime.now().isoformat()
+                }, ensure_ascii=False)
+                yield format_sse_event("error", error_data)
 
         except Exception as e:
-            logger.error(f"获取文件信息失败: {str(e)}")
-            raise LaunchReviewErrors.file_info_not_found(e)
-
-        # 提交处理任务到工作流管理器
-        task_id = await workflow_manager.submit_task_processing(file_info)
+            logger.error(f"启动审查SSE事件流异常: {callback_task_id}")
+            logger.error(f"异常详情: {str(e)}")
+            logger.error(f"异常堆栈: {traceback.format_exc()}")
+            error_data = json.dumps({
+                "callback_task_id": callback_task_id,
+                "error": "sse_error",
+                "message": f"SSE流异常: {str(e)}",
+                "timestamp": datetime.now().isoformat()
+            }, ensure_ascii=False)
+            yield format_sse_event("error", error_data)
+
+        finally:
+            # 清理回调连接
+            sse_callback_manager.unregister_callback(callback_task_id)
+            await sse_manager.disconnect(callback_task_id)
+            logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
+
+    return StreamingResponse(
+        generate_launch_review_events(),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache, no-store, must-revalidate",
+            "Connection": "keep-alive",
+            "Access-Control-Allow-Origin": "*",
+            "Access-Control-Allow-Headers": "Cache-Control, EventSource, Content-Type",
+            "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
+            "X-Accel-Buffering": "no",
+            "X-Content-Type-Options": "nosniff"
+        }
+    )
 
 
 
 
-    except HTTPException:
-        # 重新抛出HTTP异常
-        raise
-    except Exception as e:
-        logger.error(f"启动审查失败: {str(e)}")
-        raise LaunchReviewErrors.internal_error(e)
+@launch_review_router.get("/sse/launch_review_status")
+async def get_launch_review_sse_status():
+    """获取启动审查SSE连接状态 - 返回当前活跃的启动审查SSE连接信息"""
+    return {
+        "active_connections": len(sse_manager.connections),
+        "connections": list(sse_manager.connections.keys()),
+        "timestamp": datetime.now().isoformat(),
+        "service": "launch_review_sse"
+    }

Некоторые файлы не были показаны из-за большого количества измененных файлов