Ver Fonte

fix(ai对话优化)

tangle há 2 dias atrás
pai
commit
ddf99daf32

+ 18 - 0
config/document_chat_retrieval.yaml

@@ -3,7 +3,12 @@ version: "1.0.0"
 
 retrieval:
   enabled: true
+  parent_collection: "t_kngs_construction_plan_parent"
   child_collection: "t_kngs_construction_plan_child"
+  parent_recall_top_k: 30
+  child_recall_top_k: 40
+  tag_recall_top_k: 30
+  chapter_recall_top_k: 15
   recall_top_k: 30
   rerank_top_k: 8
   submit_top_k: 3
@@ -16,7 +21,20 @@ retrieval:
   allow_unscoped_search: false
   dense_weight: 0.7
   sparse_weight: 0.3
+  child_dense_weight: 0.6
+  child_sparse_weight: 0.4
   ranker_type: "weighted"
+  tag_recall_enabled: true
+  tag_terms_limit: 8
+  rrf_k: 60
+  parent_vector_weight: 1.0
+  child_locator_weight: 0.8
+  tag_weight: 1.2
+  chapter_similarity_weight: 0.5
+  tag_exact_bonus: 0.08
+  tag_partial_bonus: 0.03
+  multi_source_bonus: 0.02
+  scope_bonus: 0.03
 
 warnings:
   no_scope: "缺少可靠的知识库检索范围,本次未引用向量库内容。"

+ 151 - 2
core/document_chat/component/document_chat_logger.py

@@ -1,8 +1,19 @@
 # -*- coding: utf-8 -*-
-"""Structured logging helpers for document chat."""
+"""文档对话结构化日志工具。
+
+日志分级策略:
+    info  — 计数、评分、短预览(query 前 150 字、候选前 3 条等)
+    debug — 完整内容(需单独调用 level="debug")
+
+设计目的:
+    - 控制 info 日志文件大小,避免长文本(章节内容、完整候选列表)撑爆日志
+    - 保留关键指标用于问题排查(召回数量、相似度评分、scope 匹配等)
+    - info 和 debug 写入同一个文件(ModuleLogger 不支持按级别分文件),
+      但 debug 级别可通过 log_document_chat_event(..., level="debug") 单独标记
+"""
 
 import json
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 
 from foundation.infrastructure.config import config_handler
 from foundation.observability.logger.loggering import ModuleLogger
@@ -13,6 +24,9 @@ _CONSOLE_OUTPUT = config_handler.get("log", "CONSOLE_OUTPUT", "True").upper() !=
 _FILE_MAX_MB = int(config_handler.get("log", "LOG_FILE_MAX_MB", "10"))
 _BACKUP_COUNT = int(config_handler.get("log", "LOG_BACKUP_COUNT", "5"))
 
+# Info 级别日志中文本字段和列表项的截断上限
+_INFO_TEXT_LIMIT = 200
+_INFO_LIST_LIMIT = 3
 
 document_chat_logger = ModuleLogger(
     name="document_chat",
@@ -30,6 +44,14 @@ def log_document_chat_event(
     payload: Dict[str, Any],
     level: str = "info",
 ) -> None:
+    """记录一条文档对话结构化日志。
+
+    参数:
+        event:事件名(如 request_received、rag_query_built)
+        callback_task_id:全链路追踪 ID
+        payload:事件负载字典
+        level:日志级别(info / debug / warning / error)
+    """
     record = {
         "event": event,
         "callback_task_id": callback_task_id,
@@ -38,3 +60,130 @@ def log_document_chat_event(
     message = json.dumps(record, ensure_ascii=False, default=str)
     log_method = getattr(document_chat_logger, level, document_chat_logger.info)
     log_method(message, trace_id=callback_task_id, log_type="chat")
+
+
+def log_document_chat_event_truncated(
+    event: str,
+    callback_task_id: str,
+    payload: Dict[str, Any],
+    level: str = "info",
+    text_limit: int = _INFO_TEXT_LIMIT,
+) -> None:
+    """记录截断版日志,用于 info 级别控制文件大小。
+
+    会截断的字段:
+    - retrieval_query / retrieval_keywords → 150 字
+    - retrieval_candidates / reranked_references → 前 3 条,仅保留 text_preview
+    - request → 章节内容仅记录长度和前 100 字预览,历史对话仅记录条数
+    完整内容需额外调用 level="debug" 单独记录。
+    """
+    truncated = _truncate_payload(payload, text_limit)
+    log_document_chat_event(event, callback_task_id, truncated, level)
+
+
+def _truncate_payload(payload: Dict[str, Any], limit: int) -> Dict[str, Any]:
+    """递归截断 payload 中的大字段。"""
+    if not isinstance(payload, dict):
+        return payload
+
+    result = {}
+    for key, value in payload.items():
+        if key in ("retrieval_query", "retrieval_keywords"):
+            result[key] = _truncate_value(value, 150)
+        elif key == "retrieval_candidates":
+            result[key] = _truncate_candidates(value, limit)
+        elif key == "reranked_references":
+            result[key] = _truncate_candidates(value, limit)
+        elif key == "approved_references":
+            result[key] = _truncate_candidates(value, limit)
+        elif key == "retrieval_steps":
+            result[key] = _truncate_steps(value, limit)
+        elif key == "request":
+            result[key] = _truncate_request(value, limit)
+        elif key == "payload" and isinstance(value, dict):
+            result[key] = _truncate_payload(value, limit)
+        else:
+            result[key] = value
+    return result
+
+
+def _truncate_value(value: Any, limit: int) -> Any:
+    """截断超长字符串。"""
+    if isinstance(value, str) and len(value) > limit:
+        return value[:limit] + "..."
+    return value
+
+
+def _truncate_candidates(candidates: Any, limit: int) -> list:
+    """截断候选列表:仅保留前 3 条,每条仅保留 source 和 text_preview。"""
+    if not isinstance(candidates, list):
+        return []
+    result = []
+    for item in candidates[:_INFO_LIST_LIMIT]:
+        if not isinstance(item, dict):
+            continue
+        text = str(item.get("text") or item.get("content") or "")[:limit]
+        result.append({
+            "source": str(item.get("source", ""))[:40],
+            "text_preview": text,
+            "vector_similarity": item.get("vector_similarity", 0.0),
+            "rerank_score": item.get("rerank_score"),
+        })
+    if len(candidates) > _INFO_LIST_LIMIT:
+        result.append({"...": f"{len(candidates) - _INFO_LIST_LIMIT} more"})
+    return result
+
+
+def _truncate_steps(steps: Any, limit: int) -> list:
+    """截断检索步骤列表,嵌套调用 _truncate_candidates 处理子项。"""
+    if not isinstance(steps, list):
+        return []
+    result = []
+    for step in steps:
+        if not isinstance(step, dict):
+            continue
+        s = {"step": step.get("step"), "count": step.get("count")}
+        if "items" in step:
+            s["items"] = _truncate_candidates(step["items"], limit)
+        result.append(s)
+    return result
+
+
+def _truncate_request(request: Any, limit: int) -> dict:
+    """截断请求体日志,避免章节内容和历史对话撑爆日志文件。
+
+    截断策略:
+    - project_info → 仅保留 project_id 前 20 字和 engineering_type
+    - selected_section → 保留 index、title、content_len、content_preview(前 100 字)
+    - document_context → 仅保留 retrieval_filters
+    - conversation_history → 仅保留条数统计
+    - 其他字段 → _truncate_value 通用截断
+    """
+    if not isinstance(request, dict):
+        return {"...": "non-dict request"}
+    result = {}
+    for key, value in request.items():
+        if key == "project_info":
+            result[key] = {
+                "project_id": str(value.get("project_id", ""))[:20],
+                "engineering_type": value.get("engineering_type"),
+            }
+        elif key == "selected_section":
+            if isinstance(value, dict):
+                content = str(value.get("content", ""))
+                result[key] = {
+                    "index": value.get("index"),
+                    "title": str(value.get("title", ""))[:50],
+                    "content_len": len(content),
+                    "content_preview": content[:100] if content else "",
+                }
+        elif key == "document_context":
+            if isinstance(value, dict):
+                rf = value.get("retrieval_filters", {})
+                result[key] = {"retrieval_filters": rf} if rf else {}
+        elif key == "conversation_history":
+            if isinstance(value, list):
+                result[key] = f"{len(value)} items"
+        else:
+            result[key] = _truncate_value(value, limit)
+    return result

+ 41 - 6
core/document_chat/component/intent_recognizer.py

@@ -1,10 +1,21 @@
 # -*- coding: utf-8 -*-
-"""Intent recognition for document chat."""
+"""意图识别:通过 LLM 分析用户输入,判断是要问答还是修改当前章节。
+
+识别策略:
+    1. 优先使用 LLM 模型分析(调用 get_model_generate_invoke)
+    2. 模型失败或非 JSON 响应时,回退到关键词启发式规则
+
+支持的意图类型:
+    document_modify  — 用户要求润色、扩写、改写、压缩等(→ document-modify 技能)
+    document_answer  — 用户要求解释、分析、判断合理性等(→ document-answer 技能)
+    clarify          — 用户表述不清或模型置信度不足(→ 引导用户补充说明)
+    unsupported      — 超出模块能力范围(如要求画图、写代码等)
+"""
 
 import math
 from typing import Any, Dict, List
 
-from foundation.observability.logger.loggering import write_logger as logger
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
 
 from core.document_chat.component.llm_utils import compact_json, extract_json_object
 from core.document_chat.component.prompt_loader import load_prompt_config
@@ -12,7 +23,7 @@ from core.document_chat.schemas import IntentResult
 
 
 class IntentRecognizer:
-    """Recognize user intent and choose an allowed skill."""
+    """基于 LLM 的意图识别器,附带启发式兜底规则。"""
 
     def __init__(self):
         config = load_prompt_config("document_chat_intent.yaml")
@@ -20,6 +31,10 @@ class IntentRecognizer:
         self.timeout = int(config.get("timeout", 30))
 
     async def recognize(self, state: Dict[str, Any]) -> IntentResult:
+        """执行意图识别。优先 LLM,失败则回退启发式规则。
+
+        传给 LLM 的信息包括:用户输入、选中章节预览、项目信息、可用技能列表。
+        """
         skill_registry = state.get("skill_registry", [])
         user_message = state.get("user_message", "")
         selected_section = state.get("selected_section", {})
@@ -68,9 +83,17 @@ class IntentRecognizer:
         except Exception as exc:
             logger.warning(f"[DocumentChat] intent recognition failed, using heuristic fallback: {exc}")
 
+        # LLM 失败 → 关键词启发式兜底
         return self._heuristic_intent(user_message, skill_registry)
 
     def _normalize_intent(self, value: Dict[str, Any], skill_registry: List[Dict[str, Any]]) -> IntentResult:
+        """将 LLM 返回的 JSON 标准化为 IntentResult 对象。
+
+        处理逻辑:
+        1. 校验 skill_name 是否在可用技能白名单中
+        2. 如果模型返回了 skill_name 但 intent 不一致,以 skill_name 反查正确的 intent
+        3. 置信度 < 0.65 时标记为需要澄清
+        """
         allowed_skills = {skill.get("name") for skill in skill_registry if skill.get("name")}
         skill_intents = {
             str(skill.get("name")): str(skill.get("intent"))
@@ -81,6 +104,7 @@ class IntentRecognizer:
         skill_name = value.get("skill_name")
         confidence = self._coerce_confidence(value.get("confidence"))
 
+        # 将 skill_name 限制在可用技能白名单内
         if skill_name not in allowed_skills:
             if intent == "document_modify":
                 skill_name = "document-modify"
@@ -93,12 +117,12 @@ class IntentRecognizer:
             intent = "unsupported"
             skill_name = None
 
-        # The intent model can occasionally return an inconsistent pair such as
-        # intent=unsupported with skill_name=document-answer. Trust the allowlisted
-        # skill and normalize the intent so routing reaches the actual skill.
+        # 处理模型返回的不一致情况:如 intent=unsupported 但 skill_name=document-answer
+        # 以白名单中的技能为准,反查正确的 intent
         if skill_name in allowed_skills and not bool(value.get("needs_clarification")):
             intent = skill_intents.get(skill_name, intent)
 
+        # 置信度不足时需要用户补充说明
         needs_clarification = bool(value.get("needs_clarification")) or confidence < 0.65
         if needs_clarification and intent not in ("unsupported",):
             intent = "clarify"
@@ -118,6 +142,14 @@ class IntentRecognizer:
         )
 
     def _heuristic_intent(self, user_message: str, skill_registry: List[Dict[str, Any]]) -> IntentResult:
+        """基于关键词匹配的启发式意图识别,作为 LLM 的兜底方案。
+
+        关键词分类:
+        - modify_tokens:润色、扩写、改写等 → document_modify
+        - advice_tokens:怎么完善、如何改进等建议类 → document_answer
+        - answer_tokens:解释、说明、分析、是否等 → document_answer
+        - 默认兜底:document_answer(保守策略,宁可回答也不拒绝)
+        """
         message = (user_message or "").strip()
         modify_tokens = ("润色", "扩写", "改写", "修改", "补充", "完善", "压缩", "简化", "优化", "替换", "重写")
         advice_tokens = ("怎么完善", "如何完善", "怎样完善", "完善建议", "修改建议", "优化建议", "补充建议", "怎么改", "如何改")
@@ -158,6 +190,7 @@ class IntentRecognizer:
                 normalized_instruction=message,
             )
 
+        # 默认兜底:保守归类为问答
         return IntentResult(
             intent="document_answer",
             skill_name="document-answer",
@@ -168,6 +201,7 @@ class IntentRecognizer:
 
     @staticmethod
     def _registry_for_prompt(skill_registry: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """精简技能注册表,仅提取 LLM 需要的字段,避免 prompt 过大。"""
         return [
             {
                 "name": skill.get("name"),
@@ -180,6 +214,7 @@ class IntentRecognizer:
 
     @staticmethod
     def _coerce_confidence(value: Any) -> float:
+        """安全转换置信度为 0.0~1.0 的浮点数,NaN 视为 0。"""
         try:
             confidence = float(value)
         except (TypeError, ValueError):

+ 67 - 3
core/document_chat/component/llm_utils.py

@@ -3,10 +3,16 @@
 
 import json
 import re
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 
 
 _FENCED_JSON_RE = re.compile(r"```(?:json)?\s*([\s\S]*?)\s*```", re.IGNORECASE)
+# Regex fallback: extract "answer" value from a JSON-like structure.
+# Handles both "answer": "..." (double-quoted) and multi-line values.
+_ANSWER_FIELD_RE = re.compile(
+    r'"answer"\s*:\s*"((?:[^"\\]|\\.)*)"',
+    re.DOTALL,
+)
 
 
 def extract_json_object(text: str) -> Dict[str, Any]:
@@ -28,13 +34,71 @@ def extract_json_object(text: str) -> Dict[str, Any]:
     start = stripped.find("{")
     end = stripped.rfind("}")
     if start >= 0 and end > start:
+        fragment = stripped[start:end + 1]
         try:
-            value = json.loads(stripped[start:end + 1])
+            value = json.loads(fragment)
             return value if isinstance(value, dict) else {}
         except json.JSONDecodeError:
-            return {}
+            # Retry with control characters escaped (common when model
+            # emits literal newlines/tabs inside string values).
+            repaired = _repair_control_chars(fragment)
+            if repaired != fragment:
+                try:
+                    value = json.loads(repaired)
+                    return value if isinstance(value, dict) else {}
+                except json.JSONDecodeError:
+                    pass
     return {}
 
 
+def extract_answer_field(text: str) -> Optional[str]:
+    """Best-effort extraction of the "answer" field from a raw LLM response.
+
+    Used as a fallback when ``extract_json_object`` fails to parse the full
+    JSON (e.g. due to unescaped control characters in streamed output).
+    """
+    if not text:
+        return None
+    match = _ANSWER_FIELD_RE.search(text)
+    if not match:
+        return None
+    raw_value = match.group(1)
+    # Unescape standard JSON escape sequences.
+    try:
+        return json.loads(f'"{raw_value}"')
+    except json.JSONDecodeError:
+        return raw_value
+
+
+def _repair_control_chars(s: str) -> str:
+    """Replace literal control chars inside JSON string values.
+
+    Models sometimes emit raw newlines / tabs inside string literals,
+    which ``json.loads`` rejects. This replaces them with proper escapes
+    while leaving the surrounding JSON structure intact.
+    """
+    # Only replace control characters that appear between quotes.
+    # A simple approach: replace all bare \n/\r/\t with escaped versions,
+    # but skip already-escaped sequences (preceded by backslash).
+    result = []
+    i = 0
+    in_string = False
+    while i < len(s):
+        c = s[i]
+        if c == '"' and (i == 0 or s[i - 1] != "\\"):
+            in_string = not in_string
+            result.append(c)
+        elif in_string and c == "\n":
+            result.append("\\n")
+        elif in_string and c == "\r":
+            result.append("\\r")
+        elif in_string and c == "\t":
+            result.append("\\t")
+        else:
+            result.append(c)
+        i += 1
+    return "".join(result)
+
+
 def compact_json(value: Any) -> str:
     return json.dumps(value, ensure_ascii=False, indent=2)

+ 1 - 1
core/document_chat/component/rerank_service.py

@@ -5,7 +5,7 @@ from __future__ import annotations
 
 from typing import Any, Dict, List, Optional
 
-from foundation.observability.logger.loggering import write_logger as logger
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
 
 from core.document_chat.component.retrieval_service import RetrievalConfig, load_retrieval_config
 

+ 48 - 3
core/document_chat/component/retrieval_quality_gate.py

@@ -1,5 +1,15 @@
 # -*- coding: utf-8 -*-
-"""Quality gate for document-chat retrieved references."""
+"""检索质量门:过滤低质量参考,仅保留高可信度内容送入 LLM。
+
+质量门过滤条件(全部满足才算合格):
+    1. 有实际文本内容(text 非空)
+    2. vector_similarity >= min_vector_similarity 或 fusion_score > 0 且有 source_hits
+    3. rerank_score >= min_rerank_score(默认 0.65)
+    4. metadata.source_scope_valid 为 True(项目ID和工程类型匹配)
+
+如果合格数量 < min_qualified_count,则返回 low_confidence 状态,
+告知上层未找到足够可信的参考。
+"""
 
 from __future__ import annotations
 
@@ -9,21 +19,31 @@ from core.document_chat.component.retrieval_service import RetrievalConfig, load
 
 
 class RetrievalQualityGate:
-    """Allow only high-quality, scoped references into LLM prompts."""
+    """质量门:仅允许与当前项目 scope 匹配且重排分数达标的参考进入 LLM prompt。"""
 
     def __init__(self, config: Optional[RetrievalConfig] = None):
         self.config = config or load_retrieval_config()
 
     def apply(self, reranked_references: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """对重排后的候选列表执行质量过滤。
+
+        返回:
+        - approved_references:合格的参考(最多 submit_top_k 条)
+        - retrieval_status:usable(有合格)或 low_confidence(无合格)
+        - retrieval_metrics:统计指标
+        - warnings:告警信息
+        """
         if not reranked_references:
             return self._low_confidence([], {"approved_count": 0})
 
+        # 逐条检查是否合格
         qualified = []
         for item in reranked_references:
             if not self._is_qualified(item):
                 continue
             qualified.append(self._pack_reference(item))
 
+        # 合格数量不足,整体降级为 low_confidence
         if len(qualified) < self.config.min_qualified_count:
             metrics = {
                 "approved_count": 0,
@@ -32,6 +52,7 @@ class RetrievalQualityGate:
             }
             return self._low_confidence([], metrics)
 
+        # 取前 submit_top_k 条,限制总字符数
         approved = self._limit_reference_chars(qualified[: self.config.submit_top_k])
         metrics = {
             "approved_count": len(approved),
@@ -46,39 +67,59 @@ class RetrievalQualityGate:
         }
 
     def _is_qualified(self, item: Dict[str, Any]) -> bool:
+        """判断单条候选是否满足质量门所有条件。
+
+        合格条件:
+        - 有文本内容
+        - 向量相似度达标 或 RRF 融合分数有效
+        - 重排分数 >= min_rerank_score
+        - source_scope_valid 为 True(项目/工程类型匹配)
+        """
         text = str(item.get("text") or "").strip()
         metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
+        vector_ok = self._to_float(item.get("vector_similarity"), 0.0) >= self.config.min_vector_similarity
+        fusion_ok = self._to_float(item.get("fusion_score"), 0.0) > 0 and bool(item.get("source_hits"))
         return (
             bool(text)
-            and self._to_float(item.get("vector_similarity"), 0.0) >= self.config.min_vector_similarity
+            and (vector_ok or fusion_ok)
             and self._to_float(item.get("rerank_score"), 0.0) >= self.config.min_rerank_score
             and metadata.get("source_scope_valid") is True
         )
 
     def _pack_reference(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """将合格的候选条目标准化为 LLM prompt 可使用的参考格式。"""
         metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
         return {
             "source": str(item.get("source") or metadata.get("file_name") or "向量知识库"),
             "content": str(item.get("text") or "").strip()[: self.config.max_single_reference_chars],
             "vector_similarity": self._to_float(item.get("vector_similarity"), 0.0),
+            "fusion_score": self._to_float(item.get("fusion_score"), 0.0),
             "rerank_score": self._to_float(item.get("rerank_score"), 0.0),
             "metadata": {
                 key: metadata.get(key)
                 for key in (
+                    "candidate_key",
                     "tenant_id",
                     "project_id",
                     "knowledge_base_id",
                     "file_name",
                     "chapter_level_1",
                     "chapter_level_2",
+                    "chapter_level_3",
                     "parent_id",
                     "parent_count",
+                    "child_hit_count",
+                    "tag_match_terms",
                 )
                 if metadata.get(key) not in (None, "")
             },
         }
 
     def _limit_reference_chars(self, references: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """限制参考内容总字符数不超过 max_reference_chars,避免 LLM prompt 过长。
+
+        按顺序截断:前面的参考占满额度后,后面的参考不再包含。
+        """
         total = 0
         limited = []
         for item in references:
@@ -94,6 +135,7 @@ class RetrievalQualityGate:
         return limited
 
     def _low_confidence(self, approved: List[Dict[str, Any]], metrics: Dict[str, Any]) -> Dict[str, Any]:
+        """无合格参考时的降级响应。"""
         return {
             "approved_references": approved,
             "retrieval_status": "low_confidence",
@@ -102,11 +144,13 @@ class RetrievalQualityGate:
         }
 
     def _warning(self, key: str) -> str:
+        """获取指定类型的告警文案。"""
         warnings = self.config.warnings or {}
         return warnings.get(key) or "未找到可信度足够的知识库片段,本次未引用向量库内容。"
 
     @staticmethod
     def _max_score(items: List[Dict[str, Any]], key: str) -> float:
+        """取指定字段的最大值,用于统计指标。"""
         values = []
         for item in items:
             try:
@@ -117,6 +161,7 @@ class RetrievalQualityGate:
 
     @staticmethod
     def _to_float(value: Any, default: float) -> float:
+        """安全浮点数转换,转换失败返回默认值。"""
         try:
             return float(value)
         except (TypeError, ValueError):

Diff do ficheiro suprimidas por serem muito extensas
+ 753 - 81
core/document_chat/component/retrieval_service.py


+ 2 - 0
core/document_chat/component/state_models.py

@@ -18,6 +18,8 @@ class DocumentChatState(TypedDict, total=False):
     user_message: str
     skill_registry: List[Dict[str, Any]]
     retrieval_query: Optional[str]
+    retrieval_keywords: List[str]
+    retrieval_steps: List[Dict[str, Any]]
     retrieval_method: Optional[str]
     retrieval_candidates: List[Dict[str, Any]]
     reranked_references: List[Dict[str, Any]]

+ 2 - 2
core/document_chat/schemas.py

@@ -24,7 +24,7 @@ class DocumentContext(BaseModel):
 
 
 class DocumentChatRequest(BaseModel):
-    user_id: Optional[str] = None
+    user_id: str
     message: str = Field(..., min_length=1, description="User message")
     selected_section: Optional[SelectedSection] = Field(default=None, description="Selected section; null or empty for general questions")
     conversation_id: Optional[str] = None
@@ -52,7 +52,7 @@ class IntentResult(BaseModel):
 
 
 class DocumentChatSkillInput(BaseModel):
-    user_id: Optional[str] = None
+    user_id: str
     user_message: str
     selected_section: Optional[SelectedSection] = None
     intent_result: IntentResult

+ 12 - 2
core/document_chat/skills/document_answer.py

@@ -3,9 +3,9 @@
 
 from typing import Any, Callable, List
 
-from foundation.observability.logger.loggering import write_logger as logger
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
 
-from core.document_chat.component.llm_utils import compact_json, extract_json_object
+from core.document_chat.component.llm_utils import compact_json, extract_answer_field, extract_json_object
 from core.document_chat.component.prompt_loader import load_prompt_config
 from core.document_chat.schemas import DocumentChatSkillInput, DocumentChatSkillOutput, model_to_dict
 from core.document_chat.skills.base import BaseDocumentChatSkill
@@ -48,6 +48,11 @@ class DocumentAnswerSkill(BaseDocumentChatSkill):
             references = skill_input.document_context.references
             warnings = self._list_of_strings(parsed.get("warnings")) if parsed else []
 
+            if not answer:
+                # Fallback: try to extract "answer" field via regex
+                answer = extract_answer_field(response) or ""
+                if answer:
+                    logger.warning("[DocumentChat] answer JSON parse failed, used regex fallback")
             if not answer:
                 answer = response.strip()
             if not answer:
@@ -112,6 +117,11 @@ class DocumentAnswerSkill(BaseDocumentChatSkill):
         if parsed and isinstance(parsed.get("warnings"), list):
             warnings.extend(self._list_of_strings(parsed["warnings"]))
 
+        if not answer:
+            # Fallback: try to extract "answer" field via regex
+            answer = extract_answer_field(full_text) or ""
+            if answer:
+                logger.warning("[DocumentChat] answer stream JSON parse failed, used regex fallback")
         if not answer:
             answer = full_text.strip()
         if not answer:

+ 5 - 1
core/document_chat/skills/document_modify.py

@@ -3,7 +3,7 @@
 
 from typing import Any, Callable, Dict, List
 
-from foundation.observability.logger.loggering import write_logger as logger
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
 
 from core.document_chat.component.llm_utils import compact_json, extract_json_object
 from core.document_chat.component.prompt_loader import load_prompt_config
@@ -52,6 +52,8 @@ class DocumentModifySkill(BaseDocumentChatSkill):
             warnings = self._list_of_strings(parsed.get("warnings")) if parsed else []
 
             if not proposed_content:
+                if response.strip():
+                    logger.warning("[DocumentChat] modify JSON parse failed, using raw text as proposed_content")
                 proposed_content = response.strip()
             if not proposed_content:
                 proposed_content = old_content
@@ -121,6 +123,8 @@ class DocumentModifySkill(BaseDocumentChatSkill):
             warnings.extend(self._list_of_strings(parsed["warnings"]))
 
         if not proposed_content:
+            if full_text.strip():
+                logger.warning("[DocumentChat] modify stream JSON parse failed, using raw text as proposed_content")
             proposed_content = full_text.strip()
         if not proposed_content:
             proposed_content = old_content

+ 210 - 35
core/document_chat/workflows/document_chat_workflow.py

@@ -1,15 +1,34 @@
 # -*- coding: utf-8 -*-
-"""LangGraph workflow for document chat."""
+"""基于 LangGraph 的文档 AI 对话工作流。
+
+工作流节点及路由:
+    validate_input → 校验用户输入(user_id、message、selected_section)
+      ├─ general(无选中章节)→ general_answer(通用 LLM 回答)
+      └─ normal(有选中章节)→ load_context → load_skill_registry → recognize_intent
+         → route_intent
+            ├─ clarify(需补充说明)→ clarify_node → complete
+            ├─ unsupported(不支持的意图)→ unsupported_node → complete
+            ├─ answer(章节问答)→ build_retrieval_query → vector_recall
+            │   → rerank_context → quality_gate → run_answer_skill → complete
+            └─ modify(章节修改)→ build_retrieval_query → vector_recall
+                → rerank_context → quality_gate → run_modify_skill → complete
+
+检索阶段(RAG 链路):
+    build_retrieval_query:拼接用户输入 + 章节标题 + 历史对话为检索 query
+    vector_recall:多路召回(parent_vector / child_locator / tag / chapter_similarity)+ RRF 融合
+    rerank_context:调用重排模型对候选打分排序
+    quality_gate:按 min_rerank_score 阈值过滤低质量参考
+"""
 
 import uuid
 from typing import Any, Dict, List, Optional
 
 from langgraph.graph import END, StateGraph
 
-from foundation.observability.logger.loggering import write_logger as logger
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
+from core.document_chat.component.document_chat_logger import log_document_chat_event, log_document_chat_event_truncated
 
 from core.document_chat.component.conversation_context import ConversationContextBuilder
-from core.document_chat.component.document_chat_logger import log_document_chat_event
 from core.document_chat.component.intent_recognizer import IntentRecognizer
 from core.document_chat.component.rerank_service import DocumentChatRerankService
 from core.document_chat.component.retrieval_quality_gate import RetrievalQualityGate
@@ -29,7 +48,15 @@ from core.document_chat.schemas import (
 
 
 class DocumentChatWorkflow:
-    """Document chat workflow built with LangGraph."""
+    """施工方案文档 AI 对话的 LangGraph 工作流。
+
+    核心职责:
+    - 接收前端请求,校验输入参数
+    - 通过 LLM 意图识别判断用户是想"问答"还是"修改"当前章节
+    - 对章节问答/修改走 RAG 检索链路(召回 → 重排 → 质量门)
+    - 调用对应技能(document-answer 或 document-modify)生成回答/草案
+    - 统一组装响应数据返回
+    """
 
     def __init__(self):
         self.intent_recognizer = IntentRecognizer()
@@ -41,7 +68,10 @@ class DocumentChatWorkflow:
         self.graph = None
 
     def build_graph(self):
+        """构建 LangGraph 状态图,定义节点和边。"""
         workflow = StateGraph(DocumentChatState)
+
+        # ===== 注册所有节点 =====
         workflow.add_node("validate_input", self.validate_input_node)
         workflow.add_node("load_context", self.load_context_node)
         workflow.add_node("load_skill_registry", self.load_skill_registry_node)
@@ -59,7 +89,10 @@ class DocumentChatWorkflow:
         workflow.add_node("error_handler", self.error_handler_node)
         workflow.add_node("complete", self.complete_node)
 
+        # ===== 定义执行流程 =====
         workflow.set_entry_point("validate_input")
+
+        # 入口分流:有选中章节走 normal,无选中章节走 general 通用回答
         workflow.add_conditional_edges(
             "validate_input",
             self.route_after_validate,
@@ -72,6 +105,8 @@ class DocumentChatWorkflow:
         workflow.add_edge("load_context", "load_skill_registry")
         workflow.add_edge("load_skill_registry", "recognize_intent")
         workflow.add_edge("recognize_intent", "route_intent")
+
+        # 意图分流:clarify / unsupported / answer(问答) / modify(修改)
         workflow.add_conditional_edges(
             "route_intent",
             self.route_intent,
@@ -83,9 +118,13 @@ class DocumentChatWorkflow:
                 "error": "error_handler",
             },
         )
+
+        # RAG 检索链路:检索 → 重排 → 质量门
         workflow.add_edge("build_retrieval_query", "vector_recall")
         workflow.add_edge("vector_recall", "rerank_context")
         workflow.add_edge("rerank_context", "quality_gate")
+
+        # 检索后分流:按意图类型调用对应技能
         workflow.add_conditional_edges(
             "quality_gate",
             self.route_after_retrieval,
@@ -95,6 +134,8 @@ class DocumentChatWorkflow:
                 "error": "error_handler",
             },
         )
+
+        # 终端节点统一汇入 complete
         workflow.add_edge("clarify", "complete")
         workflow.add_edge("unsupported", "complete")
         workflow.add_edge("run_answer_skill", "complete")
@@ -105,11 +146,13 @@ class DocumentChatWorkflow:
         return workflow.compile()
 
     def get_graph(self):
+        """获取编译后的图,懒加载只构建一次。"""
         if self.graph is None:
             self.graph = self.build_graph()
         return self.graph
 
     def build_initial_state(self, request: DocumentChatRequest, callback_task_id: Optional[str] = None) -> DocumentChatState:
+        """根据 HTTP 请求构建初始工作流状态。"""
         task_id = callback_task_id or f"doc_chat_{uuid.uuid4().hex[:12]}"
         return {
             "callback_task_id": task_id,
@@ -123,6 +166,8 @@ class DocumentChatWorkflow:
             "user_message": request.message,
             "skill_registry": [],
             "retrieval_query": None,
+            "retrieval_keywords": [],
+            "retrieval_steps": [],
             "retrieval_method": None,
             "retrieval_candidates": [],
             "reranked_references": [],
@@ -140,15 +185,23 @@ class DocumentChatWorkflow:
         }
 
     async def run(self, request: DocumentChatRequest, callback_task_id: Optional[str] = None) -> DocumentChatState:
+        """执行工作流,返回最终状态。用于非 SSE 同步调用。"""
         initial_state = self.build_initial_state(request, callback_task_id)
         return await self.get_graph().ainvoke(initial_state)
 
+    # ============================================================
+    # 节点:validate_input — 输入校验
+    # ============================================================
     async def validate_input_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """校验必填字段,确保 selected_section 包含 content 键。"""
         try:
             selected_section = state.get("selected_section") or {}
             user_message = (state.get("user_message") or "").strip()
+            if not state.get("user_id"):
+                raise ValueError("user_id is required")
             if not user_message:
                 raise ValueError("message is required")
+            # 保证后续检索和检索 query 构建时 content 键一定存在
             if "content" not in selected_section:
                 selected_section["content"] = ""
             return {
@@ -160,6 +213,7 @@ class DocumentChatWorkflow:
             return self._error_update("validate_input", exc)
 
     def route_after_validate(self, state: DocumentChatState) -> str:
+        """入口路由决策:有章节信息 → normal,无 → general(通用回答)。"""
         if state.get("error_message"):
             return "error"
         selected_section = state.get("selected_section") or {}
@@ -168,9 +222,15 @@ class DocumentChatWorkflow:
             or selected_section.get("chapter_level_1")
             or selected_section.get("chapter_level_2")
         )
-        return "normal" if has_section else "general"
+        route = "normal" if has_section else "general"
+        logger.info(f"[DocumentChat] route_after_validate: route={route}, code={selected_section.get('code')}, level1={selected_section.get('chapter_level_1')}, level2={selected_section.get('chapter_level_2')}")
+        return route
 
+    # ============================================================
+    # 节点:load_context — 加载上下文(项目信息、章节、历史对话)
+    # ============================================================
     async def load_context_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """构建完整的对话上下文,包含项目信息、选中章节、前后文片段、历史对话。"""
         if state.get("error_message"):
             return {}
         context = self.context_builder.build(state)
@@ -182,7 +242,11 @@ class DocumentChatWorkflow:
             "current_stage": "load_context",
         }
 
+    # ============================================================
+    # 节点:load_skill_registry — 加载技能注册表
+    # ============================================================
     async def load_skill_registry_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """加载可用技能列表,供意图识别器作为参考。"""
         if state.get("error_message"):
             return {}
         return {
@@ -190,11 +254,16 @@ class DocumentChatWorkflow:
             "current_stage": "load_skill_registry",
         }
 
+    # ============================================================
+    # 节点:recognize_intent — LLM 意图识别
+    # ============================================================
     async def recognize_intent_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """调用 LLM 分析用户输入,识别是问答(document-answer)还是修改(document-modify)意图。"""
         if state.get("error_message"):
             return {}
         try:
             intent_result = await self.intent_recognizer.recognize(state)
+            logger.info(f"[DocumentChat] intent recognized: intent={intent_result.intent}, skill={intent_result.skill_name}, confidence={intent_result.confidence}, operation={intent_result.operation}")
             return {
                 "intent_result": model_to_dict(intent_result),
                 "current_stage": "recognize_intent",
@@ -202,10 +271,22 @@ class DocumentChatWorkflow:
         except Exception as exc:
             return self._error_update("recognize_intent", exc)
 
+    # ============================================================
+    # 节点:route_intent — 空节点,仅标记阶段
+    # ============================================================
     async def route_intent_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """空节点,仅用于 SSE 流中标记已进入路由阶段。"""
         return {"current_stage": "route_intent"}
 
     def route_intent(self, state: DocumentChatState) -> str:
+        """根据意图识别结果路由到对应分支。
+
+        路由规则:
+        - 需要补充说明 / 置信度 < 0.65 → clarify
+        - skill=document-answer → answer
+        - skill=document-modify → modify
+        - 不支持的意图 → unsupported
+        """
         if state.get("error_message"):
             return "error"
         intent_data = state.get("intent_result") or {}
@@ -224,6 +305,7 @@ class DocumentChatWorkflow:
         return "error"
 
     def route_after_retrieval(self, state: DocumentChatState) -> str:
+        """检索完成后按意图类型路由到对应技能节点。"""
         if state.get("error_message"):
             return "error"
         intent_data = state.get("intent_result") or {}
@@ -234,35 +316,46 @@ class DocumentChatWorkflow:
             return "modify"
         return "error"
 
+    # ============================================================
+    # 节点:build_retrieval_query — 构建检索查询
+    # ============================================================
     async def build_retrieval_query_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """将用户输入、章节标题、历史对话等拼接为检索 query 和关键词。"""
         if state.get("error_message"):
             return {}
         query = self.retrieval_service.build_query(state)
-        log_document_chat_event(
+        keywords = self.retrieval_service.build_query_keywords(state, query)
+        log_document_chat_event_truncated(
             "rag_query_built",
             state.get("callback_task_id", ""),
             {
                 "retrieval_query": query,
-                "intent_result": state.get("intent_result"),
-                "selected_section": state.get("selected_section"),
-                "project_info": state.get("project_info"),
-                "document_context": state.get("document_context"),
+                "retrieval_keywords": keywords,
+                "intent_result": {"skill_name": (state.get("intent_result") or {}).get("skill_name")},
             },
         )
         return {
             "retrieval_query": query,
+            "retrieval_keywords": keywords,
             "current_stage": "build_retrieval_query",
         }
 
+    # ============================================================
+    # 节点:vector_recall — 多路向量召回
+    # ============================================================
     async def vector_recall_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """执行多路向量检索,合并候选结果。支持 parent_vector、child_locator、
+        标签关键词、章节相似度四条召回路径。
+        """
         if state.get("error_message"):
             return {}
         result = self.retrieval_service.recall(state)
-        log_document_chat_event(
+        log_document_chat_event_truncated(
             "rag_recall_completed",
             state.get("callback_task_id", ""),
             {
                 "retrieval_query": state.get("retrieval_query"),
+                "retrieval_keywords": state.get("retrieval_keywords") or [],
                 "retrieval_method": result.get("retrieval_method"),
                 "retrieval_status": result.get("retrieval_status"),
                 "retrieval_metrics": result.get("retrieval_metrics") or {},
@@ -272,6 +365,7 @@ class DocumentChatWorkflow:
         )
         return {
             "retrieval_candidates": result.get("retrieval_candidates") or [],
+            "retrieval_steps": result.get("retrieval_steps") or [],
             "retrieval_status": result.get("retrieval_status"),
             "retrieval_method": result.get("retrieval_method"),
             "retrieval_metrics": self._merge_metrics(state, result.get("retrieval_metrics") or {}),
@@ -279,7 +373,11 @@ class DocumentChatWorkflow:
             "current_stage": "vector_recall",
         }
 
+    # ============================================================
+    # 节点:rerank_context — 重排打分
+    # ============================================================
     async def rerank_context_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """调用重排模型对候选文档打分排序。如果未召回候选则跳过。"""
         if state.get("error_message"):
             return {}
         if state.get("retrieval_status") != "recalled":
@@ -288,9 +386,11 @@ class DocumentChatWorkflow:
                 state.get("callback_task_id", ""),
                 {
                     "retrieval_query": state.get("retrieval_query"),
+                    "retrieval_keywords": state.get("retrieval_keywords") or [],
                     "retrieval_method": state.get("retrieval_method"),
                     "retrieval_status": state.get("retrieval_status"),
                     "retrieval_metrics": state.get("retrieval_metrics") or {},
+                    "retrieval_steps": state.get("retrieval_steps") or [],
                     "warnings": state.get("warnings") or [],
                 },
             )
@@ -304,15 +404,13 @@ class DocumentChatWorkflow:
             query=state.get("retrieval_query") or "",
             candidates=state.get("retrieval_candidates") or [],
         )
-        log_document_chat_event(
+        log_document_chat_event_truncated(
             "rag_rerank_completed",
             state.get("callback_task_id", ""),
             {
                 "retrieval_query": state.get("retrieval_query"),
-                "retrieval_method": state.get("retrieval_method"),
                 "retrieval_status": result.get("retrieval_status"),
                 "retrieval_metrics": result.get("retrieval_metrics") or {},
-                "retrieval_candidates": state.get("retrieval_candidates") or [],
                 "reranked_references": result.get("reranked_references") or [],
                 "warnings": result.get("warnings") or [],
             },
@@ -325,7 +423,17 @@ class DocumentChatWorkflow:
             "current_stage": "rerank_context",
         }
 
+    # ============================================================
+    # 节点:quality_gate — 质量门过滤
+    # ============================================================
     async def quality_gate_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """按 min_rerank_score 阈值过滤低质量参考,保留 scope 匹配的高可信引用。
+
+        合格条件:
+        - rerank_score >= min_rerank_score(默认 0.65)
+        - metadata.source_scope_valid 为 True(项目/工程类型匹配)
+        - 有实际文本内容
+        """
         if state.get("error_message"):
             return {}
         if state.get("retrieval_status") != "reranked":
@@ -334,9 +442,11 @@ class DocumentChatWorkflow:
                 state.get("callback_task_id", ""),
                 {
                     "retrieval_query": state.get("retrieval_query"),
+                    "retrieval_keywords": state.get("retrieval_keywords") or [],
                     "retrieval_method": state.get("retrieval_method"),
                     "retrieval_status": state.get("retrieval_status"),
                     "retrieval_metrics": self._merge_metrics(state, {"approved_count": 0}),
+                    "retrieval_steps": state.get("retrieval_steps") or [],
                     "reranked_references": state.get("reranked_references") or [],
                     "warnings": state.get("warnings") or [],
                 },
@@ -348,15 +458,13 @@ class DocumentChatWorkflow:
             }
 
         result = self.quality_gate.apply(state.get("reranked_references") or [])
-        log_document_chat_event(
+        log_document_chat_event_truncated(
             "rag_quality_gate_completed",
             state.get("callback_task_id", ""),
             {
                 "retrieval_query": state.get("retrieval_query"),
-                "retrieval_method": state.get("retrieval_method"),
                 "retrieval_status": result.get("retrieval_status"),
                 "retrieval_metrics": result.get("retrieval_metrics") or {},
-                "reranked_references": state.get("reranked_references") or [],
                 "approved_references": result.get("approved_references") or [],
                 "warnings": result.get("warnings") or [],
             },
@@ -369,7 +477,11 @@ class DocumentChatWorkflow:
             "current_stage": "quality_gate",
         }
 
+    # ============================================================
+    # 节点:clarify — 需要用户补充说明
+    # ============================================================
     async def clarify_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """意图置信度不足或模型要求澄清时,返回引导性问题。"""
         intent = IntentResult(**(state.get("intent_result") or {"intent": "clarify"}))
         question = intent.clarification_question or "请补充说明希望 AI 对当前章节做什么。"
         skill_result = DocumentChatSkillOutput(
@@ -384,7 +496,11 @@ class DocumentChatWorkflow:
             "current_stage": "clarify",
         }
 
+    # ============================================================
+    # 节点:unsupported — 不支持的意图
+    # ============================================================
     async def unsupported_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """用户请求超出当前模块能力范围(非问答/非修改),返回提示说明。"""
         intent = IntentResult(**(state.get("intent_result") or {"intent": "unsupported"}))
         message = intent.reason or "当前 AI 对话模块只支持选中章节的问答和修改。"
         skill_result = DocumentChatSkillOutput(
@@ -399,8 +515,22 @@ class DocumentChatWorkflow:
             "current_stage": "unsupported",
         }
 
+    # ============================================================
+    # 节点:general_answer — 无选中章节时的通用回答
+    # ============================================================
+    @staticmethod
+    def _capture_stream_writer():
+        """获取 LangGraph 的流式写入器。在流式上下文中可用,否则返回 None。"""
+        try:
+            from langgraph.config import get_stream_writer
+            writer = get_stream_writer()
+            return writer
+        except Exception as exc:
+            logger.debug(f"[DocumentChat] StreamWriter not available: {exc}")
+            return None
+
     async def general_answer_node(self, state: DocumentChatState) -> Dict[str, Any]:
-        """Respond directly via LLM when no section is selected."""
+        """用户未选中任何章节时,以通用助手身份通过 LLM 直接回答问题。"""
         user_message = state.get("user_message", "")
         conversation_history = state.get("conversation_history") or []
         project_info = state.get("project_info") or {}
@@ -415,7 +545,7 @@ class DocumentChatWorkflow:
         user_payload = {
             "user_message": user_message,
             "project_info": project_info,
-            "conversation_history": conversation_history[-6:],
+            "conversation_history": conversation_history[-6:],  # 仅取最近 6 轮历史
         }
 
         try:
@@ -423,15 +553,10 @@ class DocumentChatWorkflow:
             from core.document_chat.component.llm_utils import compact_json
 
             full_text_parts: List[str] = []
+            writer = self._capture_stream_writer()
+            logger.info(f"[DocumentChat] general_answer_node: stream_writer={'captured' if writer else 'None'}")
 
-            def _on_chunk(chunk: str):
-                from langgraph.config import get_stream_writer
-                try:
-                    writer = get_stream_writer()
-                    writer({"stream_chunk": chunk})
-                except Exception:
-                    pass
-
+            # 优先尝试流式生成,失败则降级为非流式
             try:
                 async for chunk in generate_model_client.get_model_generate_invoke_stream(
                     trace_id=state.get("callback_task_id", "general_answer"),
@@ -440,10 +565,11 @@ class DocumentChatWorkflow:
                     timeout=45,
                     function_name="general_answer",
                 ):
-                    _on_chunk(chunk)
+                    if writer:
+                        writer({"stream_chunk": chunk})
                     full_text_parts.append(chunk)
             except Exception as exc:
-                logger.warning(f"[DocumentChat] general_answer stream failed: {exc}, falling back to non-stream")
+                logger.warning(f"[DocumentChat] general_answer stream failed, falling back to non-stream")
                 if not full_text_parts:
                     response = await generate_model_client.get_model_generate_invoke(
                         trace_id=state.get("callback_task_id", "general_answer"),
@@ -458,11 +584,21 @@ class DocumentChatWorkflow:
             if not answer:
                 answer = "您好,我是施工方案编辑 AI 助手。选中一个文档章节后,我可以帮您润色、扩写、改写或回答章节相关问题。"
 
+            logger.info(f"[DocumentChat] general_answer_node completed: chunks={len(full_text_parts)}, answer_len={len(answer)}")
+
             skill_result = DocumentChatSkillOutput(
                 skill_name="general-answer",
                 response_type="general_answer",
                 answer=answer,
             )
+            log_document_chat_event(
+                "final_content_generated",
+                state.get("callback_task_id", ""),
+                {
+                    "stage": "general_answer",
+                    "skill_result": model_to_dict(skill_result),
+                },
+            )
             return {
                 "skill_result": model_to_dict(skill_result),
                 "response_type": "general_answer",
@@ -472,10 +608,15 @@ class DocumentChatWorkflow:
             logger.error(f"[DocumentChat] general_answer_node failed: {exc}", exc_info=True)
             return self._error_update("general_answer", exc)
 
+    # ============================================================
+    # 节点:run_answer_skill / run_modify_skill — 执行技能
+    # ============================================================
     async def run_answer_skill_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """执行 document-answer 技能:基于检索内容回答章节相关问题。"""
         return await self._run_skill(state, "document-answer", "run_answer_skill")
 
     async def run_modify_skill_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """执行 document-modify 技能:基于检索内容生成章节修改草案。"""
         return await self._run_skill(state, "document-modify", "run_modify_skill")
 
     async def _run_skill(
@@ -484,21 +625,39 @@ class DocumentChatWorkflow:
         skill_name: str,
         stage: str,
     ) -> Dict[str, Any]:
+        """通用技能执行方法。构建技能输入,调用流式执行,逐块输出到 SSE。"""
         try:
             skill_input = self._build_skill_input(state)
+            writer = self._capture_stream_writer()
+            logger.info(f"[DocumentChat] _run_skill: skill={skill_name}, stream_writer={'captured' if writer else 'None'}")
+
+            chunk_count = 0
 
             def _on_chunk(chunk: str):
-                from langgraph.config import get_stream_writer
-                try:
-                    writer = get_stream_writer()
+                """逐块回调:将技能生成的文本片段写入 SSE 流。"""
+                nonlocal chunk_count
+                if writer:
                     writer({"stream_chunk": chunk})
-                except Exception:
-                    # 非流式路径(如 workflow.run())或不支持 StreamWriter 时跳过
-                    pass
+                    chunk_count += 1
 
             skill_result = await self.skill_dispatcher.run_skill_stream(
                 skill_name, skill_input, on_chunk=_on_chunk
             )
+            logger.info(f"[DocumentChat] _run_skill completed: skill={skill_name}, chunks_sent={chunk_count}, response_type={skill_result.response_type}")
+            log_document_chat_event(
+                "final_content_generated",
+                state.get("callback_task_id", ""),
+                {
+                    "stage": stage,
+                    "skill_name": skill_name,
+                    "retrieval_query": state.get("retrieval_query"),
+                    "retrieval_keywords": state.get("retrieval_keywords") or [],
+                    "retrieval_status": state.get("retrieval_status"),
+                    "retrieval_metrics": state.get("retrieval_metrics") or {},
+                    "approved_references": state.get("approved_references") or [],
+                    "skill_result": model_to_dict(skill_result),
+                },
+            )
             return {
                 "skill_result": model_to_dict(skill_result),
                 "response_type": skill_result.response_type,
@@ -507,7 +666,11 @@ class DocumentChatWorkflow:
         except Exception as exc:
             return self._error_update(stage, exc)
 
+    # ============================================================
+    # 节点:error_handler — 错误处理
+    # ============================================================
     async def error_handler_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """统一错误处理节点,标记工作流失败。"""
         error_message = state.get("error_message") or "document chat workflow failed"
         logger.error(f"[DocumentChat] workflow error: {error_message}")
         return {
@@ -516,7 +679,11 @@ class DocumentChatWorkflow:
             "current_stage": "error_handler",
         }
 
+    # ============================================================
+    # 节点:complete — 工作流结束
+    # ============================================================
     async def complete_node(self, state: DocumentChatState) -> Dict[str, Any]:
+        """标记工作流完成。如果之前已标记为失败则保留失败状态。"""
         if state.get("overall_task_status") == "failed":
             return {"current_stage": "complete"}
         return {
@@ -524,7 +691,11 @@ class DocumentChatWorkflow:
             "current_stage": "complete",
         }
 
+    # ============================================================
+    # 工具方法
+    # ============================================================
     def to_response_data(self, state: DocumentChatState) -> DocumentChatData:
+        """将工作流最终状态转换为 HTTP 响应数据结构。"""
         skill_result = state.get("skill_result") or {}
         intent_result = state.get("intent_result")
         selected_section = state.get("selected_section") or {}
@@ -556,6 +727,7 @@ class DocumentChatWorkflow:
         )
 
     def _build_skill_input(self, state: DocumentChatState) -> DocumentChatSkillInput:
+        """从工作流状态构建技能执行所需输入。"""
         document_context = dict(state.get("document_context") or {})
         document_context["references"] = state.get("approved_references") or []
         return DocumentChatSkillInput(
@@ -572,6 +744,7 @@ class DocumentChatWorkflow:
 
     @staticmethod
     def _append_warnings(state: DocumentChatState, new_warnings: list) -> list:
+        """合并告警列表,去重且不覆盖已有告警。"""
         warnings = list(state.get("warnings") or [])
         for warning in new_warnings:
             warning = str(warning).strip()
@@ -581,12 +754,14 @@ class DocumentChatWorkflow:
 
     @staticmethod
     def _merge_metrics(state: DocumentChatState, new_metrics: Dict[str, Any]) -> Dict[str, Any]:
+        """合并检索指标,新值覆盖旧值。各节点指标逐层累加到最终响应中。"""
         metrics = dict(state.get("retrieval_metrics") or {})
         metrics.update(new_metrics or {})
         return metrics
 
     @staticmethod
     def _error_update(stage: str, exc: Exception) -> Dict[str, Any]:
+        """构建统一的错误状态更新。"""
         return {
             "current_stage": stage,
             "overall_task_status": "failed",

Diff do ficheiro suprimidas por serem muito extensas
+ 1 - 0
docs/t_kngs_construction_plan_child.csv


Diff do ficheiro suprimidas por serem muito extensas
+ 1 - 0
docs/t_kngs_construction_plan_parent.csv


+ 574 - 0
docs/向量库检索召回优化方案.md

@@ -0,0 +1,574 @@
+# 向量库检索召回优化方案
+
+> 面向 `t_kngs_construction_plan_parent`(父表)和 `t_kngs_construction_plan_child`(子表)的文档编辑 AI 对话召回优化方案。
+>
+> 推荐方向:**父表全文向量召回为主,子表和 tag 做精准定位,章节召回做补充,最后通过 RRF 融合、rerank 和质量门控输出可信引用。**
+
+---
+
+## 1. 数据观察
+
+### 1.1 两表共同字段
+
+| 字段 | 说明 | 检索价值 |
+|------|------|----------|
+| `pk` | 主键 | 排序、兜底唯一标识 |
+| `text` | 文本内容 | 核心检索字段 |
+| `dense` | 稠密向量 | 语义相似度召回 |
+| `sparse` | BM25 稀疏向量 | 关键词召回 |
+| `document_id` | 文档 UUID | 文档级关联 |
+| `parent_id` | 父段 ID | 父子表关联字段,但不能单独作为全局唯一候选键 |
+| `index` | 序号 | 片段排序、候选唯一键补充 |
+| `tag_list` | 逗号分隔关键词 | 精准关键词召回 |
+| `file_name` | 原始文件名 | 来源展示 |
+| `chapter_title` | 章节路径 | 章节过滤、候选唯一键补充 |
+| `chapter_level_1` | 一级章节类型 | 结构化过滤 |
+| `chapter_level_2` | 二级章节类型 | 结构化过滤 |
+| `chapter_level_3` | 三级章节类型 | 可选过滤或加权 |
+| `metadata` | JSON 元数据 | `chunk_id`、页码、源文件、章节信息 |
+
+### 1.2 父表与子表差异
+
+| 维度 | 子表 `child` | 父表 `parent` |
+|------|--------------|---------------|
+| 内容形态 | 短标题、表名、标签 | 完整段落、完整验收内容、表格上下文 |
+| 样例长度 | 4-24 字为主 | 121-2985 字不等 |
+| 适合任务 | 精准定位、tag 命中、标准号命中 | 主语义召回、rerank、最终引用 |
+| 主要风险 | 文本太短,语义不足 | 长文本包含页眉、项目名、表格占位等噪声 |
+
+### 1.3 当前样例数据暴露的问题
+
+从当前 CSV 样例看:
+
+| 指标 | 观察 |
+|------|------|
+| 父表行数 | 21 |
+| 子表行数 | 21 |
+| 父表唯一 `parent_id` | 19 |
+| 子表唯一 `parent_id` | 2 |
+| 父表重复 `parent_id` | `4.41966E+17` 出现 3 次 |
+
+这说明:
+
+1. **不能假设父表一个 `parent_id` 只对应一条记录**。同一个 `parent_id` 可能对应不同章节片段。
+2. **不能过度依赖子表召回覆盖率**。样例中子表只覆盖 2 个 `parent_id`,如果线上也存在类似情况,单靠子表会漏掉大量父表内容。
+3. **最终给 rerank 和 LLM 的候选必须是父表全文**。子表短文本只适合帮助定位,不适合作为最终引用内容。
+
+---
+
+## 2. 当前方案主要问题
+
+当前 `DocumentChatRetrievalService` 的召回逻辑大致是:
+
+```text
+用户问题
+  ↓
+build_query() 拼接项目、章节、用户需求、章节正文
+  ↓
+recall()
+  ├─ 有 chapter_level_1 + chapter_level_2
+  │    -> _recall_by_chapter()
+  │    -> search_similar_fragments()
+  │    -> 子表召回 + parent_id 频次排序 + 回查父表
+  │
+  └─ 无章节字段
+       -> _recall_by_vector()
+       -> 仅查子表
+       -> 返回子表短文本
+```
+
+存在以下问题:
+
+| 问题 | 说明 | 影响 |
+|------|------|------|
+| 子表短文本作为最终候选 | `_recall_by_vector()` 直接返回 child `text` | rerank 和 LLM 拿不到完整依据 |
+| 章节路径和向量路径互斥 | 有章节字段时只走 `search_similar_fragments()` | 父表全文向量召回缺失 |
+| 查询文本太长 | 项目名、位置、章节正文整段进入 query | 稀释核心检索词 |
+| `tag_list` 未有效利用 | 标签本来是高价值召回信号 | 标准号、设备名、验收项命中率低 |
+| `parent_id` 去重风险 | 同一 `parent_id` 可能多条父表记录 | 误合并不同片段 |
+| similarity 直接比较风险 | 子表、父表、tag 召回分数来源不同 | 排序不稳定 |
+
+---
+
+## 3. 推荐召回架构
+
+### 3.1 总体流程
+
+```text
+用户输入 + 当前章节内容 + 历史对话
+  ↓
+Query Signal Builder
+  ├─ semantic_query:短向量检索 query
+  ├─ rerank_query:重排 query
+  ├─ tag_terms:标准号、设备名、验收项
+  └─ scope_filter:章节、知识库、租户、工艺类型等结构化过滤
+  ↓
+并行召回
+  ├─ A. 父表全文 hybrid_search(主召回)
+  ├─ B. 子表标题/tag hybrid_search -> 回查父表
+  ├─ C. tag_list 精准召回 -> 回查/返回父表
+  └─ D. 现有 chapter_similarity -> 补充召回
+  ↓
+候选规范化
+  ↓
+RRF 融合排序
+  ↓
+rerank
+  ↓
+quality gate
+  ↓
+approved_references
+```
+
+### 3.2 核心原则
+
+1. **父表是主召回集合**  
+   父表 `text` 包含完整语义和引用上下文,应作为最重要的候选来源。
+
+2. **子表是定位器,不是最终引用**  
+   子表命中后必须通过 `document_id / parent_id / chunk_id` 等字段回查父表全文。
+
+3. **tag 是强信号,但不是唯一排序依据**  
+   标准号、设备名、验收项完全命中时加权;不要只靠 `like` 结果直接决定最终排序。
+
+4. **不同召回路径用排名融合,不直接比 similarity**  
+   父表长文本、子表短文本和 tag 召回的原始分数不可直接横向比较,推荐使用 RRF。
+
+5. **候选唯一键不能只用 `parent_id`**  
+   优先使用 `document_id + parent_id + metadata.chunk_id`。缺少 `chunk_id` 时退化为 `document_id + parent_id + chapter_title + index`,仍不完整时以 `pk` 兜底。
+
+---
+
+## 4. Query Signal Builder
+
+### 4.1 是否需要从三类来源提取关键词
+
+需要,但不要把三类来源原文直接拼成长 query。
+
+| 来源 | 是否使用 | 用法 |
+|------|----------|------|
+| 用户输入 | 必须 | 决定主检索意图,权重最高 |
+| 当前章节内容 | 需要 | 只抽取设备、工序、标准号、验收主题,不整段入 query |
+| 历史对话 | 需要但谨慎 | 只抽最近几轮明确确认的实体词 |
+| 项目信息 | 不进入 query | 只作为结构化过滤或加权,如 `knowledge_base_id`、`engineering_type` |
+
+### 4.2 输出结构
+
+```python
+@dataclass
+class RetrievalSignals:
+    semantic_query: str
+    rerank_query: str
+    tag_terms: list[str]
+    scope: dict[str, str]
+```
+
+示例:
+
+```json
+{
+  "semantic_query": "箱梁 验收标准 TB10212-2012 梁板安装 机械设备验收",
+  "rerank_query": "用户想查询箱梁验收需要满足哪些标准,当前章节是验收内容。",
+  "tag_terms": ["箱梁", "TB10212-2012", "梁板安装", "机械设备验收"],
+  "scope": {
+    "chapter_level_1": "acceptance",
+    "chapter_level_2": "Content",
+    "chapter_level_3": "AcceptanceOfMechanicalEquipment"
+  }
+}
+```
+
+### 4.3 基础规则提取
+
+第一阶段不一定需要 LLM,可以先用规则提取:
+
+```python
+STANDARD_PATTERN = r"[A-Z]{1,4}\s*\d{3,6}(?:[-—]\d{4})?"
+
+DOMAIN_SUFFIXES = (
+    "验收", "检查", "试验", "检测", "安装", "拆除", "吊装",
+    "架桥机", "龙门吊", "吊车", "箱梁", "T梁", "钢丝绳",
+    "支座", "地基", "安全装置", "操作证", "出厂合格证",
+)
+```
+
+抽取来源优先级:
+
+```text
+用户输入 > 归一化需求 > 当前章节标题 > 当前章节正文前 500 字 > 最近 3 轮历史对话
+```
+
+生成 `semantic_query` 时控制长度:
+
+```text
+建议 20-80 字,最多不超过 120 字。
+```
+
+### 4.4 LLM 提取增强
+
+当规则提取效果不稳定时,再启用轻量 LLM:
+
+```text
+从用户问题、当前章节片段、最近历史对话中提取用于施工方案知识库检索的关键词。
+只输出 JSON:
+{
+  "semantic_query": "...",
+  "tag_terms": ["..."],
+  "intent": "..."
+}
+不要包含项目名、人名、地名、时间,除非它们是规范或设备名的一部分。
+```
+
+---
+
+## 5. 召回路径设计
+
+### 5.1 Path A:父表全文 hybrid_search(主路径)
+
+目标:直接召回完整段落,作为 rerank 和最终引用的主候选。
+
+```python
+def recall_parent_vector(signals: RetrievalSignals) -> list[Candidate]:
+    expr = build_scope_expr(signals.scope)
+    rows = MilvusVectorManager().hybrid_search(
+        param={
+            "collection_name": "t_kngs_construction_plan_parent",
+            "expr": expr,
+        },
+        query_text=signals.semantic_query,
+        top_k=30,
+        ranker_type="weighted",
+        dense_weight=0.7,
+        sparse_weight=0.3,
+    )
+    return normalize_parent_rows(rows, source="parent_vector")
+```
+
+建议:
+
+- `top_k`: 30
+- 必须输出字段:`text`、`document_id`、`parent_id`、`index`、`tag_list`、`chapter_title`、`chapter_level_1/2/3`、`metadata`、`file_name`
+- 默认过滤:`is_deleted == false`
+
+### 5.2 Path B:子表 hybrid_search -> 回查父表
+
+目标:利用短标题、验收项、标准号快速定位父表。
+
+```python
+def recall_child_locator(signals: RetrievalSignals) -> list[Candidate]:
+    expr = build_scope_expr(signals.scope)
+    child_rows = MilvusVectorManager().hybrid_search(
+        param={
+            "collection_name": "t_kngs_construction_plan_child",
+            "expr": expr,
+        },
+        query_text=signals.semantic_query,
+        top_k=40,
+        ranker_type="weighted",
+        dense_weight=0.6,
+        sparse_weight=0.4,
+    )
+    parent_keys = extract_parent_lookup_keys(child_rows)
+    parent_rows = fetch_parent_rows(parent_keys)
+    return normalize_parent_rows(parent_rows, source="child_locator", child_hits=child_rows)
+```
+
+注意:
+
+- 子表召回结果不要直接进入 rerank。
+- 子表命中同一父表时,可以记录 `child_hit_count` 和 `matched_child_texts`,作为融合加权信号。
+- 如果只能按 `parent_id` 回查父表,要保留多条父表记录,不要简单拼成一条。
+
+### 5.3 Path C:tag_list 精准召回
+
+目标:标准号、设备名、验收项等明确关键词命中时,提供强召回信号。
+
+优先策略:
+
+1. 同时查父表和子表的 `tag_list`。
+2. 子表 tag 命中后回查父表。
+3. 父表 tag 命中直接进入候选。
+4. tag 命中作为加分信号参与融合,不直接绕过 rerank。
+
+```python
+def recall_by_tag(signals: RetrievalSignals) -> list[Candidate]:
+    if not signals.tag_terms:
+        return []
+
+    expr = combine_expr(
+        build_scope_expr(signals.scope),
+        build_tag_expr(signals.tag_terms),
+    )
+
+    parent_rows = condition_or_hybrid_query_parent(expr)
+    child_rows = condition_or_hybrid_query_child(expr)
+    child_parent_rows = fetch_parent_rows(extract_parent_lookup_keys(child_rows))
+
+    return normalize_parent_rows(parent_rows + child_parent_rows, source="tag")
+```
+
+如果 Milvus `like` 表达式不稳定,可以使用服务端二次过滤:
+
+```text
+先按 scope 查询或召回一批候选,再在 Python 中对 tag_list split(",") 后做精确/包含匹配。
+```
+
+tag 匹配分级:
+
+| 匹配类型 | 加权建议 |
+|----------|----------|
+| 标准号完全匹配,如 `TB10212-2012` | 最高 |
+| 完整 tag 匹配,如 `机械设备验收` | 高 |
+| 设备名匹配,如 `架桥机`、`龙门吊` | 中 |
+| 单字或泛词,如 `验收`、`检查` | 低,通常不单独触发 tag 召回 |
+
+### 5.4 Path D:章节相似度召回(补充路径)
+
+保留当前 `search_similar_fragments()`,但定位为补充召回。
+
+建议调整:
+
+- 不再作为有章节字段时的唯一召回路径。
+- 返回结果需要补齐 `document_id`、`parent_id`、`chapter_title`、`metadata`。
+- 父表查询时不要把同一 `parent_id` 的多条记录无条件拼接成一条,除非确认它们属于同一 `chunk_id` 或连续片段。
+
+---
+
+## 6. 候选规范化与唯一键
+
+### 6.1 Candidate 结构
+
+```python
+@dataclass
+class Candidate:
+    candidate_key: str
+    text: str
+    source: str
+    source_hits: dict[str, Any]
+    vector_similarity: float
+    metadata: dict[str, Any]
+```
+
+### 6.2 候选唯一键
+
+优先级:
+
+```python
+def build_candidate_key(row: dict) -> str:
+    metadata = parse_metadata(row.get("metadata"))
+    chunk_id = metadata.get("chunk_id")
+    if row.get("document_id") and row.get("parent_id") and chunk_id:
+        return f"{row['document_id']}::{row['parent_id']}::{chunk_id}"
+
+    if row.get("document_id") and row.get("parent_id") and row.get("chapter_title") and row.get("index") is not None:
+        return f"{row['document_id']}::{row['parent_id']}::{row['chapter_title']}::{row['index']}"
+
+    return str(row.get("pk") or "")
+```
+
+不要只用 `parent_id` 去重,原因是父表中同一个 `parent_id` 可能对应多条不同内容。
+
+---
+
+## 7. RRF 融合排序
+
+### 7.1 为什么不用 `max(similarity)`
+
+不同路径的分数不可直接比较:
+
+- 父表是长文本向量召回。
+- 子表是短标签向量召回。
+- tag 是结构化命中。
+- chapter_similarity 还带有 `parent_id` 频次排序。
+
+所以推荐使用 RRF(Reciprocal Rank Fusion)按排名融合。
+
+### 7.2 RRF 公式
+
+```python
+SOURCE_WEIGHTS = {
+    "parent_vector": 1.0,
+    "child_locator": 0.8,
+    "tag": 1.2,
+    "chapter_similarity": 0.5,
+}
+
+def rrf_score(rank: int, source: str, k: int = 60) -> float:
+    return SOURCE_WEIGHTS[source] / (k + rank)
+```
+
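+融合逻辑(注意:`fusion_score` 是融合阶段写入 Candidate 的字段,6.1 的结构定义中尚未声明,落地时需在 `Candidate` 中补充 `fusion_score: float = 0.0`):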
+
+```python
+def merge_by_rrf(source_results: dict[str, list[Candidate]], top_k: int = 30) -> list[Candidate]:
+    merged = {}
+
+    for source, candidates in source_results.items():
+        for rank, candidate in enumerate(candidates, start=1):
+            key = candidate.candidate_key
+            if key not in merged:
+                merged[key] = candidate
+                merged[key].source_hits = {}
+                merged[key].fusion_score = 0.0
+
+            merged[key].fusion_score += rrf_score(rank, source)
+            merged[key].source_hits[source] = {
+                "rank": rank,
+                "vector_similarity": candidate.vector_similarity,
+            }
+
+    for candidate in merged.values():
+        candidate.fusion_score += calc_tag_bonus(candidate)
+        candidate.fusion_score += calc_scope_bonus(candidate)
+
+    return sorted(merged.values(), key=lambda item: item.fusion_score, reverse=True)[:top_k]
+```
+
+### 7.3 加分项
+
+| 加分项 | 条件 | 建议 |
+|--------|------|------|
+| `tag_bonus` | 标准号或完整 tag 命中 | 适度加分 |
+| `scope_bonus` | `chapter_level_1/2/3` 与当前章节匹配 | 适度加分 |
+| `multi_source_bonus` | 同一候选被多个路径召回 | 小幅加分 |
+| `child_hit_bonus` | 多个 child tag 指向同一候选 | 小幅加分,避免频次过度放大 |
+
+---
+
+## 8. rerank 与质量门控
+
+### 8.1 rerank query
+
+rerank 使用 `rerank_query`,不要使用过短的纯关键词,也不要使用包含大量项目噪声的长 query。
+
+推荐格式:
+
+```text
+用户需求:箱梁验收需要满足哪些标准?
+检索意图:查询箱梁工程的验收规范、标准号、梁板安装和机械设备验收要求。
+当前章节:验收要求 / 验收内容。
+```
+
+### 8.2 质量门控
+
+保留现有质量门控,但建议增加:
+
+| 字段 | 用途 |
+|------|------|
+| `fusion_score` | 融合排序可观测 |
+| `source_hits` | 判断命中来源 |
+| `tag_match_terms` | 判断是否存在强关键词命中 |
+| `candidate_key` | 调试去重 |
+
+阈值建议:
+
+- 第一阶段保留 `min_rerank_score: 0.70`,不要直接降到 `0.65`。
+- 通过日志和人工样本评测后再调整。
+- `min_vector_similarity` 只作为参考,父表/子表/tag 多路融合后不宜作为唯一强过滤。
+
+---
+
+## 9. 配置建议
+
+```yaml
+retrieval:
+  enabled: true
+
+  parent_collection: "t_kngs_construction_plan_parent"
+  child_collection: "t_kngs_construction_plan_child"
+
+  parent_recall_top_k: 30
+  child_recall_top_k: 40
+  tag_recall_top_k: 30
+  chapter_recall_top_k: 15
+  recall_top_k: 30
+
+  rerank_top_k: 8
+  submit_top_k: 3
+  min_rerank_score: 0.70
+  min_qualified_count: 1
+
+  max_reference_chars: 4000
+  max_single_reference_chars: 1500
+
+  query_rewrite_enabled: true
+  query_rewrite_with_llm: false
+  max_semantic_query_chars: 120
+  max_rerank_query_chars: 500
+
+  tag_recall_enabled: true
+  tag_terms_limit: 8
+  tag_exact_bonus: 0.08
+  tag_partial_bonus: 0.03
+  multi_source_bonus: 0.02
+  scope_bonus: 0.03
+
+  dense_weight: 0.7
+  sparse_weight: 0.3
+  child_dense_weight: 0.6
+  child_sparse_weight: 0.4
+  ranker_type: "weighted"
+
+  allow_vector_fallback: false
+  allow_unscoped_search: false
+```
+
+---
+
+## 10. 分阶段落地
+
+### 第一阶段:修正召回主链路
+
+必须完成:
+
+1. 新增父表全文 hybrid_search。
+2. 子表召回统一回查父表,不再把 child 短文本交给 rerank。
+3. 召回逻辑从 if-else 改为多路并行或顺序聚合。
+4. 候选唯一键从 `parent_id` 改为 `document_id + parent_id + chunk_id/chapter_title/index`。
+5. 融合排序改为 RRF,不再直接 `max(similarity)`。
+6. `build_query()` 改为输出 `semantic_query` 和 `rerank_query`。
+
+涉及文件:
+
+| 文件 | 改动 |
+|------|------|
+| `core/document_chat/component/retrieval_service.py` | 主改造:signals、父表召回、子表回查、RRF 融合 |
+| `config/document_chat_retrieval.yaml` | 增加父表、tag、融合配置 |
+| `core/construction_write/component/similar_fragment_service.py` | 补齐输出字段,避免无条件拼接同 parent_id 多条父表 |
+
+### 第二阶段:tag 召回与质量提升
+
+建议完成:
+
+1. 增加 tag 精准召回。
+2. 增加标准号、设备名、验收项规则提取。
+3. 增加 `source_hits`、`fusion_score`、`candidate_key` 日志。
+4. 建立 20-50 条真实查询评测集。
+5. 根据评测调整 RRF 权重和质量阈值。
+
+### 第三阶段:数据治理
+
+建议完成:
+
+1. 将父表 `tag_list` 拆成标准化 tag 子记录,保证子表覆盖所有父表 tag。
+2. 清洗父表 text 中重复页眉、项目名、页码和无意义表格占位。
+3. 保证 `parent_id` 以字符串保存和查询,避免科学计数法导致精度损失。
+4. 为 `document_id`、`parent_id`、`chapter_level_1/2/3`、`is_deleted` 建立稳定过滤策略。
+
+---
+
+## 11. 推荐最终形态
+
+最终召回不应是“子表召回后按 parent_id 频次排序”,而应是:
+
+```text
+父表全文语义召回负责覆盖
+子表短标签召回负责精准定位
+tag_list 负责强关键词命中
+章节字段负责范围约束
+RRF 负责多路融合
+rerank 负责最终相关性排序
+quality gate 负责可信引用输出
+```
+
+这样既能解决“子表文本太短导致召回内容不足”,也能避免“只靠父表长文本导致精准标签命中弱”的问题。

+ 85 - 13
views/document_chat/views.py

@@ -1,5 +1,23 @@
 # -*- coding: utf-8 -*-
-"""HTTP API for document chat."""
+"""文档 AI 对话 HTTP API。
+
+提供两个接口:
+    POST /sgbx/document_chat    — 发起对话,支持 SSE 流式和非流式同步两种模式
+    GET  /sgbx/document_chat/health — 健康检查
+
+SSE 流式输出事件类型:
+    connected          — 连接建立
+    processing         — 工作流各阶段进度通知
+    reasoning          — 推理状态(启动、检索、重排、技能执行等)
+    intent             — 意图识别结果
+    retrieval_result   — 检索召回详情(含参考预览)
+    skill_started      — 技能开始执行(answer 或 proposal)
+    chunk              — 技能生成的文本片段(流式逐块输出)
+    answer_completed   — 回答完成
+    proposal_completed — 修改草案完成
+    completed          — 全部完成
+    error              — 异常错误
+"""
 
 import json
 import time
@@ -10,17 +28,20 @@ from fastapi import APIRouter, HTTPException, Query
 from fastapi.responses import StreamingResponse
 
 from foundation.infrastructure.tracing import TraceContext, auto_trace
-from foundation.observability.logger.loggering import write_logger as logger
-
-from core.document_chat.component.document_chat_logger import log_document_chat_event
+from core.document_chat.component.document_chat_logger import document_chat_logger as logger
+from core.document_chat.component.document_chat_logger import log_document_chat_event, log_document_chat_event_truncated
 from core.document_chat.schemas import DocumentChatRequest, DocumentChatResponse, model_to_dict
 
 
 document_chat_router = APIRouter(prefix="/sgbx", tags=["文档编辑AI对话"])
+
+# SSE 事件中对客户端暴露的参考条数上限,防止响应体过大
 MAX_REFERENCES_PER_EVENT = 8
+# 单条参考内容预览长度上限
 REFERENCE_PREVIEW_CHARS = 600
 
 
+# 工作流各阶段的前端提示文案映射
 STAGE_MESSAGES = {
     "workflow_started": "文档 AI 对话工作流已启动",
     "recognize_intent": "已完成用户意图识别",
@@ -33,16 +54,23 @@ STAGE_MESSAGES = {
 
 
 def format_sse_event(event_type: str, data: dict) -> str:
+    """格式化为 SSE event + data 行。"""
     return f"event: {event_type}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
 
 
 def get_document_chat_workflow():
+    """延迟加载工作流实例,避免循环导入。"""
     from core.document_chat.workflows.document_chat_workflow import document_chat_workflow
 
     return document_chat_workflow
 
 
 def _iter_node_updates(raw_update: Any) -> Iterable[Tuple[str, Dict[str, Any]]]:
+    """解析 LangGraph 的 updates 负载,提取 (节点名, 更新内容) 对。
+
+    如果 raw_update 的键本身就是节点名,直接返回;
+    否则把整个 payload 作为 single-stage 更新处理。
+    """
     if not isinstance(raw_update, dict):
         return []
 
@@ -58,11 +86,13 @@ def _iter_node_updates(raw_update: Any) -> Iterable[Tuple[str, Dict[str, Any]]]:
 
 
 def _merge_state_update(state: Dict[str, Any], update: Dict[str, Any]) -> None:
+    """将节点返回的增量字段合并到全局状态。"""
     for key, value in update.items():
         state[key] = value
 
 
 def _preview_text(text: Any, limit: int = REFERENCE_PREVIEW_CHARS) -> str:
+    """截取文本预览,超过 limit 长度的加 "..." 后缀。"""
     value = str(text or "").strip()
     if len(value) <= limit:
         return value
@@ -70,6 +100,7 @@ def _preview_text(text: Any, limit: int = REFERENCE_PREVIEW_CHARS) -> str:
 
 
 def _safe_metadata(metadata: Any) -> Dict[str, Any]:
+    """过滤出 SSE 事件允许透传的 metadata 白名单字段。"""
     if not isinstance(metadata, dict):
         return {}
     allowed_keys = (
@@ -87,6 +118,7 @@ def _safe_metadata(metadata: Any) -> Dict[str, Any]:
 
 
 def _pack_reference_preview(item: Dict[str, Any]) -> Dict[str, Any]:
+    """将单条检索参考压缩为前端预览格式(来源 + 内容预览 + 相似度)。"""
     metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {}
     content = item.get("content") if "content" in item else item.get("text")
     data = {
@@ -101,10 +133,12 @@ def _pack_reference_preview(item: Dict[str, Any]) -> Dict[str, Any]:
 
 
 def _limited_items(items: List[Dict[str, Any]], packer) -> List[Dict[str, Any]]:
+    """截断列表至上限,并对每条应用打包函数。"""
     return [packer(item) for item in (items or [])[:MAX_REFERENCES_PER_EVENT] if isinstance(item, dict)]
 
 
 def _reasoning_event(callback_task_id: str, node_name: str, state: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
+    """构建 reasoning 阶段事件:有错误时标记 failed,否则 processing。"""
     status = "failed" if state.get("error_message") else "processing"
     return (
         "reasoning",
@@ -123,11 +157,18 @@ def _build_realtime_events(
     node_name: str,
     skill_started_sent: bool,
 ) -> Tuple[List[Tuple[str, Dict[str, Any]]], bool]:
+    """根据当前节点和状态构建需要推送的 SSE 事件列表。
+
+    每个节点可能产生多个事件类型(reasoning + 专项事件),
+    skill_started_sent 用于防止 quality_gate 阶段重复推送 skill_started。
+    """
     events: List[Tuple[str, Dict[str, Any]]] = []
 
+    # 通用推理进度事件
     if node_name in STAGE_MESSAGES:
         events.append(_reasoning_event(callback_task_id, node_name, state))
 
+    # 意图识别完成事件
     if node_name == "recognize_intent" and state.get("intent_result"):
         events.append(
             (
@@ -139,6 +180,7 @@ def _build_realtime_events(
             )
         )
 
+    # 检索结果事件(含参考预览)
     if node_name == "rerank_context":
         reranked = state.get("reranked_references") or []
         events.append(
@@ -156,6 +198,7 @@ def _build_realtime_events(
             )
         )
 
+    # 技能开始执行通知(quality_gate 之后、实际调用技能之前)
     if node_name == "quality_gate":
         intent_result = state.get("intent_result") or {}
         skill_name = intent_result.get("skill_name") or ""
@@ -179,9 +222,21 @@ def _build_realtime_events(
 @document_chat_router.post("/document_chat")
 @auto_trace(generate_if_missing=True)
 async def document_chat(request: DocumentChatRequest, stream: bool = Query(False)):
+    """文档 AI 对话主接口。
+
+    参数:
+        stream: true 时走 SSE 流式响应
+        request.response_mode: "sse" 时同样走 SSE,"json" 时走同步返回
+
+    流程:
+        1. 生成 callback_task_id 用于全链路追踪
+        2. 记录请求入日志(截断模式,避免大 payload)
+        3. 流式:返回 StreamingResponse,逐步推送事件
+        4. 非流式:同步执行工作流,一次性返回结果
+    """
     callback_task_id = f"doc_chat_{uuid.uuid4().hex[:12]}"
     TraceContext.set_trace_id(callback_task_id)
-    log_document_chat_event(
+    log_document_chat_event_truncated(
         "request_received",
         callback_task_id,
         {
@@ -202,6 +257,7 @@ async def document_chat(request: DocumentChatRequest, stream: bool = Query(False
             },
         )
 
+    # 同步模式:阻塞等待工作流执行完毕
     try:
         workflow = get_document_chat_workflow()
         state = await workflow.run(request, callback_task_id)
@@ -226,6 +282,12 @@ async def _generate_document_chat_events(
     callback_task_id: str,
     request: DocumentChatRequest,
 ) -> AsyncGenerator[str, None]:
+    """SSE 流式生成器。逐步推送工作流执行事件。
+
+    事件推送顺序:
+        connected → processing → (reasoning / intent / retrieval_result) × N
+        → chunk × M → answer_completed / proposal_completed → completed
+    """
     started_at = time.time()
     try:
         yield format_sse_event(
@@ -250,19 +312,24 @@ async def _generate_document_chat_events(
         state = workflow.build_initial_state(request, callback_task_id)
         graph_state = dict(state)
         skill_started_sent = False
+        custom_event_count = 0
 
         async for mode, payload in workflow.get_graph().astream(
             graph_state, stream_mode=["updates", "custom"]
         ):
-            if mode == "custom" and isinstance(payload, dict) and payload.get("stream_chunk"):
-                yield format_sse_event(
-                    "chunk",
-                    {
-                        "callback_task_id": callback_task_id,
-                        "chunk": payload["stream_chunk"],
-                    },
-                )
+            if mode == "custom" and isinstance(payload, dict):
+                # custom 事件:技能流式输出的文本片段
+                custom_event_count += 1
+                if payload.get("stream_chunk"):
+                    yield format_sse_event(
+                        "chunk",
+                        {
+                            "callback_task_id": callback_task_id,
+                            "chunk": payload["stream_chunk"],
+                        },
+                    )
             elif mode == "updates":
+                # updates 事件:节点完成,更新状态并推送对应事件
                 for node_name, node_update in _iter_node_updates(payload):
                     _merge_state_update(state, node_update)
                     realtime_events, skill_started_sent = _build_realtime_events(
@@ -274,6 +341,9 @@ async def _generate_document_chat_events(
                     for event_type, event_data in realtime_events:
                         yield format_sse_event(event_type, event_data)
 
+        logger.info(f"[DocumentChat] SSE stream completed: custom_events_received={custom_event_count}")
+
+        # 工作流执行完毕,推送最终结果事件
         data = workflow.to_response_data(state)
         data_dict = model_to_dict(data)
         log_document_chat_event("response_completed", callback_task_id, data_dict)
@@ -287,6 +357,7 @@ async def _generate_document_chat_events(
         else:
             yield format_sse_event("error", data_dict)
 
+        # 非错误时推送 completed 事件(含耗时)
         if data.response_type != "error":
             yield format_sse_event(
                 "completed",
@@ -316,6 +387,7 @@ async def _generate_document_chat_events(
 
 @document_chat_router.get("/document_chat/health")
 async def document_chat_health():
+    """健康检查:返回模块状态和工作流基本信息。"""
     return {
         "status": "healthy",
         "module": "document_chat",

Alguns ficheiros não foram mostrados porque muitos ficheiros mudaram neste diff