""" SSE 格式化工具 提供 SSE 事件的格式化和生成功能,用于审查调试 API 的流式响应。 支持 started / step_progress / step_result / phase_label / completed / error / replay_comparison 共 7 种事件类型。 """ import asyncio import json from typing import Any, Dict, List, Optional, AsyncGenerator, Tuple # ============================================================ # 常量 # ============================================================ SSE_EVENT_TYPES = [ "started", "step_progress", "step_result", "phase_label", "completed", "error", "replay_comparison", ] MAX_CONCURRENT_DEBUG_TASKS = 5 """最大并发调试任务数""" DEBUG_GLOBAL_TIMEOUT = 180 """全局默认超时(秒)""" # ============================================================ # 并发控制 # ============================================================ debug_semaphore = asyncio.Semaphore(MAX_CONCURRENT_DEBUG_TASKS) """全局调试任务信号量,控制并发上限""" _running_tasks: Dict[str, asyncio.Queue] = {} """正在执行的任务队列缓存,key=task_id, value=event_queue,用于断线重连""" # ============================================================ # SSE 格式化 # ============================================================ def format_sse_event(event: str, data: dict) -> str: """ 格式化为 SSE 规范的事件字符串。 Args: event: 事件类型(started, step_progress, step_result, phase_label, completed, error, replay_comparison) data: 事件数据字典 Returns: 符合 SSE 协议的文本行(含两个末尾换行) """ return ( f"event: {event}\n" f"data: {json.dumps(data, ensure_ascii=False, default=str)}\n\n" ) async def sse_generator(event_queue: asyncio.Queue) -> AsyncGenerator[str, None]: """ SSE 事件生成器,从队列消费事件并格式化为 SSE 文本流。 队列元素格式: (event_type: str, data: dict) 当收到 ("__end__", None) 时停止生成。 Args: event_queue: asyncio.Queue,由执行器填充事件 Yields: SSE 格式的文本行 """ while True: event_type, data = await event_queue.get() if event_type == "__end__": break yield format_sse_event(event_type, data) event_queue.task_done() def make_trace_id(chain_id: str) -> str: """生成 trace_id,添加 debug_ 前缀实现生产隔离""" from datetime import datetime import uuid return ( f"debug_{chain_id}_" f"{datetime.now().strftime('%H%M%S')}_" f"{uuid.uuid4().hex[:8]}" ) def make_record_id() -> str: """生成记录 ID: call-{YYYYMMDD}-{HHMMSS}-{hex}""" from datetime import datetime import uuid return ( f"call-{datetime.now().strftime('%Y%m%d')}-" f"{datetime.now().strftime('%H%M%S')}-" f"{uuid.uuid4().hex[:6]}" ) # ============================================================ # 链名称映射(避免循环引用 executor) # ============================================================ CHAIN_NAMES = { "completeness": "完整性审查", "timeliness": "时效性审查", "reference": "规范性审查", "sensitive": "敏感词检查", "semantic": "语义逻辑检查", "grammar": "语法检查", "professional": "专业性审查", } CHAIN_STEPS_COUNT = { "completeness": 3, "timeliness": 3, "reference": 3, "sensitive": 3, "semantic": 3, "grammar": 3, "professional": 7, }