| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767 |
- """
- 审查链路调试 API 端点
- 提供审查链路调试的 SSE 流式执行端点,以及完整的 Pydantic 请求/响应模型定义。
- 功能:
- 1. POST /debug/api/review/execute — 执行审查调试,SSE 流式返回进度
- 2. GET /debug/api/review/stream/{task_id} — SSE 断线重连
- 3. 并发控制(最大 5 个并发调试任务)
- 4. 全局超时控制(默认 180s)
- 5. 环节隔离模式支持
- 6. 调用记录自动保存
- """
- import asyncio
- import json
- import logging
- from datetime import datetime
- from typing import Optional, List, Dict, AsyncGenerator, ClassVar
- from fastapi import APIRouter, HTTPException, Path
- from fastapi.responses import StreamingResponse
- from pydantic import BaseModel, Field, field_validator
- from core.debug.sse_utils import (
- format_sse_event,
- sse_generator,
- debug_semaphore,
- _running_tasks,
- MAX_CONCURRENT_DEBUG_TASKS,
- DEBUG_GLOBAL_TIMEOUT,
- make_trace_id,
- make_record_id,
- CHAIN_NAMES,
- CHAIN_STEPS_COUNT,
- )
- logger = logging.getLogger(__name__)
- # ============ 枚举常量 ============
- class ChainId:
- """链路标识枚举"""
- COMPLETENESS = "completeness"
- TIMELINESS = "timeliness"
- REFERENCE = "reference"
- SENSITIVE = "sensitive"
- SEMANTIC = "semantic"
- GRAMMAR = "grammar"
- PROFESSIONAL = "professional"
- class StepStatus:
- """步骤状态枚举"""
- PENDING = "pending"
- RUNNING = "running"
- SUCCESS = "success"
- ERROR = "error"
- class RecordStatus:
- """记录状态枚举"""
- SUCC = "succ"
- FAIL = "fail"
- TIMEOUT = "timeout"
- class ReviewType:
- """审查类型枚举"""
- BOTH = "both"
- NON_PARAMETER = "non_parameter"
- PARAMETER = "parameter"
- # ============ 审查链路调试 - 请求/响应模型 ============
- class RagParams(BaseModel):
- """专业性审查 RAG 参数"""
- review_type: str = Field(
- default="both",
- description="审查类型: both(全部) / non_parameter(非参数) / parameter(参数)",
- )
- collection_name: str = Field(
- default="construction_specs",
- description="Milvus 集合名称",
- )
- top_k: int = Field(
- default=5,
- ge=1,
- le=50,
- description="向量检索 top_k",
- )
- hybrid_top_k: int = Field(
- default=20,
- ge=1,
- le=100,
- description="混合检索 top_k",
- )
- dense_weight: float = Field(
- default=0.5,
- ge=0.0,
- le=1.0,
- description="稠密向量权重(0-1)",
- )
- parent_threshold: float = Field(
- default=0.3,
- ge=0.0,
- le=1.0,
- description="父文档增强阈值",
- )
- class DebugExecuteRequest(BaseModel):
- """审查调试执行请求"""
- chain_id: str = Field(
- ...,
- description="链路标识: completeness / timeliness / reference / sensitive / semantic / grammar / professional",
- )
- content: str = Field(
- ...,
- description="待审查的方案内容",
- )
- reference: Optional[str] = Field(
- default=None,
- description="审查参考依据",
- )
- model: Optional[str] = Field(
- default=None,
- description="覆盖模型名称(如 deepseek_v3),与 function_name 互斥",
- )
- function_name: Optional[str] = Field(
- default=None,
- description="覆盖功能名称(如 completeness_review_generate),与 model 互斥",
- )
- prompt_version: Optional[str] = Field(
- default=None,
- description="指定提示词版本,null 则使用当前激活版本",
- )
- timeout: int = Field(
- default=60,
- ge=10,
- le=600,
- description="模型调用超时(秒)",
- )
- rag_params: Optional[RagParams] = Field(
- default=None,
- description="专业性审查专属参数,其他链路忽略",
- )
- isolation_mode: bool = Field(
- default=False,
- description="是否启用环节隔离模式",
- )
- isolation_steps: List[int] = Field(
- default_factory=list,
- description="隔离模式下要执行的步骤索引列表",
- )
- manual_inputs: Dict[str, str] = Field(
- default_factory=dict,
- description="隔离模式下各步骤的手动输入,key 为步骤索引,value 为输入内容",
- )
- _VALID_CHAIN_IDS: ClassVar[set] = {
- ChainId.COMPLETENESS,
- ChainId.TIMELINESS,
- ChainId.REFERENCE,
- ChainId.SENSITIVE,
- ChainId.SEMANTIC,
- ChainId.GRAMMAR,
- ChainId.PROFESSIONAL,
- }
- @field_validator("chain_id")
- @classmethod
- def check_chain_id(cls, v: str) -> str:
- """验证 chain_id 是否为合法枚举值"""
- if v not in cls._VALID_CHAIN_IDS:
- raise ValueError(
- f"chain_id 必须为以下值之一: "
- f"{json.dumps(sorted(cls._VALID_CHAIN_IDS), ensure_ascii=False)}"
- )
- return v
- @field_validator("content")
- @classmethod
- def check_content_not_empty(cls, v: str) -> str:
- """验证 content 不能为空"""
- if not v or not v.strip():
- raise ValueError("content 不能为空")
- return v
- # ============ 提示词管理 - 请求/响应模型 ============
- class PromptItem(BaseModel):
- """提示词列表项"""
- name: str = Field(default="", description="提示词名称")
- version: str = Field(default="", description="版本号")
- time: str = Field(default="", description="创建时间")
- chain: str = Field(default="", description="所属链路")
- is_current: bool = Field(default=False, description="是否为当前激活版本")
- note: str = Field(default="", description="版本说明")
- class PromptListResponse(BaseModel):
- """提示词列表响应"""
- status: str = Field(default="not_implemented", description="状态")
- total: int = Field(default=0, description="总数")
- page: int = Field(default=1, description="当前页码")
- page_size: int = Field(default=50, description="每页条数")
- items: List[PromptItem] = Field(default_factory=list, description="提示词列表")
- chains: List[str] = Field(default_factory=list, description="可筛选的链路列表")
- class PromptDetailResponse(BaseModel):
- """提示词详情响应"""
- status: str = Field(default="not_implemented", description="状态")
- name: str = Field(default="", description="提示词名称")
- version: str = Field(default="", description="版本号")
- time: str = Field(default="", description="创建时间")
- chain: str = Field(default="", description="所属链路")
- is_current: bool = Field(default=False, description="是否为当前激活版本")
- system_prompt: str = Field(default="", description="系统提示词")
- user_prompt: str = Field(default="", description="用户提示词模板")
- note: str = Field(default="", description="版本说明")
- variables: List[str] = Field(default_factory=list, description="模板变量列表")
- based_on: Optional[str] = Field(default=None, description="基于哪个版本")
- file_path: str = Field(default="", description="文件路径")
- class PromptSaveRequest(BaseModel):
- """保存新版本请求"""
- name: str = Field(..., description="提示词名称")
- system_prompt: str = Field(..., description="系统提示词内容")
- user_prompt: str = Field(..., description="用户提示词模板")
- note: str = Field(default="", description="版本说明")
- set_current: bool = Field(default=True, description="是否设为当前激活版本")
- class PromptSaveResponse(BaseModel):
- """保存新版本响应"""
- success: bool = Field(default=True, description="是否成功")
- name: str = Field(default="", description="提示词名称")
- version: str = Field(default="", description="新版本号")
- time: str = Field(default="", description="创建时间")
- message: str = Field(default="", description="消息说明")
- class PromptCompareRequest(BaseModel):
- """版本对比请求"""
- name: str = Field(..., description="提示词名称")
- base_version: str = Field(..., description="基准版本号")
- target_version: str = Field(..., description="目标版本号")
- class PromptCompareResponse(BaseModel):
- """版本对比响应"""
- status: str = Field(default="not_implemented", description="状态")
- name: str = Field(default="", description="提示词名称")
- base_version: str = Field(default="", description="基准版本")
- target_version: str = Field(default="", description="目标版本")
- diffs: List[dict] = Field(default_factory=list, description="差异列表")
- class PromptActivateRequest(BaseModel):
- """激活版本请求"""
- name: str = Field(..., description="提示词名称")
- version: str = Field(..., description="版本号")
- class PromptActivateResponse(BaseModel):
- """激活版本响应"""
- success: bool = Field(default=True, description="是否成功")
- name: str = Field(default="", description="提示词名称")
- version: str = Field(default="", description="版本号")
- message: str = Field(default="", description="消息说明")
- class PromptVersionsResponse(BaseModel):
- """版本列表响应"""
- status: str = Field(default="not_implemented", description="状态")
- name: str = Field(default="", description="提示词名称")
- chain: str = Field(default="", description="所属链路")
- current_version: str = Field(default="", description="当前激活版本")
- versions: List[dict] = Field(default_factory=list, description="版本列表")
- # ============ 调用记录 - 请求/响应模型 ============
- class CallRecordItem(BaseModel):
- """调用记录列表项"""
- id: str = Field(default="", description="记录 ID")
- time: str = Field(default="", description="调用时间")
- chain: str = Field(default="", description="链路标识")
- chain_name: str = Field(default="", description="链路名称")
- doc_ref: str = Field(default="", description="文档引用")
- duration: str = Field(default="", description="持续时间(格式化)")
- duration_ms: int = Field(default=0, description="持续时间(毫秒)")
- status: str = Field(default="", description="状态: succ/fail/timeout")
- model: str = Field(default="", description="模型名称")
- prompt_ver: str = Field(default="", description="提示词版本")
- tokens: int = Field(default=0, description="Token 消耗")
- result_preview: str = Field(default="", description="结果预览")
- class CallRecordListResponse(BaseModel):
- """调用记录列表响应"""
- status: str = Field(default="not_implemented", description="状态")
- total: int = Field(default=0, description="总数")
- page: int = Field(default=1, description="当前页码")
- page_size: int = Field(default=20, description="每页条数")
- total_pages: int = Field(default=0, description="总页数")
- items: List[CallRecordItem] = Field(default_factory=list, description="记录列表")
- chains: List[str] = Field(default_factory=list, description="可筛选的链路列表")
- status_counts: Dict[str, int] = Field(default_factory=dict, description="状态统计")
- class StepDetail(BaseModel):
- """步骤详情"""
- index: int = Field(default=0, description="步骤索引")
- name: str = Field(default="", description="步骤名称")
- duration_ms: int = Field(default=0, description="持续时间(毫秒)")
- status: str = Field(default="", description="状态: succ/fail/timeout")
- input: dict = Field(default_factory=dict, description="步骤输入")
- output: dict = Field(default_factory=dict, description="步骤输出")
- class CallRecordDetailResponse(BaseModel):
- """调用记录详情响应"""
- id: str = Field(default="", description="记录 ID")
- time: str = Field(default="", description="调用时间")
- chain: str = Field(default="", description="链路标识")
- chain_name: str = Field(default="", description="链路名称")
- doc_ref: str = Field(default="", description="文档引用")
- status: str = Field(default="", description="状态: succ/fail/timeout")
- duration_ms: int = Field(default=0, description="持续时间(毫秒)")
- model: str = Field(default="", description="模型名称")
- function_name: Optional[str] = Field(default=None, description="功能名称")
- prompt_ver: str = Field(default="", description="提示词版本")
- prompt_name: str = Field(default="", description="提示词名称")
- tokens: int = Field(default=0, description="Token 消耗")
- params: dict = Field(default_factory=dict, description="请求参数")
- steps: List[StepDetail] = Field(default_factory=list, description="步骤列表")
- result: str = Field(default="", description="审查结果")
- error_message: Optional[str] = Field(default=None, description="错误信息")
- class OverrideParams(BaseModel):
- """回放覆盖参数"""
- model: Optional[str] = Field(default=None, description="覆盖模型名称")
- prompt_version: Optional[str] = Field(default=None, description="覆盖提示词版本")
- rag_params: Optional[RagParams] = Field(default=None, description="覆盖 RAG 参数")
- class ReplayRequest(BaseModel):
- """回放调用请求"""
- record_id: str = Field(..., description="要回放的调用记录 ID")
- override_params: Optional[OverrideParams] = Field(
- default=None,
- description="覆盖原始参数,不指定则使用原始参数",
- )
- class ExportRequest(BaseModel):
- """导出调用记录请求"""
- record_ids: List[str] = Field(..., description="要导出的记录 ID 列表")
- format: str = Field(default="json", description="导出格式")
- # ============ 路由定义 ============
- debug_router = APIRouter(prefix="/debug", tags=["审查调试工作台"])
- # 注册子模块路由
- from .prompt_api import register_routes as register_prompt_routes
- from .record_api import register_routes as register_record_routes
- from .rag_debug_api import register_routes as register_rag_debug_routes
- register_prompt_routes(debug_router)
- register_record_routes(debug_router)
- register_rag_debug_routes(debug_router)
- # ============ 审查链路调试端点 ============
- @debug_router.post("/api/review/execute")
- async def execute_review(request: DebugExecuteRequest):
- """
- 启动审查调试任务,返回 task_id。
- 前端拿到 task_id 后通过 GET /debug/api/review/stream/{task_id} (EventSource)
- 接收 SSE 实时进度推送。
- 支持全部 7 个审查链路。
- 并发限制:最多 5 个调试任务同时执行。
- """
- # ---- 检查并发上限 ----
- if debug_semaphore.locked():
- raise HTTPException(
- status_code=429,
- detail=(
- f"并发调试任务数已达上限 ({MAX_CONCURRENT_DEBUG_TASKS}),"
- f"请等待其他任务完成后再试"
- ),
- )
- chain_id = request.chain_id
- total_steps = CHAIN_STEPS_COUNT.get(chain_id, 3)
- event_queue: asyncio.Queue = asyncio.Queue()
- task_id = make_trace_id(chain_id)
- record_id = make_record_id()
- _running_tasks[task_id] = event_queue
- # 后台启动执行任务(不 await,让它在后台运行)
- asyncio.create_task(_background_execute(
- request, event_queue, task_id, record_id, chain_id,
- ))
- return {
- "task_id": task_id,
- "chain_id": chain_id,
- "total_steps": total_steps,
- }
- async def _background_execute(
- request: DebugExecuteRequest,
- event_queue: asyncio.Queue,
- task_id: str,
- record_id: str,
- chain_id: str,
- ) -> None:
- """后台执行审查调试,通过 event_queue 推送进度给 SSE 端点。
- 不消费 event_queue —— 仅启动 executor 并等待其完成,
- 将结果持久化到调用记录。SSE 事件由 GET stream 端点独立消费。
- """
- async with debug_semaphore:
- try:
- from core.debug.executor import DebugExecutor
- await event_queue.put(("started", {
- "task_id": task_id,
- "chain_id": chain_id,
- "total_steps": CHAIN_STEPS_COUNT.get(chain_id, 3),
- }))
- executor = DebugExecutor()
- result = await executor.execute(request, event_queue)
- # 保存调用记录
- try:
- await _save_debug_record(
- request=request,
- task_id=task_id,
- record_id=record_id,
- chain_id=chain_id,
- completed_data=result.get("completed_data"),
- error_message=result.get("error_occurred"),
- steps=result.get("steps_collected", []),
- )
- except Exception as exc:
- logger.warning("[execute_review] 保存调用记录失败: %s", exc)
- except asyncio.CancelledError:
- await event_queue.put(("error", {
- "task_id": task_id,
- "message": "任务被取消",
- }))
- except Exception as exc:
- logger.exception("[_background_execute] 执行异常")
- await event_queue.put(("error", {
- "task_id": task_id,
- "message": str(exc),
- }))
- finally:
- # 不立即清理 _running_tasks,让 GET SSE 端点消费事件后自行清理。
- # 兜底:30s 后如仍无人消费则清理。
- async def _delayed_cleanup():
- await asyncio.sleep(30)
- if task_id in _running_tasks:
- logger.warning("[_background_execute] 任务 %s 30s 未被消费,强制清理", task_id)
- _running_tasks.pop(task_id, None)
- asyncio.create_task(_delayed_cleanup())
- # ============ SSE 断线重连端点 ============
- @debug_router.get("/api/review/stream/{task_id}")
- async def stream_review_progress(task_id: str = Path(..., description="任务 ID")):
- """
- 重新连接获取审查调试进度(SSE 流式)
- 当 SSE 连接断开时,用于重新连接获取仍在执行中的任务进度。
- 从当前进度继续推送,不重放已完成的 step 事件。
- """
- async def event_generator():
- queue = _running_tasks.get(task_id)
- if queue is None:
- yield format_sse_event("error", {
- "task_id": task_id,
- "message": "任务不存在或已完成",
- })
- return
- # 发送 resumed 标记
- yield format_sse_event("started", {
- "task_id": task_id,
- "resumed": True,
- })
- # 继续从原队列消费事件
- try:
- while True:
- try:
- event_type, data = await asyncio.wait_for(
- queue.get(),
- timeout=DEBUG_GLOBAL_TIMEOUT,
- )
- except asyncio.TimeoutError:
- yield format_sse_event("error", {
- "task_id": task_id,
- "message": "重连等待超时",
- })
- break
- if event_type == "__end__":
- break
- yield format_sse_event(event_type, data)
- if event_type in ("completed", "error"):
- break
- finally:
- _running_tasks.pop(task_id, None)
- return StreamingResponse(
- event_generator(),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "X-Accel-Buffering": "no",
- },
- )
- # ============================================================
- # 内部辅助函数
- # ============================================================
- async def _run_debug_execution(
- request: DebugExecuteRequest,
- event_queue: asyncio.Queue,
- task_id: str,
- record_id: str,
- ) -> None:
- """
- 执行审查调试后台任务。
- 根据 isolation_mode 选择执行路径:
- - True: 使用 IsolationRunner.run_selected_steps()
- - False: 使用 DebugExecutor.execute()
- """
- try:
- if request.isolation_mode:
- await _run_isolation_mode(request, event_queue, task_id)
- else:
- await _run_normal_mode(request, event_queue, task_id, record_id)
- except asyncio.CancelledError:
- await event_queue.put(("error", {
- "task_id": task_id,
- "message": "执行被取消",
- }))
- except Exception as exc:
- logger.exception("[_run_debug_execution] 执行异常")
- await event_queue.put(("error", {
- "task_id": task_id,
- "message": str(exc),
- }))
- async def _run_normal_mode(
- request: DebugExecuteRequest,
- event_queue: asyncio.Queue,
- task_id: str,
- record_id: str,
- ) -> None:
- """正常模式:使用 DebugExecutor 执行"""
- from core.debug.executor import DebugExecutor
- executor = DebugExecutor()
- await executor.execute(request, event_queue)
- async def _run_isolation_mode(
- request: DebugExecuteRequest,
- event_queue: asyncio.Queue,
- task_id: str,
- ) -> None:
- """隔离模式:使用 IsolationRunner 执行选中的步骤"""
- from core.debug.isolation_runner import IsolationRunner
- from core.debug.step_dispatcher import CHAIN_STEPS
- runner = IsolationRunner()
- chain_id = request.chain_id
- params = {
- "content": request.content,
- "reference": request.reference or "",
- "model": request.model,
- "function_name": request.function_name,
- "timeout": request.timeout,
- }
- if request.rag_params:
- params["rag_params"] = request.rag_params
- manual_inputs = request.manual_inputs or {}
- selected_indices = list(request.isolation_steps) if request.isolation_steps else []
- step_result_dicts = await runner.run_selected_steps(
- chain_id=chain_id,
- selected_indices=selected_indices,
- manual_inputs=manual_inputs,
- **params,
- )
- # 从 step_def 获取 phase 信息
- step_defs = CHAIN_STEPS.get(chain_id, [])
- current_phase = None
- for sr in step_result_dicts:
- si = sr.get("index", 0)
- # 获取 phase 信息
- phase = None
- if si < len(step_defs):
- sd = step_defs[si]
- if hasattr(sd, "phase"):
- phase = sd.phase
- elif isinstance(sd, dict):
- phase = sd.get("phase")
- # phase_label:阶段切换时推送
- if phase and phase != current_phase:
- current_phase = phase
- await event_queue.put(("phase_label", {
- "task_id": task_id,
- "label": phase,
- }))
- # step_progress
- await event_queue.put(("step_progress", {
- "task_id": task_id,
- "step_index": si,
- "step_name": sr.get("name", ""),
- "status": "running",
- "duration": None,
- }))
- # step_result
- await event_queue.put(("step_result", {
- "task_id": task_id,
- "step_index": si,
- "step_name": sr.get("name", ""),
- "status": sr.get("status", ""),
- "duration": sr.get("duration", 0),
- "input": sr.get("input", {}),
- "output": sr.get("output", {}),
- "error": sr.get("error"),
- }))
- # completed 事件
- total_duration = sum(
- sr.get("duration", 0) or 0 for sr in step_result_dicts
- )
- success_count = sum(
- 1 for sr in step_result_dicts if sr.get("status") == "success"
- )
- error_count = sum(
- 1 for sr in step_result_dicts if sr.get("status") == "error"
- )
- skipped_count = sum(
- 1 for sr in step_result_dicts if sr.get("status") == "skipped"
- )
- await event_queue.put(("completed", {
- "task_id": task_id,
- "chain_id": chain_id,
- "total_duration": round(total_duration, 3),
- "record_id": "",
- "final_result": {
- "summary": (
- f"{success_count}/{len(step_result_dicts)} 步骤成功, "
- f"{error_count} 错误, {skipped_count} 跳过"
- ),
- "success_count": success_count,
- "error_count": error_count,
- "skipped_count": skipped_count,
- "total_steps": len(step_result_dicts),
- },
- }))
- async def _save_debug_record(
- request: DebugExecuteRequest,
- task_id: str,
- record_id: str,
- chain_id: str,
- completed_data: Optional[dict],
- error_message: Optional[str],
- steps: List[dict],
- ) -> None:
- """通过 RecordManager 保存调用记录"""
- from core.debug.record_manager import RecordManager
- duration_ms = 0
- final_result = ""
- if completed_data:
- duration_sec = completed_data.get("total_duration", 0) or 0
- duration_ms = int(duration_sec * 1000)
- final_result = str(completed_data.get("final_result", {}).get("summary", ""))
- status = "succ"
- if error_message:
- if "超时" in str(error_message):
- status = "timeout"
- else:
- status = "fail"
- record_data = {
- "id": record_id,
- "time": datetime.now().isoformat(),
- "chain": chain_id,
- "chain_name": CHAIN_NAMES.get(chain_id, ""),
- "doc_ref": "",
- "status": status,
- "duration_ms": duration_ms,
- "model": request.model or "default",
- "function_name": request.function_name or "",
- "prompt_ver": request.prompt_version or "",
- "prompt_name": "",
- "tokens": 0,
- "params": {
- "review_content": request.content,
- "review_references": request.reference or "",
- "model_override": request.model,
- "function_name": request.function_name,
- "timeout": request.timeout,
- },
- "execution_params": {
- "isolation_mode": request.isolation_mode,
- "isolation_steps": list(request.isolation_steps),
- "rag_params": (
- request.rag_params.model_dump()
- if request.rag_params else None
- ),
- },
- "steps": steps,
- "result": final_result,
- "error_message": error_message,
- }
- rm = RecordManager()
- await rm.save_record(record_data)
|