目前 run_answer_skill / run_modify_skill 节点调用 LLM 使用的是 ainvoke(非流式),模型推理期间前端收不到任何内容,直到一次性返回完整回答后才推送 chunk 事件。用户看到的体验是:进度提示 → 长时间空窗 → 突然输出全部内容。
将 LLM 推理结果实时推送到前端,用户在生成过程中就能逐字看到回答/草案内容。
Skill 内部用 get_model_generate_stream 逐 chunk 生成,同时收集完整文本用于最终 JSON 解析。
Skill 节点通过 LangGraph 的 StreamWriter + stream_mode="custom" 将 chunk 实时推到 views.py 的 SSE 生成器。
非 SSE 路径不改:普通 POST 仍走 workflow.run() → to_response_data 一次性返回,不受流式影响。
当前项目 requirements.txt 中 langgraph==1.0.4,需要确认该版本是否支持 StreamWriter 和 stream_mode="custom"。
model_generate.py — 新增异步流式方法当前 get_model_generate_stream(第 597 行)是同步生成器,不能直接 asyncio.to_thread 包一下就用。原因:
to_thread(gen_func) 只拿到 generator 对象,迭代仍在原线程,每次迭代都阻塞事件循环await 到异步队列中做法:
async def get_model_generate_invoke_stream(...) 异步方法get_model_generate_stream,通过 asyncio.Queue 投递 chunkawait get() 逐 chunk yield_ThinkingBlockStreamFilter 思考内容过滤保留function_name 加载模型配置 + enable_thinking 配置(与非流式行为一致)支持超时:用 asyncio.wait_for 包装 queue.get()
async def get_model_generate_invoke_stream(
self, trace_id, system_prompt, user_prompt, timeout, function_name, enable_thinking
) -> AsyncGenerator[str, None]:
# worker 线程跑同步流式,queue 投递 chunk
# 主异步循环从 queue 消费
schemas.py — 调整错误 response_type当前 DocumentChatSkillOutput.response_type 只允许 "answer" | "proposal" | "clarify" | "unsupported",方案中流式超时返回 "error" 会校验失败。
做法:
"error":Literal["answer", "proposal", "clarify", "unsupported", "error"]DocumentChatData.response_type 已经包含 "error" 保持一致base.py — 增加流式 run 接口不能用 AsyncGenerator[str, DocumentChatSkillOutput](Python async generator 不能 return value)。
做法:
async def run_stream(
self,
skill_input: DocumentChatSkillInput,
on_chunk: Callable[[str], None],
) -> DocumentChatSkillOutput:
"""流式执行。每次生成一个 chunk 时调用 on_chunk,最终返回完整结果。"""
raise NotImplementedError
默认实现:调用非流式 run(),将整个 answer 一次性传给 on_chunk,保持向后兼容。
为什么不用 AsyncGenerator:
on_chunk callback 模式更符合 LangGraph 节点的需求(节点需要最终 return state update)document_answer.py + document_modify.py — 实现流式生成共同流程:
Step 1 的异步流式方法on_chunk(chunk)extract_json_object 解析 JSON 提取字段DocumentChatSkillOutput 返回JSON 剥离策略:
on_chunk 中推送的是完整 LLM 原始 chunk(包含 JSON 结构字符){"answer": "回答内容"...} 等完整文本on_chunk 中维护 JSON 解析状态机,只推送 answer 字段的值(实现更复杂但用户体验好)推荐:后端推送原始 chunk,前端处理剥离。 原因:
extract_json_object 已支持 fenced JSON 和纯 JSON 两种格式skill_dispatcher.py — 增加 run_skill_streamasync def run_skill_stream(
self,
skill_name: str,
skill_input: DocumentChatSkillInput,
on_chunk: Callable[[str], None],
) -> DocumentChatSkillOutput:
if skill_name not in self._definitions:
raise ValueError(...)
skill = self._get_instance(skill_name)
return await skill.run_stream(skill_input, on_chunk)
workflow.py — skill 节点用 StreamWriter 推送 chunk当前 _run_skill 方法直接调 run_skill。需要改为:
async def run_answer_skill_node(self, state, writer: StreamWriter):
...
skill_input = self._build_skill_input(state)
def _on_chunk(chunk: str):
writer({"stream_chunk": chunk})
skill_result = await self.skill_dispatcher.run_skill_stream(
"document-answer", skill_input, on_chunk=_on_chunk
)
return {
"skill_result": model_to_dict(skill_result),
"response_type": skill_result.response_type,
"current_stage": "run_answer_skill",
}
views.py — SSE 接收 custom stream当前:
async for raw_update in workflow.get_graph().astream(graph_state, stream_mode="updates"):
改为:
stream_modes = ["updates", "custom"]
async for chunk in workflow.get_graph().astream(graph_state, stream_mode=stream_modes):
# chunk 是 (mode, payload) 或类似结构,需要分流
if mode == "custom" and "stream_chunk" in payload:
yield format_sse_event("chunk", {"chunk": payload["stream_chunk"]})
elif mode == "updates":
# 现有逻辑不变
去掉工作流结束后的一次性 chunk 推送。
| 文件 | 改动内容 |
|---|---|
foundation/ai/agent/generate/model_generate.py |
新增 get_model_generate_invoke_stream 异步方法 |
core/document_chat/schemas.py |
DocumentChatSkillOutput.response_type 增加 "error" |
core/document_chat/skills/base.py |
新增 run_stream(input, on_chunk) 抽象方法 |
core/document_chat/skills/document_answer.py |
实现 run_stream |
core/document_chat/skills/document_modify.py |
实现 run_stream |
core/document_chat/component/skill_dispatcher.py |
新增 run_skill_stream 方法 |
core/document_chat/workflows/document_chat_workflow.py |
skill 节点改用 StreamWriter + run_skill_stream |
views/document_chat/views.py |
astream 改用 ["updates", "custom"],分流处理 |
| 组件 | 是否影响 |
|---|---|
非流式接口 (run_skill) |
保留不动 |
to_response_data |
不改 |
| workflow 图结构 | 不改 |
| 意图识别、检索、重排、质量门控 | 全部不改 |
| clarify / unsupported / error 流程 | 不改 |
| 非 SSE 接口(同步返回) | 不改 |
StreamWriter / stream_mode="custom" 是否可用,不可用则需要升级{"answer": "..."} 中提取内容diff_result 死字段清理 — 前一轮改动遗留,建议拆成单独的 PR 处理,不混在本次流式改动中