| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- """
- SSE 格式化工具
- 提供 SSE 事件的格式化和生成功能,用于审查调试 API 的流式响应。
- 支持 started / step_progress / step_result / phase_label / completed / error / replay_comparison
- 共 7 种事件类型。
- """
- import asyncio
- import json
- from typing import Any, Dict, List, Optional, AsyncGenerator, Tuple
- # ============================================================
- # 常量
- # ============================================================
- SSE_EVENT_TYPES = [
- "started",
- "step_progress",
- "step_result",
- "phase_label",
- "completed",
- "error",
- "replay_comparison",
- ]
- MAX_CONCURRENT_DEBUG_TASKS = 5
- """最大并发调试任务数"""
- DEBUG_GLOBAL_TIMEOUT = 180
- """全局默认超时(秒)"""
- # ============================================================
- # 并发控制
- # ============================================================
- debug_semaphore = asyncio.Semaphore(MAX_CONCURRENT_DEBUG_TASKS)
- """全局调试任务信号量,控制并发上限"""
- _running_tasks: Dict[str, asyncio.Queue] = {}
- """正在执行的任务队列缓存,key=task_id, value=event_queue,用于断线重连"""
- # ============================================================
- # SSE 格式化
- # ============================================================
- def format_sse_event(event: str, data: dict) -> str:
- """
- 格式化为 SSE 规范的事件字符串。
- Args:
- event: 事件类型(started, step_progress, step_result, phase_label,
- completed, error, replay_comparison)
- data: 事件数据字典
- Returns:
- 符合 SSE 协议的文本行(含两个末尾换行)
- """
- return (
- f"event: {event}\n"
- f"data: {json.dumps(data, ensure_ascii=False, default=str)}\n\n"
- )
- async def sse_generator(event_queue: asyncio.Queue) -> AsyncGenerator[str, None]:
- """
- SSE 事件生成器,从队列消费事件并格式化为 SSE 文本流。
- 队列元素格式: (event_type: str, data: dict)
- 当收到 ("__end__", None) 时停止生成。
- Args:
- event_queue: asyncio.Queue,由执行器填充事件
- Yields:
- SSE 格式的文本行
- """
- while True:
- event_type, data = await event_queue.get()
- if event_type == "__end__":
- break
- yield format_sse_event(event_type, data)
- event_queue.task_done()
- def make_trace_id(chain_id: str) -> str:
- """生成 trace_id,添加 debug_ 前缀实现生产隔离"""
- from datetime import datetime
- import uuid
- return (
- f"debug_{chain_id}_"
- f"{datetime.now().strftime('%H%M%S')}_"
- f"{uuid.uuid4().hex[:8]}"
- )
- def make_record_id() -> str:
- """生成记录 ID: call-{YYYYMMDD}-{HHMMSS}-{hex}"""
- from datetime import datetime
- import uuid
- return (
- f"call-{datetime.now().strftime('%Y%m%d')}-"
- f"{datetime.now().strftime('%H%M%S')}-"
- f"{uuid.uuid4().hex[:6]}"
- )
- # ============================================================
- # 链名称映射(避免循环引用 executor)
- # ============================================================
- CHAIN_NAMES = {
- "completeness": "完整性审查",
- "timeliness": "时效性审查",
- "reference": "规范性审查",
- "sensitive": "敏感词检查",
- "semantic": "语义逻辑检查",
- "grammar": "语法检查",
- "professional": "专业性审查",
- }
- CHAIN_STEPS_COUNT = {
- "completeness": 3,
- "timeliness": 3,
- "reference": 3,
- "sensitive": 3,
- "semantic": 3,
- "grammar": 3,
- "professional": 7,
- }
|