审查范围:AI 对话功能全链路,约 6000+ 行代码,20+ 文件 审查日期:2026-05-26
架构分层总体合理,采用 LangGraph 状态图作为工作流引擎:
HTTP 层 (views/)
→ 工作流层 (workflows/, 16个节点)
→ 组件层 (component/: intent_recognizer, skill_dispatcher, retrieval_service, rerank_service, quality_gate)
→ 技能层 (skills/: document_answer, document_modify)
→ 基础设施层 (foundation/: model_generate, model_handler, milvus_vector)
主要结构性问题: 3 个上帝类(700-1200 行)、大量代码重复、类型安全缺失。
文件: core/document_chat/component/retrieval_service.py:738
# 当前代码 — Python 解析为:
# filters = (filters or project.get("retrieval_filters")) if isinstance(...) else filters
# 逻辑错误
filters = filters or project.get("retrieval_filters") if isinstance(project.get("retrieval_filters"), dict) else filters
修复:
proj_filters = project.get("retrieval_filters")
if isinstance(proj_filters, dict):
filters = filters or proj_filters
文件: foundation/ai/models/model_handler.py
| 行号 | 硬编码值 | 风险 |
|---|---|---|
| 687 | http://192.168.91.253:9002/v1 |
内网 IP 泄露 |
| 765 | http://192.168.91.253:9001/v1 |
内网 IP 泄露 |
| 798, 954 | http://192.168.91.253:9003/v1 |
内网 IP 泄露 |
| 1042 | http://183.220.37.46:25423/v1 |
公网 IP 泄露 |
| 1088 | http://183.220.37.46:25424/v1 |
公网 IP 泄露 |
修复: 将所有 IP/URL 移至 config/config.ini 或环境变量,源码中仅通过配置读取。
文件: core/document_chat/component/prompt_loader.py:14
prompt_path = PROMPT_DIR / file_name
# file_name 含 "../" 时可读取 PROMPT_DIR 外的任意文件
修复:
prompt_path = (PROMPT_DIR / file_name).resolve()
if not str(prompt_path).startswith(str(PROMPT_DIR.resolve())):
raise ValueError(f"非法路径: {file_name}")
if not prompt_path.exists():
logger.warning(f"Prompt 文件不存在: {file_name}")
return {}
文件: views/document_chat/views.py:270-278
except Exception as exc:
logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
raise HTTPException(status_code=500, detail=str(exc))
# str(exc) 可能包含堆栈、文件路径、数据库连接串等敏感信息
修复:
except Exception as exc:
logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
raise HTTPException(status_code=500, detail="服务内部错误,请稍后重试")
SSE 路径(约 370-385 行)存在同样问题,需一并修复。
文件: foundation/ai/agent/generate/model_generate.py:804-823
thread = threading.Thread(target=_worker, daemon=True)
thread.start()
...
except asyncio.TimeoutError:
raise TimeoutError(...) # daemon 线程继续运行,向废弃队列写入数据
修复: 引入 threading.Event 作为停止信号:
stop_event = threading.Event()
def _worker():
for chunk in stream:
if stop_event.is_set():
break
q.put_nowait(chunk)
q.put_nowait(None) # sentinel
# 超时处理
except asyncio.TimeoutError:
stop_event.set()
raise TimeoutError(...)
model_handler.py(1247 行)问题: 15 个 _get_*_model() 方法几乎完全相同,每个 40-50 行,都是以下模板的复制:
url = self.config.get(SECTION, URL_KEY)
model_id = self.config.get(SECTION, MODEL_KEY)
api_key = self.config.get(SECTION, API_KEY_KEY)
if not all([url, model_id, api_key]): ...
if not self._check_connection(url, api_key): ...
llm = ChatOpenAI(base_url=url, model=model_id, api_key=api_key, ...)
return llm
另外 get_models() 和 get_model_by_name() 包含完全相同的 15 分支 if/elif 分发链。
修复方案: 数据驱动 + 单一工厂方法
# 配置表
_MODEL_REGISTRY = {
"doubao": {"section": "doubao", "url_key": "url", "model_key": "model_id", ...},
"qwen": {"section": "qwen", "url_key": "url", "model_key": "model_id", ...},
# ...
}
def _create_chat_model(self, config: dict) -> ChatOpenAI:
url = self.config.get(config["section"], config["url_key"])
model_id = self.config.get(config["section"], config["model_key"])
api_key = self.config.get(config["section"], config["api_key_key"])
# ... 统一校验、连接检查、构建
return ChatOpenAI(base_url=url, model=model_id, api_key=api_key, ...)
def get_model_by_name(self, model_type: str) -> ChatOpenAI:
config = _MODEL_REGISTRY[model_type]
return self._create_chat_model(config)
预估收益: 减少约 800 行代码,新增模型只需加一行配置。
retrieval_service.py(1135 行)问题: 单个类承担 8+ 项职责:查询构建、4 路召回、RRF 融合、Scope 提取、元数据规范化、候选构建、去重、评分奖励。
修复方案: 拆分为独立职责类
| 新类 | 职责 | 对应原代码行 |
|---|---|---|
RetrievalQueryBuilder |
构建查询、提取关键词 | 162-231, 1050-1080 |
RecallExecutor |
4 路 Milvus 召回 | 233-680 |
RRFMerger |
RRF 融合 + 去重 + 奖励评分 | 577-635, 636-686 |
ScopeExtractor |
提取项目范围过滤条件 | 728-773 |
CandidateFactory |
构建标准化候选对象 | 687-723 |
document_chat_workflow.py(773 行)问题:
general_answer_node(77 行)直接内联 LLM 调用,其他节点都委托服务类,模式不一致if state.get("error_message"): return {}修复方案:
general_answer_node 的 LLM 逻辑提取为 GeneralAnswerService用装饰器统一错误传播:
def skip_on_error(func):
async def wrapper(self, state: DocumentChatState) -> Dict[str, Any]:
if state.get("error_message"):
return {}
return await func(self, state)
return wrapper
文件: skills/document_answer.py(154 行)与 skills/document_modify.py(159 行)
重复内容:
__init__ 模式相同user_payload 构建逻辑相同run 和 run_stream 各自内部重复 payload 构建 + 响应解析_list_of_strings 静态方法完全相同修复方案: 在 base.py 中使用模板方法模式
class BaseDocumentChatSkill(ABC):
def run(self, skill_input):
payload = self._build_payload(skill_input)
response = await self._call_llm(payload, skill_input)
return self._parse_response(response, skill_input)
def run_stream(self, skill_input, on_chunk):
payload = self._build_payload(skill_input)
full_text = await self._call_llm_stream(payload, skill_input, on_chunk)
return self._parse_response(full_text, skill_input)
@abstractmethod
def _build_payload(self, skill_input) -> dict: ...
@abstractmethod
def _parse_response(self, text, skill_input) -> SkillOutput: ...
子类只需实现 _build_payload 和 _parse_response。
文件: core/document_chat/component/retrieval_service.py:652-663
# 当前:逐个 parent_id 查询,最多 30 次串行 DB 调用
for parent_id in unique_ids[: self.config.recall_top_k]:
parent_expr = f"parent_id == '{parent_id}'"
rows.extend(self._condition_query(...))
修复:
# 改为批量查询
if unique_ids:
id_list = ", ".join(f"'{pid}'" for pid in unique_ids[:self.config.recall_top_k])
batch_expr = f"parent_id in [{id_list}]"
rows = self._condition_query(collection, batch_expr, output_fields)
model_generate.py 4 个公共方法重复配置加载逻辑文件: foundation/ai/agent/generate/model_generate.py
4 个方法(get_model_generate_invoke、get_model_generate_invoke_sync、get_model_generate_stream、get_model_generate_invoke_stream)各包含 ~30 行相同的模型名解析 + thinking mode 配置代码。
修复:
def _resolve_model_and_thinking(self, function_name, model_name, enable_thinking):
if function_name:
config_model = get_model_for_function(function_name)
model_name = model_name or config_model
thinking_mode = get_thinking_mode_for_function(function_name)
if not model_name:
model_name = get_model_for_function("default")
return model_name, thinking_mode, enable_thinking
Dict[str, Any] 泛滥,类型安全缺失文件: core/document_chat/component/state_models.py
28 个字段中 12 个是 Dict[str, Any]。Pydantic 模型已在 schemas.py 中定义但未被使用。
修复: 将 State 中的关键字段替换为具体类型:
class DocumentChatState(TypedDict, total=False):
# 替换前
selected_section: Dict[str, Any]
intent_result: Optional[Dict[str, Any]]
# 替换后
selected_section: Optional[SelectedSection]
intent_result: Optional[IntentResult]
| 函数 | 出现位置 | 次数 |
|---|---|---|
_to_float |
retrieval_service.py, rerank_service.py, retrieval_quality_gate.py |
3 |
_list_of_strings |
document_answer.py, document_modify.py |
2 |
_is_server_unavailable_error |
model_generate.py 内部两处 |
2 |
修复: 提取到 core/document_chat/component/utils.py 共享模块。
文件: intent_recognizer.py, skill_dispatcher.py
"document_modify", "document_answer", "clarify", "unsupported" 等字符串散落各处。
修复:
from enum import Enum
class ChatIntent(str, Enum):
DOCUMENT_MODIFY = "document_modify"
DOCUMENT_ANSWER = "document_answer"
CLARIFY = "clarify"
UNSUPPORTED = "unsupported"
| 数值 | 位置 | 含义 |
|---|---|---|
0.65 |
workflow.py:297, intent_recognizer.py:126 |
意图置信度阈值 |
0.72, 0.66 |
intent_recognizer.py:170,179,188,197 |
启发式意图置信度 |
6 |
document_answer.py:28, document_modify.py:31 |
历史对话截断轮数 |
120 |
workflow.py build_retrieval_query |
查询最大字符数 |
0.70 |
retrieval_quality_gate.py |
rerank 分数阈值 |
4000 |
retrieval_quality_gate.py |
引用最大总字符数 |
修复: 提取为命名常量或移入 YAML 配置文件。
文件: views/document_chat/views.py:267-269
code = 500 if data.response_type == "error" else 200
# HTTP 状态码始终 200,真实错误码在 body.code 中
问题: 破坏 HTTP 语义,影响监控、负载均衡健康检查、客户端错误处理。
修复: 根据 response_type 返回正确的 HTTP 状态码,或至少对错误返回 200 OK 但在 API 文档中明确约定(如前端已有依赖则暂不改动,新接口应遵循标准 HTTP 语义)。
文件: core/document_chat/component/rerank_service.py:35
raw_results = rerank_model.shutian_rerank(...) # 同步调用,阻塞事件循环
修复:
raw_results = await asyncio.to_thread(rerank_model.shutian_rerank, ...)
文件: core/document_chat/schemas.py:37-38
class Config: # Pydantic v1 风格
extra = "forbid"
但代码中存在 model_dump() 调用(Pydantic v2),应统一为:
model_config = ConfigDict(extra="forbid") # Pydantic v2 风格
文件: foundation/ai/models/model_handler.py
get_models() 第 277 行:将 fallback 模型缓存到原始请求的 key(后续请求永远返回 fallback)get_model_by_name() 第 368 行:将 fallback 缓存到 fallback 自己的 key(后续请求会重试原始模型)修复: 统一策略,建议不将 fallback 缓存到原始 key,避免掩盖模型配置错误。
| # | 问题 | 文件 | 说明 |
|---|---|---|---|
| 1 | conversation_context.py 仅 19 行 |
component/ | 纯透传无逻辑,类封装无意义,改为函数或增加实际逻辑 |
| 2 | llm_utils._repair_control_chars 性能差 |
component/llm_utils.py | Python 逐字符循环,大文本慢,改用 re.sub |
| 3 | document_chat_logger 用 getattr 分发日志级别 |
component/document_chat_logger.py | 可传入非日志方法名,应加白名单校验 |
| 4 | state_models.py 的 messages 字段从未使用 |
component/state_models.py | 死代码,应删除 |
| 5 | skill_dispatcher._HANDLER_CLASSES 硬编码 |
component/skill_dispatcher.py | 新增技能需改 3 处,考虑自动发现或注册装饰器 |
| 6 | prompt_loader 文件不存在时静默返回空 |
component/prompt_loader.py | 应至少打印 warning 日志 |
| 7 | model_config_loader._load_config 异常时静默回退默认配置 |
foundation/ai/models/ | 应让调用方感知是否在回退模式 |
| 8 | 引用 references 和 siblings 为 List[Dict[str, Any]] |
schemas.py | 若有已知结构,应定义专门的 Pydantic 模型 |
| 9 | 模块级环境变更 | model_handler.py:32-33 | os.environ[...] 在 import 时执行副作用,应移入显式初始化函数 |
当前 6+ 个模块级单例在所有并发请求间共享:
document_chat_workflow → workflow.py:773
model_handler → model_handler.py:1228
model_config_loader → model_config_loader.py:144
generate_model_client → model_generate.py:825
document_chat_logger → document_chat_logger.py:31
rerank_model → rerank_model.py
建议: 核心服务保持单例但确保无请求级可变状态;工作流实例考虑改为工厂函数按需创建。
至少 8 处使用函数体内 import 来避免循环导入。建议:
model_handler.py 全部硬编码 ChatOpenAI,没有 Protocol 或 ABC。建议:
class LLMProvider(Protocol):
async def ainvoke(self, messages: list) -> str: ...
def stream(self, messages: list) -> Generator[str, None, None]: ...
第 1 周(P0 安全/正确性)
├── 修复 retrieval_service.py:738 运算符优先级 Bug
├── 移除硬编码 IP 地址 → 配置文件
├── 修复 prompt_loader.py 路径穿越漏洞
├── 修复异常信息泄露给客户端
└── 修复流式超时线程未回收
第 2-3 周(P1 重构)
├── 重构 model_handler.py — 数据驱动替代 15 个重复方法(减少 ~800 行)
├── 拆分 retrieval_service.py 为 4-5 个类
├── 重构 document_answer + document_modify — 模板方法模式(减少 ~100 行重复)
├── 修复 N+1 查询 → 批量查询
└── 提取 model_generate.py 重复配置加载逻辑
第 4+ 周(P2 改进)
├── state_models.py 使用 Pydantic 模型替代 Dict[str, Any]
├── 提取共享工具函数(_to_float 等)
├── Intent 使用 Enum 替代字符串
├── 魔法数字配置化
└── rerank 同步调用改 asyncio.to_thread