流式输出改造方案.md 8.0 KB

AI 对话流式输出改造方案

当前问题

目前 run_answer_skill / run_modify_skill 节点调用 LLM 使用的是 ainvoke(非流式),模型推理期间前端收不到任何内容,直到一次性返回完整回答后才推送 chunk 事件。用户看到的体验是:进度提示 → 长时间空窗 → 突然输出全部内容。

改动目标

将 LLM 推理结果实时推送到前端,用户在生成过程中就能逐字看到回答/草案内容。


架构设计(两层)

层 1:LLM → Skill 节点(异步流式生成)

Skill 内部用 get_model_generate_stream 逐 chunk 生成,同时收集完整文本用于最终 JSON 解析。

层 2:Skill 节点 → SSE 前端(LangGraph custom stream)

Skill 节点通过 LangGraph 的 StreamWriter + stream_mode="custom" 将 chunk 实时推到 views.py 的 SSE 生成器。

非 SSE 路径不改:普通 POST 仍走 workflow.run()to_response_data 一次性返回,不受流式影响。


实施步骤

Step 0:确认 langgraph 版本(先做)

当前项目 requirements.txtlanggraph==1.0.4,需要确认该版本是否支持 StreamWriterstream_mode="custom"

  • 如果不支持,需要升级到 1.1+(或找到 1.0.4 的等价 API)
  • 如果 API 签名与新版文档不同,以 1.0.4 的实际接口为准

Step 1:改造 model_generate.py — 新增异步流式方法

当前 get_model_generate_stream(第 597 行)是同步生成器,不能直接 asyncio.to_thread 包一下就用。原因:

  • to_thread(gen_func) 只拿到 generator 对象,迭代仍在原线程,每次迭代都阻塞事件循环
  • SSE 需要真正的异步迭代,每个 chunk 到达时 await 到异步队列中

做法:

  • 新增 async def get_model_generate_invoke_stream(...) 异步方法
  • 内部用 worker 线程启动同步 get_model_generate_stream,通过 asyncio.Queue 投递 chunk
  • 异步方法从 queue 中 await get() 逐 chunk yield
  • 同步流式方法已有的 _ThinkingBlockStreamFilter 思考内容过滤保留
  • 支持 function_name 加载模型配置 + enable_thinking 配置(与非流式行为一致)
  • 支持超时:用 asyncio.wait_for 包装 queue.get()

    async def get_model_generate_invoke_stream(
    self, trace_id, system_prompt, user_prompt, timeout, function_name, enable_thinking
    ) -> AsyncGenerator[str, None]:
    # worker 线程跑同步流式,queue 投递 chunk
    # 主异步循环从 queue 消费
    

Step 2:改造 schemas.py — 调整错误 response_type

当前 DocumentChatSkillOutput.response_type 只允许 "answer" | "proposal" | "clarify" | "unsupported",方案中流式超时返回 "error" 会校验失败。

做法:

  • 在 Literal 中增加 "error"Literal["answer", "proposal", "clarify", "unsupported", "error"]
  • 这与 DocumentChatData.response_type 已经包含 "error" 保持一致

Step 3:改造 base.py — 增加流式 run 接口

不能用 AsyncGenerator[str, DocumentChatSkillOutput](Python async generator 不能 return value)。

做法:

async def run_stream(
    self,
    skill_input: DocumentChatSkillInput,
    on_chunk: Callable[[str], None],
) -> DocumentChatSkillOutput:
    """流式执行。每次生成一个 chunk 时调用 on_chunk,最终返回完整结果。"""
    raise NotImplementedError

默认实现:调用非流式 run(),将整个 answer 一次性传给 on_chunk,保持向后兼容。

为什么不用 AsyncGenerator:

  • AsyncGenerator 不能 return 最终结果
  • on_chunk callback 模式更符合 LangGraph 节点的需求(节点需要最终 return state update)

Step 4:改造 document_answer.py + document_modify.py — 实现流式生成

共同流程:

  1. 调用 Step 1 的异步流式方法
  2. 每次 chunk 到达时调用 on_chunk(chunk)
  3. 所有 chunk 收集完后拼接为完整文本
  4. extract_json_object 解析 JSON 提取字段
  5. 构造 DocumentChatSkillOutput 返回

JSON 剥离策略:

  • on_chunk 中推送的是完整 LLM 原始 chunk(包含 JSON 结构字符)
  • 前端看到的是 {"answer": "回答内容"...} 等完整文本
  • 前端自行解析提取 answer 字段内容(后端不剥离 JSON)
  • 或者:后端在 on_chunk 中维护 JSON 解析状态机,只推送 answer 字段的值(实现更复杂但用户体验好)

推荐:后端推送原始 chunk,前端处理剥离。 原因:

  • 减少后端复杂度
  • 前端本来就要做 markdown 渲染,顺手处理 JSON 结构
  • extract_json_object 已支持 fenced JSON 和纯 JSON 两种格式

Step 5:改造 skill_dispatcher.py — 增加 run_skill_stream

async def run_skill_stream(
    self,
    skill_name: str,
    skill_input: DocumentChatSkillInput,
    on_chunk: Callable[[str], None],
) -> DocumentChatSkillOutput:
    if skill_name not in self._definitions:
        raise ValueError(...)
    skill = self._get_instance(skill_name)
    return await skill.run_stream(skill_input, on_chunk)

Step 6:改造 workflow.py — skill 节点用 StreamWriter 推送 chunk

当前 _run_skill 方法直接调 run_skill。需要改为:

async def run_answer_skill_node(self, state, writer: StreamWriter):
    ...
    skill_input = self._build_skill_input(state)

    def _on_chunk(chunk: str):
        writer({"stream_chunk": chunk})

    skill_result = await self.skill_dispatcher.run_skill_stream(
        "document-answer", skill_input, on_chunk=_on_chunk
    )
    return {
        "skill_result": model_to_dict(skill_result),
        "response_type": skill_result.response_type,
        "current_stage": "run_answer_skill",
    }

Step 7:改造 views.py — SSE 接收 custom stream

当前:

async for raw_update in workflow.get_graph().astream(graph_state, stream_mode="updates"):

改为:

stream_modes = ["updates", "custom"]
async for chunk in workflow.get_graph().astream(graph_state, stream_mode=stream_modes):
    # chunk 是 (mode, payload) 或类似结构,需要分流
    if mode == "custom" and "stream_chunk" in payload:
        yield format_sse_event("chunk", {"chunk": payload["stream_chunk"]})
    elif mode == "updates":
        # 现有逻辑不变

去掉工作流结束后的一次性 chunk 推送。


改动文件清单

文件 改动内容
foundation/ai/agent/generate/model_generate.py 新增 get_model_generate_invoke_stream 异步方法
core/document_chat/schemas.py DocumentChatSkillOutput.response_type 增加 "error"
core/document_chat/skills/base.py 新增 run_stream(input, on_chunk) 抽象方法
core/document_chat/skills/document_answer.py 实现 run_stream
core/document_chat/skills/document_modify.py 实现 run_stream
core/document_chat/component/skill_dispatcher.py 新增 run_skill_stream 方法
core/document_chat/workflows/document_chat_workflow.py skill 节点改用 StreamWriter + run_skill_stream
views/document_chat/views.py astream 改用 ["updates", "custom"],分流处理

改动影响范围

组件 是否影响
非流式接口 (run_skill) 保留不动
to_response_data 不改
workflow 图结构 不改
意图识别、检索、重排、质量门控 全部不改
clarify / unsupported / error 流程 不改
非 SSE 接口(同步返回) 不改

已知风险

  1. langgraph 1.0.4 API 兼容性 — 需确认 StreamWriter / stream_mode="custom" 是否可用,不可用则需要升级
  2. 前端需要处理 JSON 结构 — 如果选择后端不剥离 JSON,前端需自行从 {"answer": "..."} 中提取内容
  3. 异步队列线程安全 — worker 线程 → queue → async consumer 需要正确处理取消、超时、异常
  4. 测试缺失 — 当前仓库没有 document_chat 相关测试,流式改动后需要补
  5. diff_result 死字段清理 — 前一轮改动遗留,建议拆成单独的 PR 处理,不混在本次流式改动中