相似片段检索功能步骤.md 21 KB

# 施工方案编写模块 - API 总览与新增功能步骤

一、模块架构

views/construction_write/          ← API 视图层(路由入口)
├── outline_views.py               ← 大纲生成(SSE)+ 上下文生成(SSE)+ 查询接口
├── regenerate_views.py            ← 重新生成大纲(SSE)
├── content_completion.py          ← 上下文内容生成(SSE)
├── task_cancel_views.py           ← 任务取消 + 任务状态查询
├── similar_plan_recommend.py      ← 相似片段检索 API 路由
└── 相似度推荐.md                  ← 原始需求文档

core/construction_write/           ← 核心业务层
├── workflows/outline_workflow.py  ← LangGraph 工作流(含 recommend_similar_fragments 节点)
├── component/outline_generator.py ← 各节点业务实现
├── component/similar_fragment_service.py ← 相似片段检索业务编排
└── component/state_models.py      ← 状态模型定义

统一路由前缀: /sgbx,标签: ["施工方案编写"]


二、现有 API 清单

接口 方法 文件 响应类型 说明
/sgbx/generating_outline POST outline_views.py SSE 流式大纲生成(Celery 异步任务 + 进度轮询)
/sgbx/regenerate_outline POST regenerate_views.py SSE 流式重新生成大纲(基于已有任务调整)
/sgbx/content_completion POST content_completion.py SSE 流式上下文内容生成(续写/扩写/润色/补全)
/sgbx/task_cancel POST task_cancel_views.py JSON 取消正在执行的生成任务
/sgbx/task_status GET task_cancel_views.py JSON 查询任务状态
/sgbx/active_tasks GET outline_views.py JSON 获取活跃任务列表
/sgbx/context_generate POST outline_views.py SSE 上下文生成(简化版,内置于 outline_views)
/sgbx/context_generate_health GET outline_views.py JSON 上下文生成健康检查
/sgbx/context_generate_modes GET outline_views.py JSON 获取生成模式列表
/sgbx/context_generate_api_status GET outline_views.py JSON API 配置状态
/sgbx/content_completion_health GET content_completion.py JSON 内容生成健康检查
/sgbx/content_completion_modes GET content_completion.py JSON 获取内容生成模式列表
/sgbx/content_completion_api_status GET content_completion.py JSON 内容生成 API 状态

三、通用模式总结

开发新接口时应遵循以下模式:

3.1 请求/响应模型

  • 使用 Pydantic BaseModel + Field 定义,带 descriptionexample
  • 统一响应格式:{"code": int, "message": str, "data": ...}

3.2 SSE 流式接口(长耗时操作)

请求 → 生成 callback_task_id → 建立 SSE 连接 → 提交 Celery 任务 → 轮询进度 → 返回结果
  • 事件类型: connectedprocessingcompleted/failed/cancelled
  • 使用 unified_sse_manager 管理连接,progress_manager 管理进度
  • 使用 workflow_manager 提交和查询 Celery 任务
  • 支持 Redis terminate:{task_id} 取消机制

3.3 简单 POST 接口(即时返回)

  • 使用 @auto_trace 装饰器
  • 直接返回 Pydantic 响应模型

3.4 Health/Modes/Status 辅助接口

  • 每个功能模块都有对应的 _health_modes_api_status 接口

3.5 API 调用配置

  • 项目使用阿里云 DashScope (qwen3-30b-a3b-instruct-2507)
  • CustomAPIConfig 类封装地址、Key、模型
  • 全局 HTTP 连接池 (aiohttp.TCPConnector) + Redis 连接池

四、新增功能:相似片段检索接口

4.1 需求来源

文件: 相似度推荐.md

4.2 涉及数据表

本项目 Milvus 向量库中父/子文档集合命名规范已在代码中确立:

业务表 Milvus Collection 用途 代码出处
t_kngs_construction_plan_parent(父表) rag_parent_hybrid 父文档集合,text 为最终返回的相似片段内容 编写服务相似片段检索模块
t_kngs_construction_plan_child(子表) rag_children_hybrid 子文档集合,用于初步召回检索 foundation/ai/rag/retrieval/retrieval.py:228

向量库: Milvus,数据库名 lq_db(默认配置 MILVUS_DB=lq_db

两表共享相同字段结构:pk(主键)、text(内容)、dense(向量列)、sparse(BM25)、chapter_title(章节标题)、parent_id(父段ID)、file_name(文件名称)、document_id(文档ID) 等。

parent_id 关联关系: 子表的 parent_id 字段对应父表的 parent_id 字段(非 pk),通过 parent_id == xxx 进行条件查询。已在 parent_tool.pyfetch_parent_chunks_by_parent_id() 函数中验证此关联方式。

4.3 检索流程

用户输入: 一级标题 + 二级标题 + chapter_id + project_id + search_text
  ↓
1. 子表混合搜索 top 30(hybrid_search 内部自动向量化 + BM25)
  ↓
2. 按子表 `chapter_level_1`、`chapter_level_2` 字段精确匹配过滤
  ↓
3. 按 parent_id 统计频次,重复多的排最前
  ↓
4. 通过高频 parent_id 关联父表查询(condition_query)
  ↓
5. 返回 top 5,包含: 一级标题、二级标题、chapter_id、project_id、text、file_name、相似度百分比

project_id 说明: 仅作为透传字段(前端传入 → 接口回传),告知前端该推荐是针对哪个方案的,不参与任何向量库检索或过滤逻辑。


五、开发步骤

步骤1:定义 Pydantic 模型

文件: views/construction_write/similar_plan_recommend.py

请求模型

class SimilarFragmentSearchRequest(BaseModel):
    title_level_1: Optional[str] = Field(None, description="一级标题展示文本", example="施工工艺技术")
    title_level_2: Optional[str] = Field(None, description="二级标题展示文本", example="主要施工方法概述")
    chapter_level_1: str = Field(..., description="一级章节类型,需与向量库字段匹配", example="technology")
    chapter_level_2: str = Field(..., description="二级章节类型,需与向量库字段匹配", example="MethodsOverview")
    chapter_id: str = Field(..., description="当前章节ID")
    project_id: str = Field(..., description="方案ID")
    sgbx_code: Optional[str] = Field(None, description="施工编写章节编码")
    search_text: str = Field(..., description="用户输入的检索信息")

响应模型

class SimilarFragmentItem(BaseModel):
    chapter_level_1: str = Field(..., description="一级标题(从父表 chapter_title 解析或 chapter_level_1 字段获取)")
    chapter_level_2: str = Field(..., description="二级标题(从父表 chapter_title 解析或 chapter_level_2 字段获取)")
    chapter_id: str = Field(..., description="请求传入的章节ID,原样回传(不参与检索)")
    project_id: str = Field(..., description="请求传入的方案ID,原样回传(不参与检索)")
    text: str = Field(..., description="相似片段内容(父表按 pk 排序后拼接的完整 text)")
    file_name: str = Field(..., description="文件名称(父表 file_name 字段)")
    similarity_percent: float = Field(..., description="相似度百分比(来自子表混合搜索的 similarity,按 parent_id 取最大值)", ge=0.0, le=100.0)

class SimilarFragmentSearchResponse(BaseModel):
    code: int
    message: str
    data: List[SimilarFragmentItem] = Field(default_factory=list)

步骤2:实现章节字段精确匹配

文件: views/construction_write/similar_plan_recommend.py

函数: chapter_fields_match(row, level1, level2) -> bool

当前接口以向量库字段 chapter_level_1chapter_level_2 为准,要求与请求中的同名字段精确相等。

匹配逻辑:

  1. 读取子表召回结果 metadata 中的 chapter_level_1
  2. 读取子表召回结果 metadata 中的 chapter_level_2
  3. 两者分别与请求参数 chapter_level_1chapter_level_2 去空白后精确相等才保留
  4. 若向量库集合缺少这两个字段,则不回退到无章节过滤,直接返回空结果并记录日志

返回值:是否匹配,用于保证推荐片段与当前章节类型一致。


步骤3:实现检索核心逻辑

文件: views/construction_write/similar_plan_recommend.py

复用项目已有的向量检索组件:

from foundation.database.base.vector.milvus_vector import MilvusVectorManager
from core.construction_write.component.milvus import MilvusManager, MilvusConfig

3.1 子表混合搜索

重要:需求仅要求"向量化后检索30条,按 parent_id 频次排序",不需要 LLM 重排序。应使用 MilvusVectorManager.hybrid_search(),而非 MilvusVectorManager.hybrid_search()。后者内部会调用 rerank 模型,引入不必要的延迟。

# 获取 MilvusVectorManager 单例(项目已有全局实例)
from foundation.database.base.vector.milvus_vector import MilvusVectorManager
vector_manager = MilvusVectorManager()

results = vector_manager.hybrid_search(
    param={'collection_name': 'rag_children_hybrid'},
    query_text=search_text,
    top_k=30,
    ranker_type="weighted",
    dense_weight=0.7,
    sparse_weight=0.3,
)

返回结果结构hybrid_search 直接格式化后的 dict):

{
    'id': 12345,                        # doc.metadata.get('pk')
    'text_content': '段落内容...',         # doc.page_content
    'metadata': {                         # LangChain Document.metadata,扁平dict
        'pk': 12345,
        'parent_id': 'xxx-xxx-xxx',
        'chapter_title': '第八章验收要求->8.3 验收内容',
        'file_name': 'xxx.docx',
        'document_id': 'doc-001',        # 对应上传文档ID
        'chapter_level_1': '验收要求',    # 入库时单独存储的字段
        'chapter_level_2': '8.3 验收内容',
        # 注意:如果入库时某个字段值是dict类型,_process_metadata会将其序列化为JSON字符串
    },
    'similarity': 0.85,                 # similarity = 1 / (1 + score)
    'distance': 0.15,                   # 混合搜索加权得分
}

metadata 字段说明hybrid_search 返回的 metadatadoc.metadata 原样返回(见 milvus_vector.py:517)。入库时经过 _process_metadata 处理,dict 类型值会被序列化为 JSON 字符串,list 类型 hierarchy 会被转为 " > " 连接的字符串。因此 parent_idchapter_titlefile_namedocument_id 等基础字段都是直接的字符串/整数值,无需二次 JSON 解析。

3.2 章节字段过滤 + parent_id 排序

# 1. 过滤:只保留 chapter_level_1/chapter_level_2 与请求参数精确匹配的结果
filtered = []
for r in results:
    meta = r['metadata']
    if (
        meta.get('chapter_level_1', '').strip() == level1.strip()
        and meta.get('chapter_level_2', '').strip() == level2.strip()
    ):
        filtered.append(r)

# 3. 边界处理:若 filtered 为空,直接返回空结果,不能回退到全部结果
if not filtered:
    logger.warning("章节字段匹配无结果")
    return []

# 4. 按 parent_id 统计频次,重复多的排最前
from collections import Counter
pid_counts = Counter(r['metadata'].get('parent_id', '') for r in filtered)

# 5. 取 top parent_id 列表(覆盖足够多的父表记录,如取 15 个)
top_pids = [pid for pid, _ in pid_counts.most_common(15)]

3.3 父表查询

父表集合名: rag_parent_hybrid

由于父表查询是基于 parent_id 的精确条件查询(非向量搜索),应使用 MilvusManager.condition_query() 方法(基于 pymilvus.MilvusClient):

from core.construction_write.component.milvus import MilvusManager, MilvusConfig

milvus_mgr = MilvusManager(MilvusConfig())  # 自动读取配置连接 lq_db

查询策略:遍历 top_pids,对每个 parent_id 查询父表所有记录,拼接 text 字段为完整片段。

parent_results = []
for pid in top_pids:
    rows = milvus_mgr.condition_query(
        collection_name="rag_parent_hybrid",
        filter=f"parent_id == '{pid}'",
        output_fields=["pk", "text", "parent_id", "file_name", "chapter_title", "chapter_level_1", "chapter_level_2"],
        limit=1000,  # 足够大以获取该 parent_id 下的所有片段
    )
    if not rows:
        continue
    
    # 按 pk 排序(与 parent_tool.py 中一致)
    sorted_rows = sorted(rows, key=lambda x: x.get('pk', 0))
    
    # 拼接 text 为完整片段
    full_text = "\n".join(r.get('text', '') for r in sorted_rows)
    
    # 从首条记录提取 chapter_title 和元数据
    first_row = sorted_rows[0]
    chapter_title = first_row.get('chapter_title', '')
    file_name = first_row.get('file_name', '')
    
    # 解析一级/二级标题
    matched, parsed_l1, parsed_l2 = match_chapter_title("", "", chapter_title)
    
    parent_results.append({
        'text': full_text,
        'file_name': file_name,
        'chapter_title': chapter_title,
        'chapter_level_1': parsed_l1 or first_row.get('chapter_level_1', ''),
        'chapter_level_2': parsed_l2 or first_row.get('chapter_level_2', ''),
        'parent_id': pid,
        'parent_count': pid_counts.get(pid, 0),  # 子表中的出现次数
    })

# 按 parent_count 降序排列,取前 5 条
parent_results.sort(key=lambda x: x['parent_count'], reverse=True)
final_results = parent_results[:5]

3.4 相似度百分比计算

父表通过 condition_query 查询不返回向量距离/相似度分数,因此相似度来自步骤 3.1 子表混合搜索的 similarity 字段

# 在步骤 3.2 中记录每个 parent_id 对应的最大子表相似度
pid_max_similarity = {}
for r in filtered:
    pid = r['metadata'].get('parent_id', '')
    sim = r.get('similarity', 0.0)
    if pid not in pid_max_similarity or sim > pid_max_similarity[pid]:
        pid_max_similarity[pid] = sim

# 在组装最终结果时使用
similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2)

步骤4:去重与边界情况处理

4.1 父表去重

父表按单个 parent_id 查询可能返回多条记录(同一 parent_id 对应多个 chunk)。上述步骤 3.3 中已通过 按 pk 排序后拼接 text 的方式合并为一条完整片段,天然去重。

若同一 parent_idtop_pids 中出现多次(不可能,因为 Counter 的 key 唯一),无需额外处理。

4.2 边界情况

场景 处理策略
子表检索结果为空 返回空列表,记录日志 {"code": 200, "message": "未找到相似片段", "data": []}
章节字段过滤后为空 返回空列表,不回退到无章节限制,避免推荐无关章节内容
父表查询结果为空 返回空列表,记录日志 {"code": 200, "message": "标题匹配但无父表记录", "data": []}
父表记录不足 5 条 返回实际条数,不做填充
搜索文本为空或过短(< 3字) 参数校验拦截,返回 400 {"code": 400, "message": "检索信息过短"}
搜索文本超长 不截断,交由向量化模型处理(模型自带限制)

步骤5:组装 API 接口

5.1 创建路由

similar_fragment_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])

5.2 POST 接口

POST /sgbx/similar_fragment_search

完整处理流程

  1. 参数校验(chapter_level_1chapter_level_2search_text 必填,search_text 长度 ≥ 3)
  2. @auto_trace 装饰器生成 trace_id
  3. 子表混合搜索 top 30(调用 MilvusVectorManager.hybrid_search(),内部自动向量化)
  4. chapter_level_1chapter_level_2 精确匹配过滤 + parent_id 频次排序
  5. 父表条件查询 + text 拼接(调用 MilvusManager.condition_query()
  6. 计算相似度百分比 + 去重 + 截取 top 5
  7. 组装返回字段(chapter_id、project_id 原样回传) + 返回 SimilarFragmentSearchResponse

日志规范(复用项目已有 write_logger):

logger.info(f"[{trace_id}] 相似片段检索: 一级标题={level1}, 二级标题={level2}, 检索文本={search_text[:30]}...")
logger.info(f"[{trace_id}] 子表召回 {len(results)} 条, 章节字段过滤后 {len(filtered)} 条, 父表最终返回 {len(final_results)} 条")

5.3 辅助接口

GET /sgbx/similar_fragment_search_health

返回示例:

{
    "status": "healthy",
    "vector_db": "Milvus (lq_db)",
    "child_collection": "rag_children_hybrid",
    "parent_collection": "rag_parent_hybrid",
    "model": "qwen3-30b-a3b-instruct-2507"
}

步骤6:路由注册

参照现有模块的路由注册方式,在应用入口文件中 include:

from views.construction_write.similar_plan_recommend import similar_fragment_router
app.include_router(similar_fragment_router)

六、关键技术细节

6.1 复用组件映射

功能 已有组件 复用方式
文本向量化 MilvusVectorManager.text_to_vector() 已封装在 hybrid_search() 内部
子表混合检索 MilvusVectorManager.hybrid_search() rag_children_hybrid top 30(无重排序)
父表条件查询 MilvusManager.condition_query() rag_parent_hybrid 按 parent_id 精确查询
日志 write_logger / server_logger 与现有文件一致
Trace @auto_trace(generate_if_missing=True) 装饰器
配置 config_handler 读取 hybrid_search 权重等
MilvusConfig core.construction_write.component.milvus 复用已有 MilvusManager 初始化
VectorManager 实例 MilvusVectorManager() 通过编写服务轻量向量检索适配器获取实例

6.2 两种 Milvus 客户端区分

项目中有两套 Milvus 访问方式,开发时需明确区分:

客户端类 底层库 适用场景 本功能中使用的方法
MilvusVectorManager langchain-milvus (Milvus) 向量相似度搜索(dense + BM25) hybrid_search() — 子表 top 30
MilvusManager pymilvus (MilvusClient) 精确条件查询(filter 过滤) condition_query() — 父表按 parent_id 查询

注意:不要使用 MilvusVectorManager.hybrid_search(),该方法内部会调用 rerank 模型进行二次重排序,不符合本需求"只按 parent_id 频次排序"的语义,且会引入不必要的 LLM 调用延迟。

6.3 metadata 结构说明

MilvusVectorManager.hybrid_search() 返回的 metadatadoc.metadata 原样返回(见 milvus_vector.py:517)。入库时经过 _process_metadata 处理:

  • list 类型字段(如 hierarchy)被转为 " > " 连接的字符串
  • dict 类型字段被序列化为 JSON 字符串
  • None 被替换为 ""

因此 parent_idchapter_titlefile_namedocument_id 等基础字段都是直接的字符串/整数值,可直接通过 r['metadata'].get('parent_id') 访问,无需二次 JSON 解析。

6.4 chapter_id 字段映射

请求中的 chapter_id 与向量库字段关系需要确认

  • 向量库表结构中没有 chapter_id 字段
  • chapter_id 可能是业务层的章节标识,与向量库的 document_idpk 无关
  • 当前方案chapter_id 仅作为请求回传字段,不参与向量库检索过滤。如果后续需要按章节过滤,需确认 chapter_id 与向量库哪个字段对应

6.4 相似度百分比计算

子表 hybrid_search 返回的 similarity = 1 / (1 + score),按 parent_id 取最大值后转换为百分比:

# 记录每个 parent_id 对应的最大子表相似度
pid_max_similarity[pid] = max(similarity for r in results_with_same_pid)
# 最终
similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2)

父表通过 condition_query 不返回向量距离,所以不使用父表单独计算相似度,而是继承子表召回时的相似度分数。

6.5 性能要求

  • 整个链路(向量化 → 子表检索 → 章节字段过滤 → 父表查询 → 返回)应在 2秒内 完成
  • 向量化为本地模型调用,约 50-200ms
  • 子表混合搜索约 100-500ms
  • 父表条件查询(最多15个 parent_id × 1次查询)约 50-200ms
  • 应用层处理(过滤、排序、拼接)约 10-50ms

6.6 错误处理

场景 HTTP 状态码 处理
参数缺失 400 {"code": 400, "message": "参数缺失: search_text"}
向量模型异常 500 记录日志,返回错误信息
Milvus 连接异常 500 记录日志,返回错误信息
无匹配结果 200 {"code": 200, "message": "success", "data": []}

七、开发优先级

优先级 步骤 说明
P0 步骤1 定义 Pydantic 请求/响应模型
P0 步骤2 chapter_level_1chapter_level_2 精确匹配过滤
P0 步骤3 子表混合搜索(hybrid_search)+ 章节字段过滤 + parent_id 排序 + 父表查询
P0 步骤4 去重逻辑 + 边界情况处理
P0 步骤5 POST 接口 + health 辅助接口
P1 步骤6 路由注册到应用入口
P1 待确认项 chapter_id 与向量库字段映射关系(见 6.4 节)
P2 错误处理完善 Milvus 异常兜底、空结果处理
P2 接口测试 验证完整检索链路 + 各边界场景