tangle před 2 dny
rodič
revize
4f05927c30

+ 0 - 81
core/document_chat/component/diff_service.py

@@ -1,81 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Deterministic diff service for document chat proposals."""
-
-import difflib
-import hashlib
-import re
-from typing import List
-
-from core.document_chat.schemas import DiffItem, DiffResult
-
-
-class DiffService:
-    """Build paragraph/line diffs, falling back to full-content comparison."""
-
-    _COMPLEX_PATTERNS = (
-        re.compile(r"<table[\s>]", re.IGNORECASE),
-        re.compile(r"</table>", re.IGNORECASE),
-        re.compile(r"!\[[^\]]*\]\("),
-        re.compile(r"<表格开始>|<表格结束>"),
-    )
-
-    def build_diff(self, old_content: str, new_content: str) -> DiffResult:
-        old_text = old_content or ""
-        new_text = new_content or ""
-        old_hash = self.hash_content(old_text)
-        new_hash = self.hash_content(new_text)
-
-        if self._is_complex(old_text) or self._is_complex(new_text):
-            return DiffResult(
-                old_content_hash=old_hash,
-                new_content_hash=new_hash,
-                diff=[DiffItem(type="full_content", old_text=old_text, new_text=new_text)],
-                diff_granularity="full_content",
-            )
-
-        old_units = self._split_units(old_text)
-        new_units = self._split_units(new_text)
-        matcher = difflib.SequenceMatcher(a=old_units, b=new_units, autojunk=False)
-        diff_items: List[DiffItem] = []
-
-        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
-            old_part = "\n".join(old_units[i1:i2])
-            new_part = "\n".join(new_units[j1:j2])
-            if tag == "equal":
-                diff_items.append(DiffItem(type="equal", old_text=old_part, new_text=new_part))
-            elif tag == "insert":
-                diff_items.append(DiffItem(type="insert", old_text="", new_text=new_part))
-            elif tag == "delete":
-                diff_items.append(DiffItem(type="delete", old_text=old_part, new_text=""))
-            elif tag == "replace":
-                diff_items.append(DiffItem(type="replace", old_text=old_part, new_text=new_part))
-
-        return DiffResult(
-            old_content_hash=old_hash,
-            new_content_hash=new_hash,
-            diff=diff_items,
-            diff_granularity="line",
-        )
-
-    @staticmethod
-    def hash_content(content: str) -> str:
-        digest = hashlib.sha256((content or "").encode("utf-8")).hexdigest()
-        return f"sha256:{digest}"
-
-    def _is_complex(self, content: str) -> bool:
-        if not content:
-            return False
-        if any(pattern.search(content) for pattern in self._COMPLEX_PATTERNS):
-            return True
-        lines = [line for line in content.splitlines() if line.strip()]
-        table_like_lines = sum(1 for line in lines if line.count("|") >= 2)
-        return table_like_lines >= 2
-
-    @staticmethod
-    def _split_units(content: str) -> List[str]:
-        if not content:
-            return []
-        paragraphs = [part.strip() for part in re.split(r"\n\s*\n", content.strip()) if part.strip()]
-        if len(paragraphs) > 1:
-            return paragraphs
-        return [line.rstrip() for line in content.splitlines() if line.strip()]

+ 4 - 1
core/document_chat/component/intent_recognizer.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 """Intent recognition for document chat."""
 
+import math
 from typing import Any, Dict, List
 
 from foundation.observability.logger.loggering import write_logger as logger
@@ -70,7 +71,7 @@ class IntentRecognizer:
         return self._heuristic_intent(user_message, skill_registry)
 
     def _normalize_intent(self, value: Dict[str, Any], skill_registry: List[Dict[str, Any]]) -> IntentResult:
-        allowed_skills = {skill["name"] for skill in skill_registry}
+        allowed_skills = {skill.get("name") for skill in skill_registry if skill.get("name")}
         skill_intents = {
             str(skill.get("name")): str(skill.get("intent"))
             for skill in skill_registry
@@ -183,6 +184,8 @@ class IntentRecognizer:
             confidence = float(value)
         except (TypeError, ValueError):
             confidence = 0.0
+        if math.isnan(confidence):
+            return 0.0
         return min(max(confidence, 0.0), 1.0)
 
     @staticmethod

+ 3 - 3
core/document_chat/component/retrieval_service.py

@@ -33,7 +33,7 @@ class RetrievalConfig:
     dense_weight: float = 0.7
     sparse_weight: float = 0.3
     ranker_type: str = "weighted"
-    warnings: Dict[str, str] = None
+    warnings: Optional[Dict[str, str]] = None
 
 
 def load_retrieval_config() -> RetrievalConfig:
@@ -185,7 +185,7 @@ class DocumentChatRetrievalService:
         from foundation.database.base.vector.milvus_vector import MilvusVectorManager
 
         expr = self._build_filter_expr(scope)
-        if not expr and not self.config.allow_unscoped_search:
+        if not expr:
             return []
 
         results = MilvusVectorManager().hybrid_search(
@@ -317,7 +317,7 @@ class DocumentChatRetrievalService:
 
     def _warning(self, key: str) -> str:
         warnings = self.config.warnings or _default_warnings()
-        return warnings.get(key) or _default_warnings().get(key) or ""
+        return warnings.get(key) or ""
 
 
 def _default_warnings() -> Dict[str, str]:

+ 12 - 1
core/document_chat/component/skill_dispatcher.py

@@ -3,7 +3,7 @@
 
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Dict, List, Type
+from typing import Callable, Dict, List, Type
 
 import yaml
 
@@ -61,6 +61,17 @@ class SkillDispatcher:
         skill = self._get_instance(skill_name)
         return await skill.run(skill_input)
 
+    async def run_skill_stream(
+        self,
+        skill_name: str,
+        skill_input: DocumentChatSkillInput,
+        on_chunk: Callable[[str], None],
+    ) -> DocumentChatSkillOutput:
+        if skill_name not in self._definitions:
+            raise ValueError(f"Unsupported document chat skill: {skill_name}")
+        skill = self._get_instance(skill_name)
+        return await skill.run_stream(skill_input, on_chunk)
+
     def _get_instance(self, skill_name: str) -> BaseDocumentChatSkill:
         if skill_name not in self._instances:
             definition = self._definitions[skill_name]

+ 1 - 1
core/document_chat/schemas.py

@@ -65,7 +65,7 @@ class DocumentChatSkillInput(BaseModel):
 
 class DocumentChatSkillOutput(BaseModel):
     skill_name: str
-    response_type: Literal["answer", "proposal", "clarify", "unsupported"]
+    response_type: Literal["answer", "proposal", "clarify", "unsupported", "error"]
     answer: Optional[str] = None
     old_content: Optional[str] = None
     proposed_content: Optional[str] = None

+ 17 - 0
core/document_chat/skills/base.py

@@ -2,6 +2,7 @@
 """Base skill definitions for document chat."""
 
 from abc import ABC, abstractmethod
+from typing import Callable
 
 from core.document_chat.schemas import DocumentChatSkillInput, DocumentChatSkillOutput
 
@@ -15,3 +16,19 @@ class BaseDocumentChatSkill(ABC):
     async def run(self, skill_input: DocumentChatSkillInput) -> DocumentChatSkillOutput:
         """Run the skill and return a normalized output."""
         raise NotImplementedError
+
+    async def run_stream(
+        self,
+        skill_input: DocumentChatSkillInput,
+        on_chunk: Callable[[str], None],
+    ) -> DocumentChatSkillOutput:
+        """流式执行。每次生成一个 chunk 时调用 on_chunk,最终返回完整结果。
+
+        默认实现调用非流式 run(),将整个 answer 一次性传给 on_chunk,
+        子类可覆盖此方法实现真正的流式生成。
+        """
+        result = await self.run(skill_input)
+        text = result.answer or result.proposed_content or ""
+        if text:
+            on_chunk(text)
+        return result

+ 62 - 1
core/document_chat/skills/document_answer.py

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Document question-answering skill."""
 
-from typing import Any, List
+from typing import Any, Callable, List
 
 from foundation.observability.logger.loggering import write_logger as logger
 
@@ -65,6 +65,67 @@ class DocumentAnswerSkill(BaseDocumentChatSkill):
             logger.error(f"[DocumentChat] document answer skill failed: {exc}", exc_info=True)
             raise
 
+    async def run_stream(
+        self,
+        skill_input: DocumentChatSkillInput,
+        on_chunk: Callable[[str], None],
+    ) -> DocumentChatSkillOutput:
+        user_payload = {
+            "user_message": skill_input.user_message,
+            "normalized_instruction": skill_input.intent_result.normalized_instruction,
+            "project_info": skill_input.project_info,
+            "selected_section": model_to_dict(skill_input.selected_section),
+            "document_context": model_to_dict(skill_input.document_context),
+            "conversation_history": skill_input.conversation_history[-6:],
+            "output_schema": {
+                "answer": "回答内容",
+                "references": [{"source": "可选来源", "content": "可选依据"}],
+                "warnings": ["风险提示,可为空"],
+            },
+        }
+
+        from foundation.ai.agent.generate.model_generate import generate_model_client
+
+        full_text_parts: List[str] = []
+        warnings: List[str] = []
+
+        try:
+            async for chunk in generate_model_client.get_model_generate_invoke_stream(
+                trace_id=skill_input.conversation_id or skill_input.task_id or "document_answer",
+                system_prompt=self.system_prompt,
+                user_prompt=compact_json(user_payload),
+                timeout=self.timeout,
+                function_name=self.function_name,
+            ):
+                on_chunk(chunk)
+                full_text_parts.append(chunk)
+        except TimeoutError:
+            warnings.append("模型生成超时。")
+        except Exception as exc:
+            logger.error(f"[DocumentChat] document answer stream failed: {exc}", exc_info=True)
+            raise
+
+        full_text = "".join(full_text_parts)
+        parsed = extract_json_object(full_text)
+        answer = str(parsed.get("answer") or "").strip() if parsed else ""
+        references = skill_input.document_context.references
+        if parsed and isinstance(parsed.get("warnings"), list):
+            warnings.extend(self._list_of_strings(parsed["warnings"]))
+
+        if not answer:
+            answer = full_text.strip()
+        if not answer:
+            answer = "当前章节内容不足,无法给出有效回答。"
+            warnings.append("模型未返回有效回答。")
+
+        return DocumentChatSkillOutput(
+            skill_name=self.name,
+            response_type="answer",
+            answer=answer,
+            references=references,
+            warnings=warnings,
+        )
+
     @staticmethod
     def _list_of_strings(value: Any) -> List[str]:
         if not isinstance(value, list):

+ 68 - 2
core/document_chat/skills/document_modify.py

@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 """Document modification skill."""
 
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List
 
 from foundation.observability.logger.loggering import write_logger as logger
 
@@ -70,6 +70,72 @@ class DocumentModifySkill(BaseDocumentChatSkill):
             logger.error(f"[DocumentChat] document modify skill failed: {exc}", exc_info=True)
             raise
 
+    async def run_stream(
+        self,
+        skill_input: DocumentChatSkillInput,
+        on_chunk: Callable[[str], None],
+    ) -> DocumentChatSkillOutput:
+        selected_section = skill_input.selected_section
+        old_content = selected_section.content or ""
+        user_payload = {
+            "user_message": skill_input.user_message,
+            "normalized_instruction": skill_input.intent_result.normalized_instruction,
+            "operation": skill_input.intent_result.operation,
+            "project_info": skill_input.project_info,
+            "selected_section": model_to_dict(selected_section),
+            "document_context": model_to_dict(skill_input.document_context),
+            "conversation_history": skill_input.conversation_history[-6:],
+            "output_schema": {
+                "proposed_content": "完整的新章节正文",
+                "change_summary": ["变更摘要"],
+                "warnings": ["风险提示,可为空"],
+            },
+        }
+
+        from foundation.ai.agent.generate.model_generate import generate_model_client
+
+        full_text_parts: List[str] = []
+        warnings: List[str] = []
+
+        try:
+            async for chunk in generate_model_client.get_model_generate_invoke_stream(
+                trace_id=skill_input.conversation_id or skill_input.task_id or "document_modify",
+                system_prompt=self.system_prompt,
+                user_prompt=compact_json(user_payload),
+                timeout=self.timeout,
+                function_name=self.function_name,
+            ):
+                on_chunk(chunk)
+                full_text_parts.append(chunk)
+        except TimeoutError:
+            warnings.append("模型生成超时。")
+        except Exception as exc:
+            logger.error(f"[DocumentChat] document modify stream failed: {exc}", exc_info=True)
+            raise
+
+        full_text = "".join(full_text_parts)
+        parsed = extract_json_object(full_text)
+        proposed_content = str(parsed.get("proposed_content") or "").strip() if parsed else ""
+        change_summary = self._list_of_strings(parsed.get("change_summary")) if parsed else []
+        if parsed and isinstance(parsed.get("warnings"), list):
+            warnings.extend(self._list_of_strings(parsed["warnings"]))
+
+        if not proposed_content:
+            proposed_content = full_text.strip()
+        if not proposed_content:
+            proposed_content = old_content
+            warnings.append("模型未返回有效修改草案,已保留原章节内容。")
+
+        return DocumentChatSkillOutput(
+            skill_name=self.name,
+            response_type="proposal",
+            old_content=old_content,
+            proposed_content=proposed_content,
+            change_summary=change_summary,
+            references=skill_input.document_context.references,
+            warnings=warnings,
+        )
+
     @staticmethod
     def _list_of_strings(value: Any) -> List[str]:
         if not isinstance(value, list):
@@ -84,5 +150,5 @@ class DocumentModifySkill(BaseDocumentChatSkill):
             "你只能根据用户要求修改当前选中章节,不得生成其他章节内容。"
             "不要修改章节编号和标题,除非用户明确要求且输入允许。"
             "输出必须是 JSON 对象,包含 proposed_content、change_summary、warnings。"
-            "proposed_content 必须是完整的新章节正文,不要出现“以下是”等解释性开头。"
+            'proposed_content 必须是完整的新章节正文,不要出现"以下是"等解释性开头。'
         )

+ 20 - 28
core/document_chat/workflows/document_chat_workflow.py

@@ -9,7 +9,6 @@ from langgraph.graph import END, StateGraph
 from foundation.observability.logger.loggering import write_logger as logger
 
 from core.document_chat.component.conversation_context import ConversationContextBuilder
-from core.document_chat.component.diff_service import DiffService
 from core.document_chat.component.document_chat_logger import log_document_chat_event
 from core.document_chat.component.intent_recognizer import IntentRecognizer
 from core.document_chat.component.rerank_service import DocumentChatRerankService
@@ -18,7 +17,6 @@ from core.document_chat.component.retrieval_service import DocumentChatRetrieval
 from core.document_chat.component.skill_dispatcher import SkillDispatcher
 from core.document_chat.component.state_models import DocumentChatState
 from core.document_chat.schemas import (
-    DiffResult,
     DocumentChatData,
     DocumentChatRequest,
     DocumentChatSkillInput,
@@ -36,7 +34,6 @@ class DocumentChatWorkflow:
     def __init__(self):
         self.intent_recognizer = IntentRecognizer()
         self.skill_dispatcher = SkillDispatcher()
-        self.diff_service = DiffService()
         self.context_builder = ConversationContextBuilder()
         self.retrieval_service = DocumentChatRetrievalService()
         self.rerank_service = DocumentChatRerankService(self.retrieval_service.config)
@@ -58,7 +55,6 @@ class DocumentChatWorkflow:
         workflow.add_node("unsupported", self.unsupported_node)
         workflow.add_node("run_answer_skill", self.run_answer_skill_node)
         workflow.add_node("run_modify_skill", self.run_modify_skill_node)
-        workflow.add_node("build_diff", self.build_diff_node)
         workflow.add_node("error_handler", self.error_handler_node)
         workflow.add_node("complete", self.complete_node)
 
@@ -93,8 +89,7 @@ class DocumentChatWorkflow:
         workflow.add_edge("clarify", "complete")
         workflow.add_edge("unsupported", "complete")
         workflow.add_edge("run_answer_skill", "complete")
-        workflow.add_edge("run_modify_skill", "build_diff")
-        workflow.add_edge("build_diff", "complete")
+        workflow.add_edge("run_modify_skill", "complete")
         workflow.add_edge("error_handler", "complete")
         workflow.add_edge("complete", END)
         return workflow.compile()
@@ -126,7 +121,6 @@ class DocumentChatWorkflow:
             "retrieval_metrics": {},
             "intent_result": None,
             "skill_result": None,
-            "diff_result": None,
             "response_type": None,
             "current_stage": "start",
             "overall_task_status": "processing",
@@ -394,10 +388,27 @@ class DocumentChatWorkflow:
     async def run_modify_skill_node(self, state: DocumentChatState) -> Dict[str, Any]:
         return await self._run_skill(state, "document-modify", "run_modify_skill")
 
-    async def _run_skill(self, state: DocumentChatState, skill_name: str, stage: str) -> Dict[str, Any]:
+    async def _run_skill(
+        self,
+        state: DocumentChatState,
+        skill_name: str,
+        stage: str,
+    ) -> Dict[str, Any]:
         try:
             skill_input = self._build_skill_input(state)
-            skill_result = await self.skill_dispatcher.run_skill(skill_name, skill_input)
+
+            def _on_chunk(chunk: str):
+                from langgraph.config import get_stream_writer
+                try:
+                    writer = get_stream_writer()
+                    writer({"stream_chunk": chunk})
+                except Exception:
+                    # 非流式路径(如 workflow.run())或不支持 StreamWriter 时跳过
+                    pass
+
+            skill_result = await self.skill_dispatcher.run_skill_stream(
+                skill_name, skill_input, on_chunk=_on_chunk
+            )
             return {
                 "skill_result": model_to_dict(skill_result),
                 "response_type": skill_result.response_type,
@@ -406,20 +417,6 @@ class DocumentChatWorkflow:
         except Exception as exc:
             return self._error_update(stage, exc)
 
-    async def build_diff_node(self, state: DocumentChatState) -> Dict[str, Any]:
-        if state.get("error_message"):
-            return {}
-        skill_result = state.get("skill_result") or {}
-        old_content = skill_result.get("old_content")
-        if old_content is None:
-            old_content = (state.get("selected_section") or {}).get("content", "")
-        new_content = skill_result.get("proposed_content") or ""
-        diff_result = self.diff_service.build_diff(old_content, new_content)
-        return {
-            "diff_result": model_to_dict(diff_result),
-            "current_stage": "build_diff",
-        }
-
     async def error_handler_node(self, state: DocumentChatState) -> Dict[str, Any]:
         error_message = state.get("error_message") or "document chat workflow failed"
         logger.error(f"[DocumentChat] workflow error: {error_message}")
@@ -440,7 +437,6 @@ class DocumentChatWorkflow:
     def to_response_data(self, state: DocumentChatState) -> DocumentChatData:
         skill_result = state.get("skill_result") or {}
         intent_result = state.get("intent_result")
-        diff_result = state.get("diff_result") or {}
         selected_section = state.get("selected_section") or {}
         warnings = []
         warnings.extend(state.get("warnings") or [])
@@ -456,10 +452,6 @@ class DocumentChatWorkflow:
             intent_result=intent_result,
             answer=skill_result.get("answer"),
             proposed_content=skill_result.get("proposed_content"),
-            old_content_hash=diff_result.get("old_content_hash"),
-            new_content_hash=diff_result.get("new_content_hash"),
-            diff=diff_result.get("diff") or [],
-            diff_granularity=diff_result.get("diff_granularity"),
             change_summary=skill_result.get("change_summary") or [],
             references=approved_references,
             retrieval_status=state.get("retrieval_status"),

+ 336 - 233
docs/文档编辑AI对话接口文档.md

@@ -2,38 +2,41 @@
 
 ## 1. 接口用途
 
-用于文档编辑页中,针对“当前选中章节”发起 AI 对话。当前接口支持两类能力:
+`/sgbx/document_chat` 用于文档编辑页中,围绕“当前选中章节”发起 AI 对话。当前支持两类能力:
 
 - 章节问答:总结、解释、分析、判断当前章节是否合理或完整。
 - 章节修改:润色、扩写、改写、补充、压缩、优化当前章节正文,并返回修改草案。
 
 注意:
 
-- 本接口只用于文档编辑 AI 对话,不影响方案编写、大纲生成、章节续写等 `construction_write` 接口。
-- SSE 中的 `reasoning` 是可展示的处理过程,不是模型原始思维链。
-- 原始 `<think>...</think>` 推理内容不会透出。
-- 修改类请求只返回草案和 diff,不直接保存或替换章节。
+- 本接口只处理文档编辑 AI 对话,不影响方案编写、大纲生成、章节续写等 `construction_write` 接口。
+- 修改类请求只返回草案,不直接保存或替换章节。
+- 当前版本不生成 diff;返回体中的 `diff`、`old_content_hash`、`new_content_hash`、`diff_granularity` 是保留字段,默认为空。
 - `references` 只返回通过质量门控、实际提交给大模型的知识库参考。
+- SSE 中的 `reasoning` 是可展示的处理过程,不是模型原始思维链;原始 `<think>...</think>` 内容不会透出。
+- 当前 SSE 会流式推送流程事件,但 `chunk` 仍是在模型生成完成后一次性推送完整回答或完整草案,不是 token 级逐字输出。
 
-## 2. 判断问答还是内容编写
+## 2. 意图判定
 
-接口不是根据最终大模型返回文本来判断“问答”或“编写”,而是在工作流前置阶段先做意图识别。
+接口不是根据最终文本判断“问答”或“修改”,而是在工作流前置阶段先执行意图识别。
 
 ### 2.1 判定入口
 
-意图识别节点:`recognize_intent`
+- 工作流节点:`recognize_intent`
+- 模型功能名:`document_chat_intent`
+- 可调用技能白名单:`document-answer`、`document-modify`
 
-模型功能名:`document_chat_intent`
+意图识别模型的核心输入:
 
-输入给意图识别模型的核心信息:
-
-- `message`:用户本轮输入。
-- `selected_section.index/title/code/content_preview`:当前选中章节信息和正文预览。
-- `project_info`:项目信息。
-- `document_context`:前后文、同级章节、检索范围。
-- `available_skills`:当前允许调用的技能列表。
+| 字段 | 说明 |
+| --- | --- |
+| `message` | 用户本轮输入 |
+| `selected_section.index/title/code/content_preview` | 当前选中章节信息和正文预览 |
+| `project_info` | 项目信息 |
+| `document_context` | 前后文、同级章节、检索范围 |
+| `available_skills` | 后端允许调用的技能列表 |
 
-意图识别模型必须返回 JSON:
+意图识别模型返回 JSON,示例:
 
 ```json
 {
@@ -50,42 +53,25 @@
 }
 ```
 
-### 2.2 判定规则
+### 2.2 路由规则
 
 | 判定结果 | 条件 | 后续执行 | 最终 `response_type` |
 | --- | --- | --- | --- |
-| 普通问答 | `skill_name=document-answer` | 执行 `DocumentAnswerSkill` | `answer` |
-| 内容编写/修改 | `skill_name=document-modify` | 执行 `DocumentModifySkill`,再生成 diff | `proposal` |
-| 需要澄清 | `needs_clarification=true` 或 `confidence < 0.65` | 返回澄清问题 | `clarify` |
+| 章节问答 | `skill_name=document-answer` | 执行 `DocumentAnswerSkill` | `answer` |
+| 章节修改 | `skill_name=document-modify` | 执行 `DocumentModifySkill` | `proposal` |
+| 需要澄清 | `needs_clarification=true`、`intent=clarify` 或 `confidence < 0.65` | 返回澄清问题 | `clarify` |
 | 不支持 | `intent=unsupported` 或 skill 不在白名单 | 返回不支持说明 | `unsupported` |
-| 异常 | 工作流或模型调用异常 | 返回错误 | `error` |
-
-### 2.3 白名单保护
-
-`skill_name` 只能从后端加载的 skill 白名单中选择:
-
-- `document-answer`
-- `document-modify`
-
-`intent` 与 `skill_name` 必须一致:
+| 异常 | 工作流或模型调用异常 | 返回错误信息 | `error` |
 
-- `document-answer` 对应 `intent=document_answer`。
-- `document-modify` 对应 `intent=document_modify`。
-- 如果模型返回 `intent=unsupported` 但 `skill_name=document-answer` 或 `document-modify`,后端会按白名单 skill 自动修正并继续执行对应能力。
+后端会做白名单归一化:如果模型返回的 `intent` 与 `skill_name` 不一致,但 `skill_name` 命中白名单且不需要澄清,则优先信任白名单 skill 并修正 `intent`。
 
-如果模型返回了不存在的 skill:
+### 2.3 意图识别失败兜底
 
-- `intent=document_modify` 时,后端会修正为 `document-modify`。
-- `intent=document_answer` 时,后端会修正为 `document-answer`。
-- 仍无法匹配时,判定为 `unsupported`。
-
-### 2.4 模型失败时的兜底规则
-
-如果意图识别模型异常或返回非 JSON,后端会使用关键词兜底:
+如果意图识别模型异常或返回非 JSON,后端使用关键词兜底:
 
 | 用户输入包含 | 兜底意图 |
 | --- | --- |
-| 怎么完善、如何完善、完善建议、修改建议、优化建议、补充建议、怎么改、如何改 | `document_answer` |
+| 怎么完善、如何完善、怎样完善、完善建议、修改建议、优化建议、补充建议、怎么改、如何改 | `document_answer` |
 | 润色、扩写、改写、修改、补充、完善、压缩、简化、优化、替换、重写 | `document_modify` |
 | 解释、说明、总结、分析、是否、为什么、哪里、问题、合理、缺少 | `document_answer` |
 | 空消息 | `clarify` |
@@ -99,7 +85,7 @@
 POST /sgbx/document_chat
 ```
 
-### 3.2 SSE 流式
+### 3.2 SSE
 
 ```http
 POST /sgbx/document_chat?stream=true
@@ -113,21 +99,36 @@ POST /sgbx/document_chat?stream=true
 }
 ```
 
+当 query 参数 `stream=true` 或请求体 `response_mode=sse` 任一成立时,接口返回 `text/event-stream`。
+
 ### 3.3 健康检查
 
 ```http
 GET /sgbx/document_chat/health
 ```
 
+返回示例:
+
+```json
+{
+  "status": "healthy",
+  "module": "document_chat",
+  "workflow": "langgraph",
+  "skills": ["document-answer", "document-modify"]
+}
+```
+
 ## 4. 请求参数
 
+请求体不允许传入未定义字段。
+
 | 字段 | 类型 | 必填 | 说明 |
 | --- | --- | --- | --- |
 | `user_id` | string | 是 | 用户 ID |
-| `message` | string | 是 | 用户问题或修改要求 |
+| `message` | string | 是 | 用户问题或修改要求,不能为空 |
 | `selected_section` | object | 是 | 当前选中章节 |
-| `conversation_id` | string | 否 | 会话 ID |
-| `task_id` | string | 否 | 业务任务 ID |
+| `conversation_id` | string/null | 否 | 会话 ID |
+| `task_id` | string/null | 否 | 业务任务 ID |
 | `project_info` | object | 否 | 项目信息 |
 | `document_context` | object | 否 | 章节上下文和检索范围 |
 | `conversation_history` | array | 否 | 历史对话 |
@@ -141,8 +142,8 @@ GET /sgbx/document_chat/health
 | `title` | string | 是 | 章节标题 |
 | `content` | string | 否 | 当前章节正文 |
 | `code` | string | 否 | 章节编码 |
-| `chapter_level_1` | string | 否 | 一级章节分类,用于 RAG 检索 |
-| `chapter_level_2` | string | 否 | 二级章节分类,用于 RAG 检索 |
+| `chapter_level_1` | string | 否 | 一级章节分类,用于相似章节检索 |
+| `chapter_level_2` | string | 否 | 二级章节分类,用于相似章节检索 |
 
 `document_context`:
 
@@ -151,6 +152,7 @@ GET /sgbx/document_chat/health
 | `before` | string | 当前章节前文 |
 | `after` | string | 当前章节后文 |
 | `siblings` | array | 同级章节摘要 |
+| `references` | array | 允许在入参中传入,但在生成阶段会被后端通过质量门控的知识库参考覆盖 |
 | `retrieval_filters` | object | RAG 检索范围 |
 
 `retrieval_filters` 常用字段:
@@ -164,6 +166,8 @@ GET /sgbx/document_chat/health
 }
 ```
 
+检索范围还可以从 `selected_section.chapter_level_1`、`selected_section.chapter_level_2` 或 `project_info` 中补齐。
+
 ## 5. 请求示例
 
 ### 5.1 章节问答
@@ -230,6 +234,8 @@ GET /sgbx/document_chat/health
 
 ## 6. 普通 JSON 返回
 
+### 6.1 问答成功
+
 ```json
 {
   "code": 200,
@@ -237,8 +243,19 @@ GET /sgbx/document_chat/health
   "data": {
     "callback_task_id": "doc_chat_abc123",
     "response_type": "answer",
-    "intent_result": {},
-    "answer": "回答内容",
+    "intent_result": {
+      "intent": "document_answer",
+      "confidence": 0.86,
+      "skill_name": "document-answer",
+      "operation": "answer",
+      "target_scope": "selected_section",
+      "normalized_instruction": "总结当前章节并判断是否完整",
+      "needs_clarification": false,
+      "clarification_question": "",
+      "reason": "",
+      "warnings": []
+    },
+    "answer": "本节主要介绍工程概况、施工对象和主要施工内容。当前内容覆盖了主要结构类型,但现场条件、施工准备和关键工程特点仍可补充。",
     "proposed_content": null,
     "old_content_hash": null,
     "new_content_hash": null,
@@ -247,33 +264,85 @@ GET /sgbx/document_chat/health
     "change_summary": [],
     "references": [],
     "retrieval_status": "low_confidence",
-    "retrieval_metrics": {},
+    "retrieval_metrics": {
+      "approved_count": 0,
+      "retrieval_method": "chapter_similarity"
+    },
+    "warnings": ["未找到可信度足够的知识库片段,本次未引用向量库内容。"],
+    "selected_section": {
+      "index": "2.1",
+      "code": "overview_DesignSummary_ProjectIntroduction",
+      "title": "工程简介"
+    },
+    "error_message": null
+  }
+}
+```
+
+### 6.2 修改成功
+
+```json
+{
+  "code": 200,
+  "message": "success",
+  "data": {
+    "callback_task_id": "doc_chat_def456",
+    "response_type": "proposal",
+    "intent_result": {
+      "intent": "document_modify",
+      "confidence": 0.88,
+      "skill_name": "document-modify",
+      "operation": "modify",
+      "target_scope": "selected_section",
+      "normalized_instruction": "补充当前章节施工准备、现场条件和工程特点描述",
+      "needs_clarification": false,
+      "clarification_question": "",
+      "reason": "",
+      "warnings": []
+    },
+    "answer": null,
+    "proposed_content": "本工程为某桥梁施工项目,主要包括桩基、承台、墩柱及上部结构施工。施工前应完成图纸会审、测量复核、技术交底和临时设施布置...",
+    "old_content_hash": null,
+    "new_content_hash": null,
+    "diff": [],
+    "diff_granularity": null,
+    "change_summary": ["补充施工准备", "增加现场条件描述"],
+    "references": [],
+    "retrieval_status": "low_confidence",
+    "retrieval_metrics": {
+      "approved_count": 0,
+      "retrieval_method": "chapter_similarity"
+    },
     "warnings": [],
-    "selected_section": {},
+    "selected_section": {
+      "index": "2.1",
+      "code": "overview_DesignSummary_ProjectIntroduction",
+      "title": "工程简介"
+    },
     "error_message": null
   }
 }
 ```
 
-`data` 字段说明:
+### 6.3 字段说明
 
 | 字段 | 类型 | 说明 |
 | --- | --- | --- |
 | `callback_task_id` | string | 本次请求 ID |
-| `response_type` | string | 返回类型 |
+| `response_type` | string | 返回类型,见下表 |
 | `intent_result` | object/null | 意图识别结果 |
-| `answer` | string/null | 问答结果或澄清说明 |
+| `answer` | string/null | 问答结果、澄清问题或不支持说明 |
 | `proposed_content` | string/null | 修改后的完整章节正文草案 |
-| `old_content_hash` | string/null | 原正文 hash |
-| `new_content_hash` | string/null | 新正文 hash |
-| `diff` | array | 新旧内容对比 |
-| `diff_granularity` | string/null | `line` 或 `full_content` |
-| `change_summary` | array | 修改摘要 |
-| `references` | array | 实际引用的知识库参考 |
+| `old_content_hash` | string/null | 保留字段,当前为 `null` |
+| `new_content_hash` | string/null | 保留字段,当前为 `null` |
+| `diff` | array | 保留字段,当前为空数组 |
+| `diff_granularity` | string/null | 保留字段,当前为 `null` |
+| `change_summary` | array | 修改摘要,通常仅在 `response_type=proposal` 时非空 |
+| `references` | array | 通过质量门控并提交给大模型的知识库参考 |
 | `retrieval_status` | string/null | RAG 状态 |
 | `retrieval_metrics` | object | RAG 指标 |
 | `warnings` | array | 提示信息 |
-| `selected_section` | object | 当前章节摘要 |
+| `selected_section` | object | 当前章节摘要,只返回 `index/code/title` |
 | `error_message` | string/null | 错误信息 |
 
 `response_type` 取值:
@@ -281,32 +350,56 @@ GET /sgbx/document_chat/health
 | 值 | 说明 |
 | --- | --- |
 | `answer` | 普通问答 |
-| `proposal` | 内容编写/修改草案 |
+| `proposal` | 内容修改草案 |
 | `clarify` | 需要用户补充说明 |
 | `unsupported` | 当前能力不支持 |
 | `error` | 执行异常 |
 
+普通 JSON 模式下,工作流内错误通常返回 `code=500` 且 `data.response_type=error`;请求处理层未捕获的异常会返回 HTTP 500。
+
 ## 7. RAG 状态
 
-| `retrieval_status` | 说明 | `references` |
+| `retrieval_status` | 出现场景 | 最终 `references` |
 | --- | --- | --- |
 | `usable` | 有高质量参考,已提交给大模型 | 非空 |
-| `no_scope` | 缺少可靠检索范围 | 空数组 |
+| `low_confidence` | 召回或重排内容质量不足,未通过质量门控 | 空数组 |
+| `no_scope` | 缺少可靠检索范围,且不允许无范围检索 | 空数组 |
 | `no_recall` | 没有召回内容 | 空数组 |
 | `rerank_failed` | 重排失败 | 空数组 |
-| `low_confidence` | 召回内容质量不足 | 空数组 |
-| `disabled` | RAG 关闭 | 空数组 |
-| `null` | 未进入 RAG,例如澄清或不支持 | 空数组 |
+| `disabled` | RAG 配置关闭 | 空数组 |
+| `recalled` | 已召回,通常只在中间状态出现 | 以最终结果为准 |
+| `reranked` | 已重排,通常只在中间状态出现 | 以最终结果为准 |
+| `null` | 未进入 RAG,例如澄清、不支持或早期异常 | 空数组 |
 
 说明:
 
-- 只有 `retrieval_status=usable` 时,`references` 才表示本次实际引用的知识库内容。
-- 召回但未过质量门控的内容不会进入最终 `references`。
-- SSE 调试事件中可能返回候选片段预览,正式结果仍以最终 `references` 为准。
+- 最终 `references` 只取质量门控后的 `approved_references`。
+- 召回但未通过质量门控的内容不会进入最终 `references`。
+- SSE 的 `retrieval_result` 是重排阶段的过程预览,不等同于最终 `references`。
+
+`retrieval_method` 常见取值:
+
+| retrieval_method | 说明 |
+| --- | --- |
+| `chapter_similarity` | 根据 `chapter_level_1` 和 `chapter_level_2` 走相似章节片段检索 |
+| `milvus_hybrid_vector` | 走 Milvus hybrid search 检索 |
+| `disabled` | RAG 配置关闭 |
+| `empty_query` | 未构建出有效检索 query |
+| `no_scope` | 缺少可靠检索范围,且不允许无范围检索 |
+| `unknown` | 检索异常或未能识别方式 |
 
 ## 8. SSE 事件
 
-SSE 格式:
+SSE 响应头:
+
+```http
+Content-Type: text/event-stream
+Cache-Control: no-cache
+Connection: keep-alive
+X-Accel-Buffering: no
+```
+
+SSE 数据格式:
 
 ```text
 event: event_name
@@ -314,36 +407,48 @@ data: {"callback_task_id":"doc_chat_abc123"}
 
 ```
 
-### 8.1 事件顺序
+### 8.1 典型事件顺序
 
-典型问答或修改流程:
+问答或修改流程:
 
 ```text
 connected
 processing
-reasoning
+reasoning          # recognize_intent
 intent
-reasoning
-retrieval_query
-reasoning
-retrieval_recalled
-reasoning
-retrieval_reranked
-reasoning
-retrieval_approved
-retrieval
+reasoning          # rerank_context
+retrieval_result
 skill_started
-reasoning
-diff_ready              # 仅 proposal 场景可能出现
-chunk
-answer_completed        # answer/clarify/unsupported
-proposal_completed      # proposal
+reasoning          # run_answer_skill 或 run_modify_skill
+chunk              # 完整回答或完整草案,一次性推送
+answer_completed   # answer
+proposal_completed # proposal
 completed
 ```
 
-实际事件会根据流程分支变化。例如 `clarify` 和 `unsupported` 不会进入 RAG 检索。
+澄清或不支持流程:
+
+```text
+connected
+processing
+reasoning          # recognize_intent
+intent
+answer_completed   # response_type=clarify 或 unsupported
+completed
+```
+
+错误流程:
+
+```text
+connected
+processing
+reasoning          # error_handler,视错误发生位置而定
+error
+```
+
+实际事件会根据流程分支变化。当前不会发送 `retrieval_query`、`retrieval_recalled`、`retrieval_reranked`、`retrieval_approved`、`retrieval`、`diff_ready` 等事件。
 
-### 8.2 基础事件
+### 8.2 事件清单
 
 | event | 说明 |
 | --- | --- |
@@ -351,90 +456,81 @@ completed
 | `processing` | 工作流启动 |
 | `reasoning` | 可展示处理过程 |
 | `intent` | 意图识别结果 |
+| `retrieval_result` | 重排阶段的参考片段预览 |
 | `skill_started` | 技能开始执行 |
-| `chunk` | 最终回答或草案文本块 |
+| `chunk` | 完整回答或完整草案文本 |
 | `answer_completed` | 问答、澄清或不支持流程完成 |
 | `proposal_completed` | 修改草案完成 |
-| `error` | 错误 |
 | `completed` | SSE 流程结束 |
+| `error` | 错误 |
 
-### 8.3 新增过程事件
+### 8.3 事件 payload
 
-#### reasoning
+#### connected
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123",
+  "status": "connected",
+  "timestamp": 1779696000
+}
+```
 
-可展示处理过程,不是模型原始思维链。
+#### processing
 
 ```json
 {
   "callback_task_id": "doc_chat_abc123",
-  "stage_name": "recognize_intent",
+  "stage_name": "workflow_started",
   "status": "processing",
-  "message": "已完成用户意图识别"
+  "message": "文档 AI 对话工作流已启动"
 }
 ```
 
-常见 `stage_name`:
-
-| stage_name | 说明 |
-| --- | --- |
-| `validate_input` | 校验输入 |
-| `load_context` | 整理上下文 |
-| `load_skill_registry` | 加载技能 |
-| `recognize_intent` | 识别意图 |
-| `route_intent` | 路由到问答、修改、澄清或不支持 |
-| `build_retrieval_query` | 构建 RAG 检索问题 |
-| `vector_recall` | 向量召回 |
-| `rerank_context` | 重排召回片段 |
-| `quality_gate` | 质量门控 |
-| `run_answer_skill` | 生成问答 |
-| `run_modify_skill` | 生成修改草案 |
-| `build_diff` | 生成 diff |
-| `complete` | 流程完成 |
-
-#### retrieval_query
-
-返回本次 RAG 查询文本。
+#### reasoning
 
 ```json
 {
   "callback_task_id": "doc_chat_abc123",
-  "query": "项目名称:某桥梁施工方案\n工程类型:桥梁工程\n章节:2.1 工程简介\n用户需求:总结一下这一节..."
+  "stage_name": "recognize_intent",
+  "status": "processing",
+  "message": "已完成用户意图识别"
 }
 ```
 
-#### retrieval_recalled
+当前会主动转成 `reasoning` 的阶段:
+
+| `stage_name` | `message` |
+| --- | --- |
+| `recognize_intent` | 已完成用户意图识别 |
+| `rerank_context` | 知识库内容检索重排完成 |
+| `run_answer_skill` | 已生成章节问答结果 |
+| `run_modify_skill` | 已生成章节修改草案 |
+| `error_handler` | 流程异常,已进入错误处理 |
 
-返回向量召回结果预览。`candidates` 最多返回 8 条,每条内容最多约 600 字。
+#### intent
 
 ```json
 {
   "callback_task_id": "doc_chat_abc123",
-  "retrieval_status": "recalled",
-  "retrieval_method": "chapter_similarity",
-  "retrieval_metrics": {
-    "recall_count": 18,
-    "max_vector_similarity": 0.78
-  },
-  "candidate_count": 18,
-  "candidates": [
-    {
-      "source": "相似施工方案A",
-      "snippet": "施工准备包括图纸会审、测量复核、临时设施布置...",
-      "vector_similarity": 0.78,
-      "metadata": {
-        "knowledge_base_id": "kb-bridge-001",
-        "file_name": "相似施工方案A",
-        "source_scope_valid": true
-      }
-    }
-  ],
-  "warnings": []
+  "intent_result": {
+    "intent": "document_answer",
+    "confidence": 0.86,
+    "skill_name": "document-answer",
+    "operation": "answer",
+    "target_scope": "selected_section",
+    "normalized_instruction": "总结当前章节并判断是否完整",
+    "needs_clarification": false,
+    "clarification_question": "",
+    "reason": "",
+    "warnings": []
+  }
 }
 ```
 
-#### retrieval_reranked
+#### retrieval_result
 
-返回重排后的参考片段预览
+`references` 最多 8 条,每条 `content` 最多约 600 字。该事件用于过程展示或调试,最终引用以完成事件中的 `references` 为准。
 
 ```json
 {
@@ -443,6 +539,7 @@ completed
   "retrieval_method": "chapter_similarity",
   "retrieval_metrics": {
     "recall_count": 18,
+    "max_vector_similarity": 0.78,
     "rerank_count": 8,
     "max_rerank_score": 0.86
   },
@@ -455,7 +552,10 @@ completed
       "rerank_score": 0.86,
       "metadata": {
         "knowledge_base_id": "kb-bridge-001",
-        "file_name": "相似施工方案A"
+        "file_name": "相似施工方案A",
+        "chapter_level_1": "technology",
+        "chapter_level_2": "MethodsOverview",
+        "source_scope_valid": true
       }
     }
   ],
@@ -463,122 +563,134 @@ completed
 }
 ```
 
-#### retrieval_approved
-
-返回通过质量门控、实际提交给大模型的参考资料。前端默认应展示这个事件,而不是默认展示全部召回候选。
+#### skill_started
 
 ```json
 {
   "callback_task_id": "doc_chat_abc123",
-  "retrieval_status": "usable",
-  "retrieval_method": "chapter_similarity",
-  "retrieval_metrics": {
-    "recall_count": 18,
-    "rerank_count": 8,
-    "approved_count": 1,
-    "max_rerank_score": 0.86
-  },
-  "approved_count": 1,
-  "references": [
-    {
-      "source": "相似施工方案A",
-      "content": "施工准备包括图纸会审、测量复核、临时设施布置...",
-      "vector_similarity": 0.78,
-      "rerank_score": 0.86,
-      "metadata": {
-        "knowledge_base_id": "kb-bridge-001",
-        "file_name": "相似施工方案A"
-      }
-    }
-  ],
-  "warnings": []
+  "skill_name": "document-answer",
+  "response_type": "answer"
 }
 ```
 
-#### retrieval
+#### chunk
 
-兼容旧事件,当前 payload 与 `retrieval_approved` 一致。
+```json
+{
+  "callback_task_id": "doc_chat_abc123",
+  "chunk": "本节主要介绍工程概况、施工对象和主要施工内容..."
+}
+```
 
-#### diff_ready
+#### answer_completed
 
-修改草案生成 diff 后返回摘要。
+`answer`、`clarify`、`unsupported` 都使用该事件完成。payload 为完整 `DocumentChatData`。
 
 ```json
 {
   "callback_task_id": "doc_chat_abc123",
-  "diff_granularity": "line",
-  "diff_count": 3,
-  "old_content_hash": "xxx",
-  "new_content_hash": "yyy"
+  "response_type": "answer",
+  "intent_result": {},
+  "answer": "本节主要介绍工程概况...",
+  "proposed_content": null,
+  "change_summary": [],
+  "references": [],
+  "retrieval_status": "low_confidence",
+  "retrieval_metrics": {},
+  "warnings": [],
+  "selected_section": {
+    "index": "2.1",
+    "code": "overview_DesignSummary_ProjectIntroduction",
+    "title": "工程简介"
+  },
+  "error_message": null
 }
 ```
 
-## 9. SSE 完成事件示例
-
-### 9.1 问答完成
+#### proposal_completed
 
-```text
-event: chunk
-data: {"callback_task_id":"doc_chat_abc123","chunk":"本节主要介绍工程概况、施工对象和主要施工内容..."}
+payload 为完整 `DocumentChatData`。
 
-event: answer_completed
-data: {"callback_task_id":"doc_chat_abc123","response_type":"answer","answer":"本节主要介绍工程概况...","references":[]}
+```json
+{
+  "callback_task_id": "doc_chat_def456",
+  "response_type": "proposal",
+  "intent_result": {},
+  "answer": null,
+  "proposed_content": "本工程为某桥梁施工项目,主要包括桩基...",
+  "change_summary": ["补充施工准备", "增加现场条件描述"],
+  "references": [],
+  "retrieval_status": "low_confidence",
+  "retrieval_metrics": {},
+  "warnings": [],
+  "selected_section": {
+    "index": "2.1",
+    "code": "overview_DesignSummary_ProjectIntroduction",
+    "title": "工程简介"
+  },
+  "error_message": null
+}
+```
 
-event: completed
-data: {"callback_task_id":"doc_chat_abc123","status":"completed","duration":3.218}
+#### completed
 
+```json
+{
+  "callback_task_id": "doc_chat_abc123",
+  "status": "completed",
+  "duration": 3.218
+}
 ```
 
-### 9.2 修改完成
+#### error
 
-```text
-event: chunk
-data: {"callback_task_id":"doc_chat_def456","chunk":"本工程为某桥梁施工项目,主要包括桩基..."}
-
-event: proposal_completed
-data: {"callback_task_id":"doc_chat_def456","response_type":"proposal","proposed_content":"本工程为某桥梁施工项目...","diff":[...],"change_summary":["补充施工准备","增加现场条件描述"]}
+```json
+{
+  "callback_task_id": "doc_chat_abc123",
+  "response_type": "error",
+  "error_message": "错误信息"
+}
+```
 
-event: completed
-data: {"callback_task_id":"doc_chat_def456","status":"completed","duration":5.642}
+如果 SSE 生成器外层捕获到异常,`error` payload 可能是:
 
+```json
+{
+  "callback_task_id": "doc_chat_abc123",
+  "status": "error",
+  "message": "错误信息"
+}
 ```
 
-## 10. 前端处理建议
+## 9. 前端处理建议
 
-- `intent`:只用于展示本轮识别为“问答”或“修改”,不要把 `intent_result.reason` 当成最终 assistant 消息。
-- `reasoning`:展示为处理进度,例如“正在检索参考资料”“已完成重排”。
-- `retrieval_query`、`retrieval_recalled`、`retrieval_reranked`:建议放在调试详情或折叠面板
-- `retrieval_approved` 或 `retrieval`:展示“本次引用资料”
-- `response_type=answer`展示 `answer`。
-- `response_type=proposal`:展示 `proposed_content` 和 `diff`,用户确认后替换当前章节。
-- `response_type=clarify`展示 `answer`,引导用户补充说明。
-- `response_type=unsupported`展示 `answer` 或不支持说明。
-- `response_type=error`:展示 `error_message`。
-- 替换和保存章节由前端或业务后端完成,本 AI 服务不保存文档
+- 按 `callback_task_id` 归并同一次请求的 SSE 事件
+- `intent` 只用于展示本轮识别为“问答”或“修改”,不要把 `intent_result.reason` 当成最终 assistant 消息
+- `reasoning` 展示为处理进度,例如“已完成用户意图识别”“知识库内容检索重排完成”
+- `retrieval_result` 建议放在“检索详情”或折叠面板;正式引用资料以完成事件和普通 JSON 返回中的 `references` 为准
+- `response_type=answer`展示 `answer`。
+- `response_type=proposal` 时展示 `proposed_content` 和 `change_summary`;用户确认后由前端或业务后端替换当前章节。
+- `response_type=clarify`展示 `answer`,引导用户补充说明。
+- `response_type=unsupported`展示 `answer` 或不支持说明。
+- `response_type=error` 时展示 `error_message` 或 `message`。
+- 不要依赖当前保留的 diff 字段;当前服务端不生成 diff
 
-## 11. 对接边界
+## 10. 对接边界
 
 - 本文档只适用于 `/sgbx/document_chat`。
-- 方案编写接口,例如 `/sgbx/generating_outline`、`/sgbx/content_completion`,不返回文档对话的 `reasoning`、`retrieval_*`、`diff_ready` 事件
+- 方案编写接口,例如 `/sgbx/generating_outline`、`/sgbx/content_completion`,不复用本文档的事件语义
 - 如果前端同时对接方案编写和文档编辑 AI 对话,应按接口路径区分事件处理逻辑。
+- AI 服务不保存文档,也不直接替换章节;保存和版本管理由前端或业务后端完成。
 
-## 12. 服务端日志
+## 11. 服务端日志
 
-文档编辑 AI 对话会写入独立日志目录
+文档编辑 AI 对话会写入独立模块日志:
 
 ```text
 logs/document_chat/
 ```
 
-主要文件:
-
-| 文件 | 说明 |
-| --- | --- |
-| `document_chat_info.log` | 正常请求、RAG、输出结果 |
-| `document_chat_error.log` | 异常请求 |
-| `document_chat_debug.log` | debug 及以上级别日志 |
-
-日志按 `callback_task_id` 串联一次请求,日志消息体为 JSON 字符串,核心事件如下:
+日志按 `callback_task_id` 串联一次请求,日志消息体为 JSON 字符串。核心事件:
 
 | event | 记录内容 |
 | --- | --- |
@@ -589,16 +701,7 @@ logs/document_chat/
 | `rag_rerank_skipped` | 未进入重排时的 RAG 状态和原因 |
 | `rag_quality_gate_completed` | 质量门控状态、重排结果、最终可引用结果 |
 | `rag_quality_gate_skipped` | 未进入质量门控时的 RAG 状态和原因 |
-| `response_completed` | 最终输出结果,包括 `answer`、`proposed_content`、`diff`、`references` |
+| `response_completed` | 最终输出结果 |
 | `request_failed` | 异常信息和请求参数 |
 
-`retrieval_method` 常见取值:
-
-| retrieval_method | 说明 |
-| --- | --- |
-| `chapter_similarity` | 根据 `chapter_level_1` 和 `chapter_level_2` 走相似章节片段检索 |
-| `milvus_hybrid_vector` | 走 Milvus hybrid search 检索 |
-| `disabled` | RAG 配置关闭 |
-| `empty_query` | 未构建出有效检索 query |
-| `no_scope` | 缺少可靠检索范围,且不允许无范围检索 |
-| `unknown` | 检索异常或未能识别方式 |
+`response_completed` 当前不包含服务端生成的 diff;如需前端展示新旧内容对比,需要由前端或后续专门接口生成。

+ 270 - 0
docs/流式输出API文档.md

@@ -0,0 +1,270 @@
+# 文档 AI 对话 — 流式输出 API 文档
+
+> 改造说明:后端已增加流式输出能力,LLM 推理过程实时推送给前端,前端可按需展示打字效果。
+
+## 接口基本信息
+
+| 项目 | 内容 |
+|------|------|
+| URL | `POST /sgbx/document_chat` |
+| 非流式 | 查询参数 `stream=false`(默认),返回完整 JSON |
+| 流式 | 查询参数 `stream=true`,或请求体字段 `response_mode="sse"`,返回 SSE 事件流 |
+| Content-Type | `application/json` |
+| Response Content-Type | `text/event-stream`(流式)/ `application/json`(非流式) |
+
+## 请求体(流式/非流式共用)
+
+```json
+{
+  "user_id": "string,必填",
+  "message": "string,必填,用户问题",
+  "conversation_id": "string,可选,对话历史 ID",
+  "task_id": "string,可选,任务 ID",
+  "response_mode": "sse 或 blocking,可选,默认 blocking",
+  "project_info": {
+    "tenant_id": "string",
+    "project_id": "string"
+  },
+  "selected_section": {
+    "index": "string,必填,章节索引",
+    "title": "string,必填,章节标题",
+    "code": "string,可选,章节编号",
+    "content": "string,必填,章节正文"
+  },
+  "document_context": {
+    "full_text": "string,可选,文档全文",
+    "previous_section": { "title": "...", "content": "..." },
+    "next_section": { "title": "...", "content": "..." }
+  },
+  "conversation_history": [
+    { "role": "user/assistant", "content": "string" }
+  ]
+}
+```
+
+## 流式 SSE 事件格式
+
+每个事件遵循标准 SSE 协议:
+
+```
+event: <事件类型>
+data: <JSON 对象>
+
+```
+
+### 事件顺序总览
+
+```
+connected → processing(workflow_started) → reasoning(recognize_intent) → intent
+→ reasoning(rerank_context) → retrieval_result
+→ reasoning(run_answer_skill / run_modify_skill)
+→ [chunk] → [chunk] → ...  ← 实时推理流
+→ answer_completed / proposal_completed
+→ completed
+```
+
+### 1. connected — 连接建立
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "status": "connected",
+  "timestamp": 1748150000
+}
+```
+
+### 2. processing — 工作流启动
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "stage_name": "workflow_started",
+  "status": "processing",
+  "message": "文档 AI 对话工作流已启动"
+}
+```
+
+### 3. reasoning — 阶段进度(正常流程共 3 次,`run_answer_skill` 与 `run_modify_skill` 二选一)
+
+| stage_name | message |
+|---|---|
+| `recognize_intent` | "已完成用户意图识别" |
+| `rerank_context` | "知识库内容检索重排完成" |
+| `run_answer_skill` | "已生成章节问答结果" |
+| `run_modify_skill` | "已生成章节修改草案" |
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "stage_name": "recognize_intent",
+  "status": "processing",
+  "message": "已完成用户意图识别"
+}
+```
+
+> **异常时** `status` 为 `"failed"`。
+
+### 4. intent — 意图识别结果
+
+紧跟 `reasoning(recognize_intent)` 之后。
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "intent_result": {
+    "intent": "answer",
+    "skill_name": "document-answer",
+    "confidence": 0.92,
+    "normalized_instruction": "请解释施工准备的内容",
+    "operation": null
+  }
+}
+```
+
+### 5. retrieval_result — RAG 检索结果
+
+紧跟 `reasoning(rerank_context)` 之后。
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "retrieval_status": "reranked",
+  "retrieval_method": "hybrid",
+  "retrieval_metrics": {
+    "recall_count": 12,
+    "rerank_count": 8
+  },
+  "rerank_count": 8,
+  "references": [
+    {
+      "source": "向量知识库",
+      "content": "施工准备包括...",
+      "vector_similarity": 0.87,
+      "metadata": {
+        "tenant_id": "t1",
+        "project_id": "p1",
+        "chapter_level_1": "第一章 施工准备",
+        "source_scope_valid": true
+      }
+    }
+  ],
+  "warnings": []
+}
+```
+
+> `references` 最多返回 8 条,每条 content 截取前 600 字符。
+
+### 6. chunk — 实时推理文本(改造新增)
+
+在 LLM 生成阶段持续推送,前端应拼接为完整回答并做打字效果展示。
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "chunk": "施工准备是项目实施前的关键环节"
+}
+```
+
+> 前端收到多个 chunk 后拼接得到完整文本。该文本为 JSON 包裹格式,前端需从中提取 `answer` 或 `proposed_content` 字段作为展示内容。
+>
+> 思考内容(`<think>...</think>` 等)已被后端过滤,不会推送。
+
+### 7. answer_completed / proposal_completed — 最终结果
+
+**问答场景** `answer_completed`:
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "response_type": "answer",
+  "intent_result": { "intent": "answer", "skill_name": "document-answer", "confidence": 0.92 },
+  "answer": "施工准备包括...(完整回答)",
+  "references": [
+    { "source": "...", "content": "...", "metadata": {}, "vector_similarity": 0.87 }
+  ],
+  "retrieval_status": "usable",
+  "retrieval_metrics": { "recall_count": 12, "rerank_count": 8, "approved_count": 5 },
+  "warnings": [],
+  "selected_section": { "index": "2", "code": "SP-02", "title": "施工准备" },
+  "error_message": null
+}
+```
+
+**修改场景** `proposal_completed`:
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "response_type": "proposal",
+  "intent_result": { "intent": "modify", "skill_name": "document-modify", "confidence": 0.88 },
+  "answer": null,
+  "proposed_content": "修改后的完整章节正文...",
+  "change_summary": ["调整了施工准备流程描述", "补充了安全要求"],
+  "references": [],
+  "retrieval_status": "low_confidence",
+  "retrieval_metrics": { "recall_count": 12, "rerank_count": 8, "approved_count": 0 },
+  "warnings": [],
+  "selected_section": { "index": "2", "code": "SP-02", "title": "施工准备" },
+  "error_message": null
+}
+```
+
+> **对比说明**:修改场景的 diff 对比由前端自行处理,后端不再返回 diff 结果。
+
+### 8. completed — 流程结束
+
+仅在 `response_type != "error"` 时发送。
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "status": "completed",
+  "duration": 12.345
+}
+```
+
+### 9. error — 异常
+
+```json
+{
+  "callback_task_id": "doc_chat_abc123def456",
+  "status": "error",
+  "message": "错误详情"
+}
+```
+
+> error 事件发出后,**不会**再发送 completed 事件。
+
+## 非流式响应(stream=false)
+
+```json
+{
+  "code": 200,
+  "message": "success",
+  "data": {
+    "callback_task_id": "doc_chat_abc123def456",
+    "response_type": "answer",
+    "intent_result": { ... },
+    "answer": "施工准备包括...",
+    "proposed_content": null,
+    "change_summary": [],
+    "references": [ ... ],
+    "retrieval_status": "usable",
+    "retrieval_metrics": { ... },
+    "warnings": [],
+    "selected_section": { "index": "2", "code": "SP-02", "title": "施工准备" },
+    "error_message": null
+  }
+}
+```
+
+> `code: 500` 表示异常,`message` 包含错误信息。
+
+## 前端对接要点
+
+1. **流式选择**:请求时加 `?stream=true` 或 body 中 `response_mode: "sse"`
+2. **chunk 拼接**:将所有 `chunk` 事件的 `chunk` 字段拼接,从结果 JSON 中提取 `answer` 或 `proposed_content` 做展示
+3. **diff 对比**:修改场景下,前端自行对 `proposed_content` 与原章节 `content` 做 diff 展示
+4. **进度展示**:监听 `reasoning` 事件的 `message` 字段作为用户可见的进度提示
+5. **错误处理**:收到 `error` 事件即终止,不再等待 `completed`
+6. **健康检查**:`GET /sgbx/document_chat/health`

+ 199 - 0
docs/流式输出改造方案.md

@@ -0,0 +1,199 @@
+# AI 对话流式输出改造方案
+
+## 当前问题
+
+目前 `run_answer_skill` / `run_modify_skill` 节点调用 LLM 使用的是 `ainvoke`(非流式),模型推理期间前端收不到任何内容,直到一次性返回完整回答后才推送 `chunk` 事件。用户看到的体验是:进度提示 → 长时间空窗 → 突然输出全部内容。
+
+## 改动目标
+
+将 LLM 推理结果实时推送到前端,用户在生成过程中就能逐字看到回答/草案内容。
+
+---
+
+## 架构设计(两层)
+
+### 层 1:LLM → Skill 节点(异步流式生成)
+
+Skill 内部用 `get_model_generate_stream` 逐 chunk 生成,同时收集完整文本用于最终 JSON 解析。
+
+### 层 2:Skill 节点 → SSE 前端(LangGraph custom stream)
+
+Skill 节点通过 LangGraph 的 `StreamWriter` + `stream_mode="custom"` 将 chunk 实时推到 views.py 的 SSE 生成器。
+
+**非 SSE 路径不改**:普通 POST 仍走 `workflow.run()` → `to_response_data` 一次性返回,不受流式影响。
+
+---
+
+## 实施步骤
+
+### Step 0:确认 langgraph 版本(先做)
+
+当前项目 `requirements.txt` 中 `langgraph==1.0.4`,需要确认该版本是否支持 `StreamWriter` 和 `stream_mode="custom"`。
+- 如果不支持,需要升级到 1.1+(或找到 1.0.4 的等价 API)
+- 如果 API 签名与新版文档不同,以 1.0.4 的实际接口为准
+
+### Step 1:改造 `model_generate.py` — 新增异步流式方法
+
+当前 `get_model_generate_stream`(第 597 行)是同步生成器,**不能直接 `asyncio.to_thread` 包一下就用**。原因:
+- `asyncio.to_thread(gen_func)` 只是在工作线程里创建并返回 generator 对象,真正的迭代仍发生在调用方(事件循环)线程,每次取下一个 chunk 都会阻塞事件循环
+- SSE 需要真正的异步迭代,每个 chunk 到达时 `await` 到异步队列中
+
+**做法:**
+- 新增 `async def get_model_generate_invoke_stream(...)` 异步方法
+- 内部用 worker 线程启动同步 `get_model_generate_stream`,通过 `asyncio.Queue` 投递 chunk(`asyncio.Queue` 非线程安全,worker 线程必须用 `loop.call_soon_threadsafe(q.put_nowait, chunk)` 投递)
+- 异步方法从 queue 中 `await get()` 逐 chunk yield
+- 同步流式方法已有的 `_ThinkingBlockStreamFilter` 思考内容过滤保留
+- 支持 `function_name` 加载模型配置 + `enable_thinking` 配置(与非流式行为一致)
+- 支持超时:用 `asyncio.wait_for` 包装 queue.get()
+
+```python
+async def get_model_generate_invoke_stream(
+    self, trace_id, system_prompt, user_prompt, timeout, function_name, enable_thinking
+) -> AsyncGenerator[str, None]:
+    # worker 线程跑同步流式,queue 投递 chunk
+    # 主异步循环从 queue 消费
+```
+
+### Step 2:改造 `schemas.py` — 调整错误 response_type
+
+当前 `DocumentChatSkillOutput.response_type` 只允许 `"answer" | "proposal" | "clarify" | "unsupported"`,方案中流式超时返回 `"error"` 会校验失败。
+
+**做法:**
+- 在 Literal 中增加 `"error"`:`Literal["answer", "proposal", "clarify", "unsupported", "error"]`
+- 这与 `DocumentChatData.response_type` 已经包含 `"error"` 保持一致
+
+### Step 3:改造 `base.py` — 增加流式 run 接口
+
+不能用 `AsyncGenerator[str, DocumentChatSkillOutput]`(Python async generator 不能 return value)。
+
+**做法:**
+```python
+async def run_stream(
+    self,
+    skill_input: DocumentChatSkillInput,
+    on_chunk: Callable[[str], None],
+) -> DocumentChatSkillOutput:
+    """流式执行。每次生成一个 chunk 时调用 on_chunk,最终返回完整结果。"""
+    raise NotImplementedError
+```
+
+默认实现:调用非流式 `run()`,将整个 answer 一次性传给 `on_chunk`,保持向后兼容。
+
+**为什么不用 AsyncGenerator:**
+- AsyncGenerator 不能 return 最终结果
+- `on_chunk` callback 模式更符合 LangGraph 节点的需求(节点需要最终 return state update)
+
+### Step 4:改造 `document_answer.py` + `document_modify.py` — 实现流式生成
+
+**共同流程:**
+1. 调用 `Step 1` 的异步流式方法
+2. 每次 chunk 到达时调用 `on_chunk(chunk)`
+3. 所有 chunk 收集完后拼接为完整文本
+4. 用 `extract_json_object` 解析 JSON 提取字段
+5. 构造 `DocumentChatSkillOutput` 返回
+
+**JSON 剥离策略:**
+- `on_chunk` 中推送的是**完整 LLM 原始 chunk**(包含 JSON 结构字符)
+- 前端看到的是 `{"answer": "回答内容"...}` 等完整文本
+- 前端自行解析提取 answer 字段内容(后端不剥离 JSON)
+- 或者:后端在 `on_chunk` 中维护 JSON 解析状态机,只推送 answer 字段的值(实现更复杂但用户体验好)
+
+**推荐:后端推送原始 chunk,前端处理剥离。** 原因:
+- 减少后端复杂度
+- 前端本来就要做 markdown 渲染,顺手处理 JSON 结构
+- `extract_json_object` 已支持 fenced JSON 和纯 JSON 两种格式
+
+### Step 5:改造 `skill_dispatcher.py` — 增加 `run_skill_stream`
+
+```python
+async def run_skill_stream(
+    self,
+    skill_name: str,
+    skill_input: DocumentChatSkillInput,
+    on_chunk: Callable[[str], None],
+) -> DocumentChatSkillOutput:
+    if skill_name not in self._definitions:
+        raise ValueError(...)
+    skill = self._get_instance(skill_name)
+    return await skill.run_stream(skill_input, on_chunk)
+```
+
+### Step 6:改造 `workflow.py` — skill 节点用 StreamWriter 推送 chunk
+
+当前 `_run_skill` 方法直接调 `run_skill`。需要改为:
+
+```python
+async def run_answer_skill_node(self, state, writer: StreamWriter):
+    ...
+    skill_input = self._build_skill_input(state)
+
+    def _on_chunk(chunk: str):
+        writer({"stream_chunk": chunk})
+
+    skill_result = await self.skill_dispatcher.run_skill_stream(
+        "document-answer", skill_input, on_chunk=_on_chunk
+    )
+    return {
+        "skill_result": model_to_dict(skill_result),
+        "response_type": skill_result.response_type,
+        "current_stage": "run_answer_skill",
+    }
+```
+
+### Step 7:改造 `views.py` — SSE 接收 custom stream
+
+当前:
+```python
+async for raw_update in workflow.get_graph().astream(graph_state, stream_mode="updates"):
+```
+
+改为:
+```python
+stream_modes = ["updates", "custom"]
+async for mode, payload in workflow.get_graph().astream(graph_state, stream_mode=stream_modes):
+    # stream_mode 为列表时,每一项是 (mode, payload) 二元组,需要按 mode 分流
+    if mode == "custom" and "stream_chunk" in payload:
+        yield format_sse_event("chunk", {"chunk": payload["stream_chunk"]})
+    elif mode == "updates":
+        # 现有逻辑不变
+```
+
+去掉工作流结束后的一次性 `chunk` 推送。
+
+---
+
+## 改动文件清单
+
+| 文件 | 改动内容 |
+|------|---------|
+| `foundation/ai/agent/generate/model_generate.py` | 新增 `get_model_generate_invoke_stream` 异步方法 |
+| `core/document_chat/schemas.py` | `DocumentChatSkillOutput.response_type` 增加 `"error"` |
+| `core/document_chat/skills/base.py` | 新增 `run_stream(input, on_chunk)` 抽象方法 |
+| `core/document_chat/skills/document_answer.py` | 实现 `run_stream` |
+| `core/document_chat/skills/document_modify.py` | 实现 `run_stream` |
+| `core/document_chat/component/skill_dispatcher.py` | 新增 `run_skill_stream` 方法 |
+| `core/document_chat/workflows/document_chat_workflow.py` | skill 节点改用 `StreamWriter` + `run_skill_stream` |
+| `views/document_chat/views.py` | `astream` 改用 `["updates", "custom"]`,分流处理 |
+
+---
+
+## 改动影响范围
+
+| 组件 | 是否影响 |
+|------|---------|
+| 非流式接口 (`run_skill`) | 保留不动 |
+| `to_response_data` | 不改 |
+| workflow 图结构 | 不改 |
+| 意图识别、检索、重排、质量门控 | 全部不改 |
+| clarify / unsupported / error 流程 | 不改 |
+| 非 SSE 接口(同步返回) | 不改 |
+
+---
+
+## 已知风险
+
+1. **langgraph 1.0.4 API 兼容性** — 需确认 `StreamWriter` / `stream_mode="custom"` 是否可用,不可用则需要升级
+2. **前端需要处理 JSON 结构** — 如果选择后端不剥离 JSON,前端需自行从 `{"answer": "..."}` 中提取内容
+3. **异步队列线程安全** — worker 线程 → queue → async consumer 需要正确处理取消、超时、异常
+4. **测试缺失** — 当前仓库没有 document_chat 相关测试,流式改动后需要补
+5. **`diff_result` 死字段清理** — 前一轮改动遗留,建议拆成单独的 PR 处理,不混在本次流式改动中

+ 129 - 1
foundation/ai/agent/generate/model_generate.py

@@ -13,9 +13,11 @@ from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
 from foundation.ai.models.model_handler import model_handler
 from foundation.observability.logger.loggering import write_logger as logger
 import asyncio
+import queue
 import re
+import threading
 import time
-from typing import Optional, Callable, Any, List, Union
+from typing import Optional, Callable, Any, AsyncGenerator, List, Union
 
 
 def _is_non_retryable_model_error(error: Exception) -> bool:
@@ -694,4 +696,130 @@ class GenerateModelClient:
             logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
             raise
 
+    async def get_model_generate_invoke_stream(
+        self,
+        trace_id: str,
+        system_prompt: Optional[str] = None,
+        user_prompt: Optional[str] = None,
+        prompt: Optional[str] = None,
+        timeout: Optional[int] = None,
+        model_name: Optional[str] = None,
+        enable_thinking: Optional[bool] = False,
+        function_name: Optional[str] = None,
+    ) -> AsyncGenerator[str, None]:
+        """Stream model output as an async generator without blocking the event loop.
+
+        A worker thread runs the synchronous ``stream`` call; chunks reach the event loop via an
+        ``asyncio.Queue`` fed through ``loop.call_soon_threadsafe`` (asyncio queues are not thread-safe).
+
+        Args:
+            trace_id: Trace ID included in log messages.
+            system_prompt: System prompt.
+            user_prompt: User prompt.
+            prompt: Single user prompt string.
+            timeout: Seconds to wait for each chunk; defaults to ``self.default_timeout``.
+            model_name: Model name.
+            enable_thinking: Thinking-mode switch, default False (applied to Qwen3.5 models only).
+            function_name: Optional feature name; when given, model settings are loaded from config.
+
+        Yields:
+            str: Generated text chunks.
+
+        Raises:
+            TimeoutError: If no chunk arrives within the timeout.
+        """
+        current_timeout = timeout or self.default_timeout
+
+        # Feature-level config may override the model and the thinking mode.
+        if function_name:
+            try:
+                from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
+                config_model = get_model_for_function(function_name)
+                config_thinking = get_thinking_mode_for_function(function_name)
+                if config_model:
+                    model_name = config_model
+                    logger.info(f"[模型流式-异步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
+                if config_thinking is not None and enable_thinking is False:
+                    enable_thinking = config_thinking
+                    logger.info(f"[模型流式-异步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
+            except Exception as e:
+                logger.warning(f"[模型流式-异步] 加载功能配置失败 [{function_name}]: {e}")
+
+        if not model_name:
+            try:
+                from foundation.ai.models.model_config_loader import get_model_for_function
+                model_name = get_model_for_function("default")
+                logger.info(f"[模型流式-异步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
+            except Exception as e:
+                logger.warning(f"[模型流式-异步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
+
+        # Pick the model; fall back to the client's initial model when no name was resolved.
+        llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
+        logger.info(f"[模型流式-异步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
+
+        final_messages = self._build_messages(system_prompt=system_prompt, user_prompt=user_prompt, prompt=prompt)
+
+        model_to_invoke = llm_to_use
+        is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
+        if is_qwen35:
+            if enable_thinking is False:
+                model_to_invoke = llm_to_use.bind(extra_body={"chat_template_kwargs": {"enable_thinking": False}})
+                logger.debug(f"[模型流式-异步] 已禁用 Qwen3.5 思考模式: {model_name}")
+            elif enable_thinking is True:
+                model_to_invoke = llm_to_use.bind(extra_body={"chat_template_kwargs": {"enable_thinking": True}})
+                logger.debug(f"[模型流式-异步] 已启用 Qwen3.5 思考模式: {model_name}")
+
+        # Bridge the synchronous stream to the async consumer through a queue.
+        loop = asyncio.get_running_loop()
+        q: asyncio.Queue = asyncio.Queue()
+        sentinel = object()  # end-of-stream marker
+        stop_event = threading.Event()  # set once the consumer stops reading, so the worker can quit early
+
+        def _worker():
+            """Run the synchronous stream and hand each chunk to the event loop thread."""
+            try:
+                response = model_to_invoke.stream(final_messages)
+                think_filter = _ThinkingBlockStreamFilter()
+                for chunk in response:
+                    if stop_event.is_set():
+                        break
+                    if hasattr(chunk, 'content') and chunk.content:
+                        cleaned = think_filter.feed(chunk.content)
+                        if cleaned:
+                            loop.call_soon_threadsafe(q.put_nowait, cleaned)
+                    elif chunk:
+                        text = chunk.content if hasattr(chunk, 'content') else str(chunk)
+                        if text:
+                            loop.call_soon_threadsafe(q.put_nowait, text)
+                tail = think_filter.flush()
+                if tail:
+                    loop.call_soon_threadsafe(q.put_nowait, tail)
+            except Exception as e:
+                logger.error(f"[模型流式-异步] worker 线程异常 trace_id: {trace_id}: {e}", exc_info=True)
+                loop.call_soon_threadsafe(q.put_nowait, None)  # None signals a worker failure
+            finally:
+                loop.call_soon_threadsafe(q.put_nowait, sentinel)
+
+        thread = threading.Thread(target=_worker, daemon=True)
+        thread.start()
+        start_time = time.time()
+
+        try:
+            while True:
+                item = await asyncio.wait_for(q.get(), timeout=current_timeout)
+                if item is sentinel:
+                    break
+                if item is None:
+                    logger.warning(f"[模型流式-异步] worker 线程发生异常,提前结束 trace_id: {trace_id}")
+                    break
+                yield item
+            elapsed_time = time.time() - start_time
+            logger.info(f"[模型流式-异步] 完成 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
+        except asyncio.TimeoutError:
+            elapsed_time = time.time() - start_time
+            logger.error(f"[模型流式-异步] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
+            raise TimeoutError(f"模型流式调用超时,trace_id: {trace_id}")
+        finally:
+            stop_event.set()
+
 generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)

+ 34 - 113
views/document_chat/views.py

@@ -22,22 +22,12 @@ REFERENCE_PREVIEW_CHARS = 600
 
 
 STAGE_MESSAGES = {
-    "validate_input": "已校验对话输入",
-    "load_context": "已整理当前章节上下文",
-    "load_skill_registry": "已加载文档对话技能",
+    "workflow_started": "文档 AI 对话工作流已启动",
     "recognize_intent": "已完成用户意图识别",
-    "route_intent": "已确定对话处理路径",
-    "build_retrieval_query": "已构建知识库检索问题",
-    "vector_recall": "已完成知识库向量召回",
-    "rerank_context": "已完成召回片段重排",
-    "quality_gate": "已完成参考资料质量门控",
-    "clarify": "需要用户补充说明",
-    "unsupported": "当前请求不在文档对话能力范围内",
+    "rerank_context": "知识库内容检索重排完成",
     "run_answer_skill": "已生成章节问答结果",
     "run_modify_skill": "已生成章节修改草案",
-    "build_diff": "已生成新旧内容对比",
     "error_handler": "流程异常,已进入错误处理",
-    "complete": "文档 AI 对话流程完成",
 }
 
 
@@ -95,16 +85,6 @@ def _safe_metadata(metadata: Any) -> Dict[str, Any]:
     return {key: metadata.get(key) for key in allowed_keys if metadata.get(key) not in (None, "")}
 
 
-def _pack_candidate_preview(item: Dict[str, Any]) -> Dict[str, Any]:
-    metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
-    return {
-        "source": str(item.get("source") or metadata.get("file_name") or "向量知识库"),
-        "snippet": _preview_text(item.get("text")),
-        "vector_similarity": item.get("vector_similarity", 0.0),
-        "metadata": _safe_metadata(metadata),
-    }
-
-
 def _pack_reference_preview(item: Dict[str, Any]) -> Dict[str, Any]:
     metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
     content = item.get("content") if "content" in item else item.get("text")
@@ -158,39 +138,11 @@ def _build_realtime_events(
             )
         )
 
-    if node_name == "build_retrieval_query":
-        events.append(
-            (
-                "retrieval_query",
-                {
-                    "callback_task_id": callback_task_id,
-                    "query": state.get("retrieval_query") or "",
-                },
-            )
-        )
-
-    if node_name == "vector_recall":
-        candidates = state.get("retrieval_candidates") or []
-        events.append(
-            (
-                "retrieval_recalled",
-                {
-                    "callback_task_id": callback_task_id,
-                    "retrieval_status": state.get("retrieval_status"),
-                    "retrieval_method": state.get("retrieval_method"),
-                    "retrieval_metrics": state.get("retrieval_metrics") or {},
-                    "candidate_count": len(candidates),
-                    "candidates": _limited_items(candidates, _pack_candidate_preview),
-                    "warnings": state.get("warnings") or [],
-                },
-            )
-        )
-
     if node_name == "rerank_context":
         reranked = state.get("reranked_references") or []
         events.append(
             (
-                "retrieval_reranked",
+                "retrieval_result",
                 {
                     "callback_task_id": callback_task_id,
                     "retrieval_status": state.get("retrieval_status"),
@@ -204,19 +156,6 @@ def _build_realtime_events(
         )
 
     if node_name == "quality_gate":
-        approved = state.get("approved_references") or []
-        retrieval_payload = {
-            "callback_task_id": callback_task_id,
-            "retrieval_status": state.get("retrieval_status"),
-            "retrieval_method": state.get("retrieval_method"),
-            "retrieval_metrics": state.get("retrieval_metrics") or {},
-            "approved_count": len(approved),
-            "references": _limited_items(approved, _pack_reference_preview),
-            "warnings": state.get("warnings") or [],
-        }
-        events.append(("retrieval_approved", retrieval_payload))
-        events.append(("retrieval", retrieval_payload))
-
         intent_result = state.get("intent_result") or {}
         skill_name = intent_result.get("skill_name") or ""
         if skill_name and not skill_started_sent:
@@ -233,21 +172,6 @@ def _build_realtime_events(
             )
             skill_started_sent = True
 
-    if node_name == "build_diff":
-        diff_result = state.get("diff_result") or {}
-        events.append(
-            (
-                "diff_ready",
-                {
-                    "callback_task_id": callback_task_id,
-                    "diff_granularity": diff_result.get("diff_granularity"),
-                    "diff_count": len(diff_result.get("diff") or []),
-                    "old_content_hash": diff_result.get("old_content_hash"),
-                    "new_content_hash": diff_result.get("new_content_hash"),
-                },
-            )
-        )
-
     return events, skill_started_sent
 
 
@@ -326,54 +250,51 @@ async def _generate_document_chat_events(
         graph_state = dict(state)
         skill_started_sent = False
 
-        async for raw_update in workflow.get_graph().astream(graph_state, stream_mode="updates"):
-            for node_name, node_update in _iter_node_updates(raw_update):
-                _merge_state_update(state, node_update)
-                realtime_events, skill_started_sent = _build_realtime_events(
-                    callback_task_id,
-                    state,
-                    node_name,
-                    skill_started_sent,
+        async for mode, payload in workflow.get_graph().astream(
+            graph_state, stream_mode=["updates", "custom"]
+        ):
+            if mode == "custom" and isinstance(payload, dict) and payload.get("stream_chunk"):
+                yield format_sse_event(
+                    "chunk",
+                    {
+                        "callback_task_id": callback_task_id,
+                        "chunk": payload["stream_chunk"],
+                    },
                 )
-                for event_type, event_data in realtime_events:
-                    yield format_sse_event(event_type, event_data)
+            elif mode == "updates":
+                for node_name, node_update in _iter_node_updates(payload):
+                    _merge_state_update(state, node_update)
+                    realtime_events, skill_started_sent = _build_realtime_events(
+                        callback_task_id,
+                        state,
+                        node_name,
+                        skill_started_sent,
+                    )
+                    for event_type, event_data in realtime_events:
+                        yield format_sse_event(event_type, event_data)
 
         data = workflow.to_response_data(state)
         data_dict = model_to_dict(data)
         log_document_chat_event("response_completed", callback_task_id, data_dict)
 
-        if data.response_type == "answer" and data.answer:
-            yield format_sse_event(
-                "chunk",
-                {
-                    "callback_task_id": callback_task_id,
-                    "chunk": data.answer,
-                },
-            )
+        if data.response_type == "answer":
             yield format_sse_event("answer_completed", data_dict)
         elif data.response_type == "proposal":
-            if data.proposed_content:
-                yield format_sse_event(
-                    "chunk",
-                    {
-                        "callback_task_id": callback_task_id,
-                        "chunk": data.proposed_content,
-                    },
-                )
             yield format_sse_event("proposal_completed", data_dict)
         elif data.response_type in ("clarify", "unsupported"):
             yield format_sse_event("answer_completed", data_dict)
         else:
             yield format_sse_event("error", data_dict)
 
-        yield format_sse_event(
-            "completed",
-            {
-                "callback_task_id": callback_task_id,
-                "status": state.get("overall_task_status", "completed"),
-                "duration": round(time.time() - started_at, 3),
-            },
-        )
+        if data.response_type != "error":
+            yield format_sse_event(
+                "completed",
+                {
+                    "callback_task_id": callback_task_id,
+                    "status": state.get("overall_task_status", "completed"),
+                    "duration": round(time.time() - started_at, 3),
+                },
+            )
     except Exception as exc:
         logger.error(f"[DocumentChat] SSE request failed: {exc}", exc_info=True)
         log_document_chat_event(