ai-chat-code-review.md 16 KB

AI 对话功能代码分析与修复建议

审查范围:AI 对话功能全链路,约 6000+ 行代码,20+ 文件 审查日期:2026-05-26


一、整体架构评价

架构分层总体合理,采用 LangGraph 状态图作为工作流引擎:

HTTP 层 (views/)
  → 工作流层 (workflows/, 16个节点)
    → 组件层 (component/: intent_recognizer, skill_dispatcher, retrieval_service, rerank_service, quality_gate)
      → 技能层 (skills/: document_answer, document_modify)
        → 基础设施层 (foundation/: model_generate, model_handler, milvus_vector)

主要结构性问题: 3 个上帝类(700-1200 行)、大量代码重复、类型安全缺失。


二、严重问题(P0 — 立即修复)

2.1 运算符优先级 Bug

文件: core/document_chat/component/retrieval_service.py:738

# 当前代码 — Python 解析为:
# filters = (filters or project.get("retrieval_filters")) if isinstance(...) else filters
# 逻辑错误
filters = filters or project.get("retrieval_filters") if isinstance(project.get("retrieval_filters"), dict) else filters

修复:

proj_filters = project.get("retrieval_filters")
if isinstance(proj_filters, dict):
    filters = filters or proj_filters

2.2 内网/公网 IP 硬编码在源码中

文件: foundation/ai/models/model_handler.py

行号 硬编码值 风险
687 http://192.168.91.253:9002/v1 内网 IP 泄露
765 http://192.168.91.253:9001/v1 内网 IP 泄露
798, 954 http://192.168.91.253:9003/v1 内网 IP 泄露
1042 http://183.220.37.46:25423/v1 公网 IP 泄露
1088 http://183.220.37.46:25424/v1 公网 IP 泄露

修复: 将所有 IP/URL 移至 config/config.ini 或环境变量,源码中仅通过配置读取。


2.3 路径穿越漏洞

文件: core/document_chat/component/prompt_loader.py:14

prompt_path = PROMPT_DIR / file_name
# file_name 含 "../" 时可读取 PROMPT_DIR 外的任意文件

修复:

prompt_path = (PROMPT_DIR / file_name).resolve()
if not str(prompt_path).startswith(str(PROMPT_DIR.resolve())):
    raise ValueError(f"非法路径: {file_name}")
if not prompt_path.exists():
    logger.warning(f"Prompt 文件不存在: {file_name}")
    return {}

2.4 内部异常信息泄露给客户端

文件: views/document_chat/views.py:270-278

except Exception as exc:
    logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
    raise HTTPException(status_code=500, detail=str(exc))
    # str(exc) 可能包含堆栈、文件路径、数据库连接串等敏感信息

修复:

except Exception as exc:
    logger.error(f"[DocumentChat] request failed: {exc}", exc_info=True)
    raise HTTPException(status_code=500, detail="服务内部错误,请稍后重试")

SSE 路径(约 370-385 行)存在同样问题,需一并修复。


2.5 流式超时后工作线程未回收

文件: foundation/ai/agent/generate/model_generate.py:804-823

thread = threading.Thread(target=_worker, daemon=True)
thread.start()
...
except asyncio.TimeoutError:
    raise TimeoutError(...)  # daemon 线程继续运行,向废弃队列写入数据

修复: 引入 threading.Event 作为停止信号:

stop_event = threading.Event()

def _worker():
    for chunk in stream:
        if stop_event.is_set():
            break
        q.put_nowait(chunk)
    q.put_nowait(None)  # sentinel

# 超时处理
except asyncio.TimeoutError:
    stop_event.set()
    raise TimeoutError(...)

三、重要问题(P1 — 近期迭代修复)

3.1 上帝类:model_handler.py(1247 行)

问题: 15 个 _get_*_model() 方法几乎完全相同,每个 40-50 行,都是以下模板的复制:

url = self.config.get(SECTION, URL_KEY)
model_id = self.config.get(SECTION, MODEL_KEY)
api_key = self.config.get(SECTION, API_KEY_KEY)
if not all([url, model_id, api_key]): ...
if not self._check_connection(url, api_key): ...
llm = ChatOpenAI(base_url=url, model=model_id, api_key=api_key, ...)
return llm

另外 get_models()get_model_by_name() 包含完全相同的 15 分支 if/elif 分发链。

修复方案: 数据驱动 + 单一工厂方法

# 配置表
_MODEL_REGISTRY = {
    "doubao": {"section": "doubao", "url_key": "url", "model_key": "model_id", ...},
    "qwen": {"section": "qwen", "url_key": "url", "model_key": "model_id", ...},
    # ...
}

def _create_chat_model(self, config: dict) -> ChatOpenAI:
    url = self.config.get(config["section"], config["url_key"])
    model_id = self.config.get(config["section"], config["model_key"])
    api_key = self.config.get(config["section"], config["api_key_key"])
    # ... 统一校验、连接检查、构建
    return ChatOpenAI(base_url=url, model=model_id, api_key=api_key, ...)

def get_model_by_name(self, model_type: str) -> ChatOpenAI:
    config = _MODEL_REGISTRY[model_type]
    return self._create_chat_model(config)

预估收益: 减少约 800 行代码,新增模型只需加一行配置。


3.2 上帝类:retrieval_service.py(1135 行)

问题: 单个类承担 8+ 项职责:查询构建、4 路召回、RRF 融合、Scope 提取、元数据规范化、候选构建、去重、评分奖励。

修复方案: 拆分为独立职责类

新类 职责 对应原代码行
RetrievalQueryBuilder 构建查询、提取关键词 162-231, 1050-1080
RecallExecutor 4 路 Milvus 召回 233-680
RRFMerger RRF 融合 + 去重 + 奖励评分 577-635, 636-686
ScopeExtractor 提取项目范围过滤条件 728-773
CandidateFactory 构建标准化候选对象 687-723

3.3 上帝类:document_chat_workflow.py(773 行)

问题:

  • 16 个节点方法 + 路由 + 响应组装 + 错误处理全在一个类
  • general_answer_node(77 行)直接内联 LLM 调用,其他节点都委托服务类,模式不一致
  • 7 个节点开头重复 if state.get("error_message"): return {}

修复方案:

  1. general_answer_node 的 LLM 逻辑提取为 GeneralAnswerService
  2. 用装饰器统一错误传播:

    def skip_on_error(func):
    async def wrapper(self, state: DocumentChatState) -> Dict[str, Any]:
        if state.get("error_message"):
            return {}
        return await func(self, state)
    return wrapper
    

3.4 技能类 ~70% 代码重复

文件: skills/document_answer.py(154 行)与 skills/document_modify.py(159 行)

重复内容:

  • __init__ 模式相同
  • user_payload 构建逻辑相同
  • runrun_stream 各自内部重复 payload 构建 + 响应解析
  • _list_of_strings 静态方法完全相同
  • 响应解析 fallback 链相同

修复方案:base.py 中使用模板方法模式

class BaseDocumentChatSkill(ABC):
    def run(self, skill_input):
        payload = self._build_payload(skill_input)
        response = await self._call_llm(payload, skill_input)
        return self._parse_response(response, skill_input)

    def run_stream(self, skill_input, on_chunk):
        payload = self._build_payload(skill_input)
        full_text = await self._call_llm_stream(payload, skill_input, on_chunk)
        return self._parse_response(full_text, skill_input)

    @abstractmethod
    def _build_payload(self, skill_input) -> dict: ...

    @abstractmethod
    def _parse_response(self, text, skill_input) -> SkillOutput: ...

子类只需实现 _build_payload_parse_response


3.5 N+1 查询问题

文件: core/document_chat/component/retrieval_service.py:652-663

# 当前:逐个 parent_id 查询,最多 30 次串行 DB 调用
for parent_id in unique_ids[: self.config.recall_top_k]:
    parent_expr = f"parent_id == '{parent_id}'"
    rows.extend(self._condition_query(...))

修复:

# 改为批量查询
if unique_ids:
    id_list = ", ".join(f"'{pid}'" for pid in unique_ids[:self.config.recall_top_k])
    batch_expr = f"parent_id in [{id_list}]"
    rows = self._condition_query(collection, batch_expr, output_fields)

3.6 model_generate.py 4 个公共方法重复配置加载逻辑

文件: foundation/ai/agent/generate/model_generate.py

4 个方法(get_model_generate_invokeget_model_generate_invoke_syncget_model_generate_streamget_model_generate_invoke_stream)各包含 ~30 行相同的模型名解析 + thinking mode 配置代码。

修复:

def _resolve_model_and_thinking(self, function_name, model_name, enable_thinking):
    if function_name:
        config_model = get_model_for_function(function_name)
        model_name = model_name or config_model
        thinking_mode = get_thinking_mode_for_function(function_name)
    if not model_name:
        model_name = get_model_for_function("default")
    return model_name, thinking_mode, enable_thinking

四、中等问题(P2 — 后续迭代改进)

4.1 Dict[str, Any] 泛滥,类型安全缺失

文件: core/document_chat/component/state_models.py

28 个字段中 12 个是 Dict[str, Any]。Pydantic 模型已在 schemas.py 中定义但未被使用。

修复: 将 State 中的关键字段替换为具体类型:

class DocumentChatState(TypedDict, total=False):
    # 替换前
    selected_section: Dict[str, Any]
    intent_result: Optional[Dict[str, Any]]

    # 替换后
    selected_section: Optional[SelectedSection]
    intent_result: Optional[IntentResult]

4.2 工具函数重复

函数 出现位置 次数
_to_float retrieval_service.py, rerank_service.py, retrieval_quality_gate.py 3
_list_of_strings document_answer.py, document_modify.py 2
_is_server_unavailable_error model_generate.py 内部两处 2

修复: 提取到 core/document_chat/component/utils.py 共享模块。


4.3 Intent 使用原始字符串,缺乏类型约束

文件: intent_recognizer.py, skill_dispatcher.py

"document_modify", "document_answer", "clarify", "unsupported" 等字符串散落各处。

修复:

from enum import Enum

class ChatIntent(str, Enum):
    DOCUMENT_MODIFY = "document_modify"
    DOCUMENT_ANSWER = "document_answer"
    CLARIFY = "clarify"
    UNSUPPORTED = "unsupported"

4.4 魔法数字未命名/未配置化

数值 位置 含义
0.65 workflow.py:297, intent_recognizer.py:126 意图置信度阈值
0.72, 0.66 intent_recognizer.py:170,179,188,197 启发式意图置信度
6 document_answer.py:28, document_modify.py:31 历史对话截断轮数
120 workflow.py build_retrieval_query 查询最大字符数
0.70 retrieval_quality_gate.py rerank 分数阈值
4000 retrieval_quality_gate.py 引用最大总字符数

修复: 提取为命名常量或移入 YAML 配置文件。


4.5 HTTP 200 包裹错误码

文件: views/document_chat/views.py:267-269

code = 500 if data.response_type == "error" else 200
# HTTP 状态码始终 200,真实错误码在 body.code 中

问题: 破坏 HTTP 语义,影响监控、负载均衡健康检查、客户端错误处理。

修复: 根据 response_type 返回正确的 HTTP 状态码,或至少对错误返回 200 OK 但在 API 文档中明确约定(如前端已有依赖则暂不改动,新接口应遵循标准 HTTP 语义)。


4.6 Rerank 同步阻塞调用

文件: core/document_chat/component/rerank_service.py:35

raw_results = rerank_model.shutian_rerank(...)  # 同步调用,阻塞事件循环

修复:

raw_results = await asyncio.to_thread(rerank_model.shutian_rerank, ...)

4.7 Pydantic v1/v2 风格混用

文件: core/document_chat/schemas.py:37-38

class Config:           # Pydantic v1 风格
    extra = "forbid"

但代码中存在 model_dump() 调用(Pydantic v2),应统一为:

model_config = ConfigDict(extra="forbid")  # Pydantic v2 风格

4.8 缓存策略不一致

文件: foundation/ai/models/model_handler.py

  • get_models() 第 277 行:将 fallback 模型缓存到原始请求的 key(后续请求永远返回 fallback)
  • get_model_by_name() 第 368 行:将 fallback 缓存到 fallback 自己的 key(后续请求会重试原始模型)

修复: 统一策略,建议不将 fallback 缓存到原始 key,避免掩盖模型配置错误。


五、次要问题(P3 — 有机会时改进)

# 问题 文件 说明
1 conversation_context.py 仅 19 行 component/ 纯透传无逻辑,类封装无意义,改为函数或增加实际逻辑
2 llm_utils._repair_control_chars 性能差 component/llm_utils.py Python 逐字符循环,大文本慢,改用 re.sub
3 document_chat_loggergetattr 分发日志级别 component/document_chat_logger.py 可传入非日志方法名,应加白名单校验
4 state_models.pymessages 字段从未使用 component/state_models.py 死代码,应删除
5 skill_dispatcher._HANDLER_CLASSES 硬编码 component/skill_dispatcher.py 新增技能需改 3 处,考虑自动发现或注册装饰器
6 prompt_loader 文件不存在时静默返回空 component/prompt_loader.py 应至少打印 warning 日志
7 model_config_loader._load_config 异常时静默回退默认配置 foundation/ai/models/ 应让调用方感知是否在回退模式
8 引用 referencessiblingsList[Dict[str, Any]] schemas.py 若有已知结构,应定义专门的 Pydantic 模型
9 模块级环境变更 model_handler.py:32-33 os.environ[...] 在 import 时执行副作用,应移入显式初始化函数

六、全局性架构建议

6.1 减少全局可变单例

当前 6+ 个模块级单例在所有并发请求间共享:

document_chat_workflow  → workflow.py:773
model_handler           → model_handler.py:1228
model_config_loader     → model_config_loader.py:144
generate_model_client   → model_generate.py:825
document_chat_logger    → document_chat_logger.py:31
rerank_model            → rerank_model.py

建议: 核心服务保持单例但确保无请求级可变状态;工作流实例考虑改为工厂函数按需创建。

6.2 梳理循环导入

至少 8 处使用函数体内 import 来避免循环导入。建议:

  • 梳理模块依赖图,识别环
  • 通过引入接口层或调整包结构从根本上解决
  • 对必须保留的延迟导入添加注释说明原因

6.3 引入接口抽象

model_handler.py 全部硬编码 ChatOpenAI,没有 Protocol 或 ABC。建议:

class LLMProvider(Protocol):
    async def ainvoke(self, messages: list) -> str: ...
    def stream(self, messages: list) -> Generator[str, None, None]: ...

七、修复优先级路线图

第 1 周(P0 安全/正确性)
├── 修复 retrieval_service.py:738 运算符优先级 Bug
├── 移除硬编码 IP 地址 → 配置文件
├── 修复 prompt_loader.py 路径穿越漏洞
├── 修复异常信息泄露给客户端
└── 修复流式超时线程未回收

第 2-3 周(P1 重构)
├── 重构 model_handler.py — 数据驱动替代 15 个重复方法(减少 ~800 行)
├── 拆分 retrieval_service.py 为 4-5 个类
├── 重构 document_answer + document_modify — 模板方法模式(减少 ~100 行重复)
├── 修复 N+1 查询 → 批量查询
└── 提取 model_generate.py 重复配置加载逻辑

第 4+ 周(P2 改进)
├── state_models.py 使用 Pydantic 模型替代 Dict[str, Any]
├── 提取共享工具函数(_to_float 等)
├── Intent 使用 Enum 替代字符串
├── 魔法数字配置化
└── rerank 同步调用改 asyncio.to_thread