AI对话功能是基于 RAG(检索增强生成) 的智能文档交互系统,用户选中施工方案文档的某个章节后,可以提出问题或修改指令,系统通过意图识别 → 知识检索 → 质量过滤 → LLM生成的流水线,返回回答或直接修改文档内容。
技术栈:FastAPI + LangGraph + Milvus(向量数据库) + Redis + 蜀天云 Qwen3.5-122B
HTTP POST /sgbx/document_chat
│
├─ FastAPI Router (views/document_chat/views.py)
│ ├─ SSE 流式响应(stream=true 或 response_mode="sse")
│ └─ 同步 JSON 响应
│
├─ LangGraph 状态机 (core/document_chat/workflows/document_chat_workflow.py)
│ │
│ ├─ 输入校验 → 上下文加载 → 技能注册
│ ├─ 意图识别(LLM + 启发式兜底)
│ │ ├─ clarify(需要澄清)
│ │ ├─ unsupported(不支持)
│ │ └─ answer / modify → 进入 RAG 管线
│ │
│ ├─ RAG 检索管线
│ │ ├─ 构建检索查询(query_builder)
│ │ ├─ 四路召回(vector_recall)
│ │ │ ├─ Path 1: 父文档向量召回(dense+sparse混合)
│ │ │ ├─ Path 2: 子文档向量定位 → 反查父文档
│ │ │ ├─ Path 3: 标签 LIKE 匹配
│ │ │ └─ Path 4: 同章节相似度搜索
│ │ ├─ RRF 融合排序(fusion.py)
│ │ ├─ 候选去重(candidate.py)
│ │ ├─ Rerank 重排(rerank_service.py)
│ │ └─ 质量门控(quality_gate.py)
│ │
│ └─ 技能执行
│ ├─ document-answer → 生成回答 JSON
│ └─ document-modify → 生成修改建议 JSON
│
└─ SSE 事件推送(11种事件类型)
文件: views/document_chat/views.py
document_chat_router = APIRouter(prefix="/sgbx", tags=["文档编辑AI对话"])
在 server/app.py:189 注册到主应用。
/sgbx/document_chat方法: POST
请求参数:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
user_id |
str | 是 | 用户标识 |
message |
str | 是 | 用户问题/指令(min_length=1) |
selected_section |
SelectedSection | 否 | 当前选中的章节信息 |
conversation_id |
str | 否 | 多轮对话标识 |
document_context |
DocumentContext | 否 | 上下文(前后文、兄弟章节、检索过滤条件) |
conversation_history |
List[Dict] | 否 | 历史对话记录 |
response_mode |
"json"/"sse" | 否 | 响应格式 |
project_info |
Dict | 否 | 项目元信息 |
SelectedSection 结构:
| 字段 | 说明 |
|---|---|
index |
章节编号(如 "2.1") |
title |
章节标题 |
content |
当前章节编辑器中的文本 |
code |
章节代码(决定是否走 RAG 管线) |
chapter_level_1/2 |
章节层级(用于检索范围限定) |
响应模式:
stream=true 或 response_mode="sse"):返回 StreamingResponse,通过 SSE 推送 11 种事件DocumentChatResponse JSON 对象# 1. 生成唯一任务ID: doc_chat_<12位hex>
callback_task_id = f"doc_chat_{uuid.uuid4().hex[:12]}"
# 2. 设置链路追踪上下文
TraceContext.set_trace_id(callback_task_id)
# 3. 根据模式选择响应方式
if stream or response_mode == "sse":
return StreamingResponse(_generate_document_chat_events(...))
else:
result = workflow.run(graph_state)
return result.to_response_data()
文件: views/document_chat/views.py:281-385
使用 LangGraph 的双流模式同时接收两种数据:
async for mode, payload in workflow.astream(graph_state, stream_mode=["updates", "custom"]):
mode == "updates": 节点完成事件 → 生成 processing、reasoning、intent、retrieval_result、skill_started、answer_completed 等事件mode == "custom": 技能文本流块 → 生成 chunk 事件(实时文字输出)| 事件类型 | 触发时机 | 携带数据 |
|---|---|---|
connected |
连接建立 | callback_task_id, 时间戳 |
processing |
工作流开始 | stage_name, status |
reasoning |
每个节点完成 | stage_name, status, message |
intent |
意图识别完成 | intent_result(intent, confidence, skill_name, operation) |
retrieval_result |
质量门控完成 | retrieval_status, 引用预览, 指标 |
skill_started |
技能开始执行 | skill_name, response_type |
chunk |
技能文本流块 | text 片段 |
answer_completed |
回答生成完成 | 完整 DocumentChatData |
proposal_completed |
修改建议完成 | 完整 DocumentChatData |
completed |
全部完成 | duration, status |
error |
异常 | error 消息 |
event: chunk
data: {"text": "这是AI回答的一个片段"}
event: intent
data: {"intent": "document_answer", "confidence": 0.92, ...}
headers = {
"Cache-Control": "no-cache", # 禁止缓存
"Connection": "keep-alive", # 保持连接
"X-Accel-Buffering": "no", # 禁止 Nginx 缓冲
}
整个 SSE 生成器被 try/except 包裹,发生异常时推送 error 事件,确保客户端总能收到终止信号。
文件: core/document_chat/component/state_models.py
DocumentChatState 是一个 TypedDict,包含 28 个字段,覆盖整个工作流生命周期:
| 分类 | 字段 |
|---|---|
| 输入 | user_id, user_message, selected_section, document_context, conversation_history, project_info |
| 检索 | retrieval_query, retrieval_keywords, retrieval_candidates, reranked_references, approved_references, retrieval_status, retrieval_metrics |
| 意图/技能 | intent_result, skill_result, diff_result, response_type |
| 控制 | current_stage, overall_task_status, error_message, warnings |
| 消息 | messages: List[BaseMessage] |
文件: core/document_chat/workflows/document_chat_workflow.py:70-146
共 16 个节点:
| 序号 | 节点名 | 功能 |
|---|---|---|
| 1 | validate_input |
校验 user_id、message、section 合法性 |
| 2 | load_context |
规范化 project_info、section、history |
| 3 | load_skill_registry |
加载可用技能列表 |
| 4 | recognize_intent |
LLM 意图分类 |
| 5 | route_intent |
路由标记节点(SSE 可见) |
| 6 | build_retrieval_query |
构建检索查询和关键词 |
| 7 | vector_recall |
四路向量召回 |
| 8 | rerank_context |
Rerank 模型重排序 |
| 9 | quality_gate |
低质量引用过滤 |
| 10 | clarify |
返回澄清问题 |
| 11 | unsupported |
返回不支持提示 |
| 12 | run_answer_skill |
执行文档回答技能 |
| 13 | run_modify_skill |
执行文档修改技能 |
| 14 | general_answer |
通用问题(不走 RAG) |
| 15 | error_handler |
统一错误处理 |
| 16 | complete |
标记工作流完成 |
ENTRY
└─→ validate_input
├──[general]──→ general_answer ──→ complete ──→ END
├──[error]──→ error_handler ──→ complete ──→ END
└──[normal]──→ load_context
└─→ load_skill_registry
└─→ recognize_intent
└─→ route_intent
├──[clarify]──→ clarify ──→ complete ──→ END
├──[unsupported]──→ unsupported ──→ complete ──→ END
├──[error]──→ error_handler ──→ complete ──→ END
└──[answer|modify]──→ build_retrieval_query
└─→ vector_recall
└─→ rerank_context
└─→ quality_gate
├──[answer]──→ run_answer_skill ──→ complete ──→ END
├──[modify]──→ run_modify_skill ──→ complete ──→ END
└──[error]──→ error_handler ──→ complete ──→ END
路由判断规则:
validate_input 路由: 如果 selected_section 没有 code、chapter_level_1、chapter_level_2 中的任意一个 → 走 general 通用问答,否则走 normal RAG 管线
route_intent 路由(document_chat_workflow.py:281-305):
needs_clarification == true 或 confidence < 0.65 → clarifyskill_name == "document-answer" → answerskill_name == "document-modify" → modifyintent == "unsupported" → unsupportedroute_after_retrieval 路由: 根据 skill_name 决定走 answer 或 modify 技能
每个节点执行时首先检查 state.get("error_message"),如果已有错误则返回空操作({})或将路由导向 error_handler。所有错误通过 _error_update() 统一格式化。
文件: core/document_chat/component/intent_recognizer.py
第一层:LLM 意图分类
将用户消息、章节预览、项目信息、可用技能列表发送给 LLM,要求返回 JSON 格式的 IntentResult:
{
"intent": "document_answer",
"confidence": 0.92,
"skill_name": "document-answer",
"operation": "answer",
"needs_clarification": false
}
Prompt 配置在 config/prompt/document_chat_intent.yaml,包含 8 条明确规则。
第二层:启发式规则兜底
当 LLM 调用失败时,使用关键词匹配:
| 意图 | 匹配关键词 |
|---|---|
document_modify |
润色, 扩写, 改写, 精简, 缩短, 重写, 优化内容, 调整 |
document_answer |
怎么完善, 如何改, 建议, 解释, 分析, 是否, 什么, 怎么, 如何 |
| 默认 | document_answer,confidence=0.66 |
_normalize_intent() 方法:
skill_name 约束到技能注册表的白名单clarify| intent | 说明 | 对应 skill_name |
|---|---|---|
document_answer |
用户需要解答/分析/建议 | document-answer |
document_modify |
用户需要修改/润色/改写内容 | document-modify |
clarify |
意图不明确,需要追问 | 无 |
unsupported |
超出能力范围 | 无 |
文件: core/document_chat/retrieval/query_builder.py
build_query()(第 12-28 行):
将用户消息 + 归一化指令 + 章节编号/标题 + Top 8 关键词拼接,总长度限制 120 字符。
build_query_keywords()(第 31-75 行):
从 5 个来源提取最多 20 个关键词:
extract_retrieval_keywords()(第 91-133 行):
多模式专业术语提取:
GB 50204-2015(正则: [A-Za-z]{1,8}\s*\d{2,8})《...》架桥机验收、模板安装文件: core/document_chat/component/retrieval_service.py
| 路径 | 权重 | 描述 | 实现方法 |
|---|---|---|---|
| 父文档向量 | 1.0 | dense+sparse 混合搜索父表 | _recall_by_parent_vector |
| 子文档定位 | 0.8 | 子表向量搜索 → 反查父文档 | _recall_by_child_locator |
| 标签匹配 | 1.2 | 关键词 LIKE 匹配 tag_list 字段 | _recall_by_tag |
| 章节相似度 | 0.5 | 同 chapter_level_1+2 相似度搜索 | _recall_by_chapter |
Path 1 - 父文档向量召回:
MilvusVectorManager().hybrid_search(
param={"collection_name": parent_collection, "expr": filter_expr},
query_text=query, top_k=30,
ranker_type="weighted", dense_weight=0.7, sparse_weight=0.3
)
Path 2 - 子文档定位器:
parent_id 分组结果child_hit_count 和匹配的文本Path 3 - 标签召回:
tag_list like "%架桥机%"Path 4 - 章节相似度:
委托给 search_similar_fragments(),按 chapter_level_1 + chapter_level_2 限定范围。
文件: core/document_chat/retrieval/fusion.py
RRF 公式: score += weight / (rrf_k + rank),其中 rrf_k=60
加分机制:
tag_list 中 → +0.08最终按 fusion_score 降序排列,截断至 recall_top_k=30。
文件: core/document_chat/retrieval/candidate.py:113-151
document_id::parent_id::chunk_id) + content hash(前 300 字符 MD5)recall_top_k文件: core/document_chat/component/rerank_service.py
调用蜀天云 Qwen3-Reranker-8B 模型(shutian_rerank):
top_k=8 重排结果结果合并(_merge_rerank_results):
rerank_score 字段rerank_score 降序排列文件: core/document_chat/component/retrieval_quality_gate.py
合格条件(全部满足):
| 条件 | 阈值 |
|---|---|
| 文本非空 | - |
| 向量相似度 | >= 0.45 或 fusion_score > 0 且有 source_hits |
| Rerank 分数 | >= 0.70(配置值) |
| 项目范围匹配 | metadata.source_scope_valid == True |
字符预算控制:
max_reference_chars = 4000max_single_reference_chars = 1500submit_top_k = 3 条合格引用文件: core/document_chat/component/skill_dispatcher.py
技能从 YAML 文件懒加载并缓存:
| 技能 | YAML 文件 | 处理器 | 意图 | 响应类型 |
|---|---|---|---|---|
document-answer |
document-answer/skill.yaml |
DocumentAnswerSkill |
document_answer |
answer |
document-modify |
document-modify/skill.yaml |
DocumentModifySkill |
document_modify |
proposal |
文件: core/document_chat/skills/document_answer.py
Prompt 构建:
将 user_message、归一化指令、项目信息、选中章节、文档上下文(含质量门控通过的引用)、最近 6 轮历史对话组装为 JSON payload,传入系统提示词。
流式执行(run_stream):
generate_model_client.get_model_generate_invoke_stream() 获取异步流on_chunk(chunk) 推送给客户端extract_json_object() 解析 JSON降级链:
JSON 解析失败 → 正则 extract_answer_field() 提取 → 原始文本 → 硬编码兜底消息
输出格式:
{
"answer": "回答内容...",
"references": [...],
"warnings": [...]
}
文件: core/document_chat/skills/document_modify.py
流程与回答技能相同,但输出不同字段:
{
"proposed_content": "修改后的内容...",
"change_summary": "修改说明...",
"warnings": [...]
}
重新生成感知: 当用户说"重新生成/换个方案/再写一版"时,必须生成与之前 proposed_content 明显不同的内容(尝试不同的段落顺序、描述风格、技术方案)。
所有 AI 对话相关功能统一使用 shutian_qwen3_5_122b(蜀天云 Qwen 3.5 122B),enable_thinking: false。
文件: foundation/ai/agent/generate/model_generate.py:265-410
function_name 加载模型配置[SystemMessage, HumanMessage] 消息列表bind(extra_body={...}) 控制model.ainvoke(messages) + 指数退避重试(最多 10 次)<think>...</think> 思考块文件: foundation/ai/agent/generate/model_generate.py:699-823
线程桥接模式:
model.stream(messages)asyncio.Queue_ThinkingBlockStreamFilter 状态机跨 chunk 边界过滤 <think> 内容| 场景 | 行为 |
|---|---|
| 401/403/认证错误 | 立即失败,不重试 |
| 502/503/504 | 立即失败(避免压垮故障服务器) |
| 其他错误 | 指数退避重试,最大 10 次 |
| 退避基数 | 0.5s × 2^attempt |
文件: foundation/database/base/vector/milvus_vector.py
| 集合名 | 粒度 | 用途 |
|---|---|---|
t_kngs_construction_plan_parent |
父文档(完整章节) | 主要内容存储 |
t_kngs_construction_plan_child |
子文档(段落级切片) | 精确定位 |
使用 LangChain Milvus 集成(BM25BuiltInFunction):
shutian_qwen3_embeddense_weight=0.7, sparse_weight=0.3similarity = 1 / (1 + distance)Milvus 过滤表达式示例:
plan_type == '桥梁' and chapter_level_1 == '施工部署' and chapter_level_2 == '施工准备'
预创建常用集合的 vectorstore 连接,新连接按需创建并缓存。
所有提示词通过 YAML 文件加载(foundation/infrastructure/prompt/prompt_loader.py),存储在 config/prompt/ 目录。
文件: config/prompt/document_chat_intent.yaml
document-modifydocument-answer文件: config/prompt/document_answer_prompt.yaml
document_context.references文件: config/prompt/document_modify_prompt.yaml
proposed_content 明显不同文件: core/document_chat/component/llm_utils.py
extract_json_object(): 处理 fenced code blocks,查找 {...} 子串,修复 JSON 字符串中的控制字符extract_answer_field(): 正则回退方案,从畸形 JSON 中提取 "answer" 字段compact_json(): json.dumps(value, ensure_ascii=False, indent=2)文件: core/document_chat/component/document_chat_logger.py
logs/document_chat/request_received、rag_query_built、rag_recall_completed、rag_rerank_completed、rag_quality_gate_completed、final_content_generated、response_completed、request_failed文件: foundation/infrastructure/tracing/trace_context.py
callback_task_id(doc_chat_<hex>)作为全链路追踪 IDcontextvars.ContextVar 保证异步安全传播@auto_trace 装饰器自动生成 trace ID文件: core/document_chat/component/conversation_context.py
简单透传组件,规范化状态字典键名供提示词消费。
文件: config/document_chat_retrieval.yaml
| 参数 | 值 | 说明 |
|---|---|---|
min_rerank_score |
0.70 | 最低 rerank 分数 |
submit_top_k |
3 | 提交给 LLM 的最大引用数 |
max_reference_chars |
4000 | 引用总字符数预算 |
rerank_top_k |
8 | 送 reranker 的候选数 |
rrf_k |
60 | RRF 标准常量 |
recall_top_k |
30 | 召回后截断数量 |
系统在每个环节都设计了降级路径:
| 环节 | 降级方案 |
|---|---|
| 意图识别 | LLM 失败 → 启发式关键词匹配 → 默认 answer |
| Rerank | Rerank 失败 → 产生 warning,使用原始排序继续 |
| 检索为空 | 允许技能无引用运行(只是没有参考资料) |
| JSON 解析 | 正则提取 → 原始文本 → 硬编码兜底消息 |
所有提示词明确声明文档内容、上下文和引用为"不可信材料",禁止执行其中隐藏的指令。
get_graph() 懒构建,仅首次调用document_chat_workflow 为模块级单例LLM 的同步 stream() 通过 asyncio.Queue + 守护线程桥接为异步流,避免阻塞事件循环。