# 施工方案编写模块 - API 总览与新增功能步骤
views/construction_write/ ← API 视图层(路由入口)
├── outline_views.py ← 大纲生成(SSE)+ 上下文生成(SSE)+ 查询接口
├── regenerate_views.py ← 重新生成大纲(SSE)
├── content_completion.py ← 上下文内容生成(SSE)
├── task_cancel_views.py ← 任务取消 + 任务状态查询
├── similar_plan_recommend.py ← 相似片段检索 API 路由
└── 相似度推荐.md ← 原始需求文档
core/construction_write/ ← 核心业务层
├── workflows/outline_workflow.py ← LangGraph 工作流(含 recommend_similar_fragments 节点)
├── component/outline_generator.py ← 各节点业务实现
├── component/similar_fragment_service.py ← 相似片段检索业务编排
└── component/state_models.py ← 状态模型定义
统一路由前缀: /sgbx,标签: ["施工方案编写"]
| 接口 | 方法 | 文件 | 响应类型 | 说明 |
|---|---|---|---|---|
/sgbx/generating_outline |
POST | outline_views.py | SSE | 流式大纲生成(Celery 异步任务 + 进度轮询) |
/sgbx/regenerate_outline |
POST | regenerate_views.py | SSE | 流式重新生成大纲(基于已有任务调整) |
/sgbx/content_completion |
POST | content_completion.py | SSE | 流式上下文内容生成(续写/扩写/润色/补全) |
/sgbx/task_cancel |
POST | task_cancel_views.py | JSON | 取消正在执行的生成任务 |
/sgbx/task_status |
GET | task_cancel_views.py | JSON | 查询任务状态 |
/sgbx/active_tasks |
GET | outline_views.py | JSON | 获取活跃任务列表 |
/sgbx/context_generate |
POST | outline_views.py | SSE | 上下文生成(简化版,内置于 outline_views) |
/sgbx/context_generate_health |
GET | outline_views.py | JSON | 上下文生成健康检查 |
/sgbx/context_generate_modes |
GET | outline_views.py | JSON | 获取生成模式列表 |
/sgbx/context_generate_api_status |
GET | outline_views.py | JSON | API 配置状态 |
/sgbx/content_completion_health |
GET | content_completion.py | JSON | 内容生成健康检查 |
/sgbx/content_completion_modes |
GET | content_completion.py | JSON | 获取内容生成模式列表 |
/sgbx/content_completion_api_status |
GET | content_completion.py | JSON | 内容生成 API 状态 |
开发新接口时应遵循以下模式:
BaseModel + Field 定义,带 description 和 example{"code": int, "message": str, "data": ...}请求 → 生成 callback_task_id → 建立 SSE 连接 → 提交 Celery 任务 → 轮询进度 → 返回结果
connected → processing → completed/failed/cancelledunified_sse_manager 管理连接,progress_manager 管理进度workflow_manager 提交和查询 Celery 任务terminate:{task_id} 取消机制@auto_trace 装饰器_health、_modes、_api_status 接口CustomAPIConfig 类封装地址、Key、模型aiohttp.TCPConnector) + Redis 连接池文件: 相似度推荐.md
本项目 Milvus 向量库中父/子文档集合命名规范已在代码中确立:
| 业务表 | Milvus Collection | 用途 | 代码出处 |
|---|---|---|---|
t_kngs_construction_plan_parent(父表) |
rag_parent_hybrid |
父文档集合,text 为最终返回的相似片段内容 |
编写服务相似片段检索模块 |
t_kngs_construction_plan_child(子表) |
rag_children_hybrid |
子文档集合,用于初步召回检索 | foundation/ai/rag/retrieval/retrieval.py:228 |
向量库: Milvus,数据库名 lq_db(默认配置 MILVUS_DB=lq_db)
两表共享相同字段结构:pk(主键)、text(内容)、dense(向量列)、sparse(BM25)、chapter_title(章节标题)、parent_id(父段ID)、file_name(文件名称)、document_id(文档ID) 等。
parent_id 关联关系: 子表的
parent_id字段对应父表的parent_id字段(非 pk),通过parent_id == xxx进行条件查询。已在parent_tool.py的fetch_parent_chunks_by_parent_id()函数中验证此关联方式。
用户输入: 一级标题 + 二级标题 + chapter_id + project_id + search_text
↓
1. 子表混合搜索 top 30(hybrid_search 内部自动向量化 + BM25)
↓
2. 按子表 `chapter_level_1`、`chapter_level_2` 字段精确匹配过滤
↓
3. 按 parent_id 统计频次,重复多的排最前
↓
4. 通过高频 parent_id 关联父表查询(condition_query)
↓
5. 返回 top 5,包含: 一级标题、二级标题、chapter_id、project_id、text、file_name、相似度百分比
project_id 说明: 仅作为透传字段(前端传入 → 接口回传),告知前端该推荐是针对哪个方案的,不参与任何向量库检索或过滤逻辑。
文件: views/construction_write/similar_plan_recommend.py
class SimilarFragmentSearchRequest(BaseModel):
title_level_1: Optional[str] = Field(None, description="一级标题展示文本", example="施工工艺技术")
title_level_2: Optional[str] = Field(None, description="二级标题展示文本", example="主要施工方法概述")
chapter_level_1: str = Field(..., description="一级章节类型,需与向量库字段匹配", example="technology")
chapter_level_2: str = Field(..., description="二级章节类型,需与向量库字段匹配", example="MethodsOverview")
chapter_id: str = Field(..., description="当前章节ID")
project_id: str = Field(..., description="方案ID")
sgbx_code: Optional[str] = Field(None, description="施工编写章节编码")
search_text: str = Field(..., description="用户输入的检索信息")
class SimilarFragmentItem(BaseModel):
chapter_level_1: str = Field(..., description="一级标题(从父表 chapter_title 解析或 chapter_level_1 字段获取)")
chapter_level_2: str = Field(..., description="二级标题(从父表 chapter_title 解析或 chapter_level_2 字段获取)")
chapter_id: str = Field(..., description="请求传入的章节ID,原样回传(不参与检索)")
project_id: str = Field(..., description="请求传入的方案ID,原样回传(不参与检索)")
text: str = Field(..., description="相似片段内容(父表按 pk 排序后拼接的完整 text)")
file_name: str = Field(..., description="文件名称(父表 file_name 字段)")
similarity_percent: float = Field(..., description="相似度百分比(来自子表混合搜索的 similarity,按 parent_id 取最大值)", ge=0.0, le=100.0)
class SimilarFragmentSearchResponse(BaseModel):
code: int
message: str
data: List[SimilarFragmentItem] = Field(default_factory=list)
文件: views/construction_write/similar_plan_recommend.py
函数: chapter_fields_match(row, level1, level2) -> bool
当前接口以向量库字段 chapter_level_1、chapter_level_2 为准,要求与请求中的同名字段精确相等。
匹配逻辑:
chapter_level_1chapter_level_2chapter_level_1、chapter_level_2 去空白后精确相等才保留返回值:是否匹配,用于保证推荐片段与当前章节类型一致。
文件: views/construction_write/similar_plan_recommend.py
复用项目已有的向量检索组件:
from foundation.database.base.vector.milvus_vector import MilvusVectorManager
from core.construction_write.component.milvus import MilvusManager, MilvusConfig
重要:需求仅要求"向量化后检索30条,按 parent_id 频次排序",不需要 LLM 重排序。应使用 MilvusVectorManager.hybrid_search(),而非 MilvusVectorManager.hybrid_search()。后者内部会调用 rerank 模型,引入不必要的延迟。
# 获取 MilvusVectorManager 单例(项目已有全局实例)
from foundation.database.base.vector.milvus_vector import MilvusVectorManager
vector_manager = MilvusVectorManager()
results = vector_manager.hybrid_search(
param={'collection_name': 'rag_children_hybrid'},
query_text=search_text,
top_k=30,
ranker_type="weighted",
dense_weight=0.7,
sparse_weight=0.3,
)
返回结果结构(hybrid_search 直接格式化后的 dict):
{
'id': 12345, # doc.metadata.get('pk')
'text_content': '段落内容...', # doc.page_content
'metadata': { # LangChain Document.metadata,扁平dict
'pk': 12345,
'parent_id': 'xxx-xxx-xxx',
'chapter_title': '第八章验收要求->8.3 验收内容',
'file_name': 'xxx.docx',
'document_id': 'doc-001', # 对应上传文档ID
'chapter_level_1': '验收要求', # 入库时单独存储的字段
'chapter_level_2': '8.3 验收内容',
# 注意:如果入库时某个字段值是dict类型,_process_metadata会将其序列化为JSON字符串
},
'similarity': 0.85, # similarity = 1 / (1 + score)
'distance': 0.15, # 混合搜索加权得分
}
metadata 字段说明:
hybrid_search返回的metadata是doc.metadata原样返回(见milvus_vector.py:517)。入库时经过_process_metadata处理,dict 类型值会被序列化为 JSON 字符串,list 类型 hierarchy 会被转为" > "连接的字符串。因此parent_id、chapter_title、file_name、document_id等基础字段都是直接的字符串/整数值,无需二次 JSON 解析。
# 1. 过滤:只保留 chapter_level_1/chapter_level_2 与请求参数精确匹配的结果
filtered = []
for r in results:
meta = r['metadata']
if (
meta.get('chapter_level_1', '').strip() == level1.strip()
and meta.get('chapter_level_2', '').strip() == level2.strip()
):
filtered.append(r)
# 3. 边界处理:若 filtered 为空,直接返回空结果,不能回退到全部结果
if not filtered:
logger.warning("章节字段匹配无结果")
return []
# 4. 按 parent_id 统计频次,重复多的排最前
from collections import Counter
pid_counts = Counter(r['metadata'].get('parent_id', '') for r in filtered)
# 5. 取 top parent_id 列表(覆盖足够多的父表记录,如取 15 个)
top_pids = [pid for pid, _ in pid_counts.most_common(15)]
父表集合名: rag_parent_hybrid
由于父表查询是基于 parent_id 的精确条件查询(非向量搜索),应使用 MilvusManager.condition_query() 方法(基于 pymilvus.MilvusClient):
from core.construction_write.component.milvus import MilvusManager, MilvusConfig
milvus_mgr = MilvusManager(MilvusConfig()) # 自动读取配置连接 lq_db
查询策略:遍历 top_pids,对每个 parent_id 查询父表所有记录,拼接 text 字段为完整片段。
parent_results = []
for pid in top_pids:
rows = milvus_mgr.condition_query(
collection_name="rag_parent_hybrid",
filter=f"parent_id == '{pid}'",
output_fields=["pk", "text", "parent_id", "file_name", "chapter_title", "chapter_level_1", "chapter_level_2"],
limit=1000, # 足够大以获取该 parent_id 下的所有片段
)
if not rows:
continue
# 按 pk 排序(与 parent_tool.py 中一致)
sorted_rows = sorted(rows, key=lambda x: x.get('pk', 0))
# 拼接 text 为完整片段
full_text = "\n".join(r.get('text', '') for r in sorted_rows)
# 从首条记录提取 chapter_title 和元数据
first_row = sorted_rows[0]
chapter_title = first_row.get('chapter_title', '')
file_name = first_row.get('file_name', '')
# 解析一级/二级标题
matched, parsed_l1, parsed_l2 = match_chapter_title("", "", chapter_title)
parent_results.append({
'text': full_text,
'file_name': file_name,
'chapter_title': chapter_title,
'chapter_level_1': parsed_l1 or first_row.get('chapter_level_1', ''),
'chapter_level_2': parsed_l2 or first_row.get('chapter_level_2', ''),
'parent_id': pid,
'parent_count': pid_counts.get(pid, 0), # 子表中的出现次数
})
# 按 parent_count 降序排列,取前 5 条
parent_results.sort(key=lambda x: x['parent_count'], reverse=True)
final_results = parent_results[:5]
父表通过 condition_query 查询不返回向量距离/相似度分数,因此相似度来自步骤 3.1 子表混合搜索的 similarity 字段:
# 在步骤 3.2 中记录每个 parent_id 对应的最大子表相似度
pid_max_similarity = {}
for r in filtered:
pid = r['metadata'].get('parent_id', '')
sim = r.get('similarity', 0.0)
if pid not in pid_max_similarity or sim > pid_max_similarity[pid]:
pid_max_similarity[pid] = sim
# 在组装最终结果时使用
similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2)
父表按单个 parent_id 查询可能返回多条记录(同一 parent_id 对应多个 chunk)。上述步骤 3.3 中已通过 按 pk 排序后拼接 text 的方式合并为一条完整片段,天然去重。
若同一 parent_id 在 top_pids 中出现多次(不可能,因为 Counter 的 key 唯一),无需额外处理。
| 场景 | 处理策略 |
|---|---|
| 子表检索结果为空 | 返回空列表,记录日志 {"code": 200, "message": "未找到相似片段", "data": []} |
| 章节字段过滤后为空 | 返回空列表,不回退到无章节限制,避免推荐无关章节内容 |
| 父表查询结果为空 | 返回空列表,记录日志 {"code": 200, "message": "标题匹配但无父表记录", "data": []} |
| 父表记录不足 5 条 | 返回实际条数,不做填充 |
| 搜索文本为空或过短(< 3字) | 参数校验拦截,返回 400 {"code": 400, "message": "检索信息过短"} |
| 搜索文本超长 | 不截断,交由向量化模型处理(模型自带限制) |
similar_fragment_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"])
POST /sgbx/similar_fragment_search
完整处理流程:
chapter_level_1、chapter_level_2、search_text 必填,search_text 长度 ≥ 3)@auto_trace 装饰器生成 trace_idMilvusVectorManager.hybrid_search(),内部自动向量化)chapter_level_1、chapter_level_2 精确匹配过滤 + parent_id 频次排序MilvusManager.condition_query())SimilarFragmentSearchResponse日志规范(复用项目已有 write_logger):
logger.info(f"[{trace_id}] 相似片段检索: 一级标题={level1}, 二级标题={level2}, 检索文本={search_text[:30]}...")
logger.info(f"[{trace_id}] 子表召回 {len(results)} 条, 章节字段过滤后 {len(filtered)} 条, 父表最终返回 {len(final_results)} 条")
GET /sgbx/similar_fragment_search_health
返回示例:
{
"status": "healthy",
"vector_db": "Milvus (lq_db)",
"child_collection": "rag_children_hybrid",
"parent_collection": "rag_parent_hybrid",
"model": "qwen3-30b-a3b-instruct-2507"
}
参照现有模块的路由注册方式,在应用入口文件中 include:
from views.construction_write.similar_plan_recommend import similar_fragment_router
app.include_router(similar_fragment_router)
| 功能 | 已有组件 | 复用方式 |
|---|---|---|
| 文本向量化 | MilvusVectorManager.text_to_vector() |
已封装在 hybrid_search() 内部 |
| 子表混合检索 | MilvusVectorManager.hybrid_search() |
rag_children_hybrid top 30(无重排序) |
| 父表条件查询 | MilvusManager.condition_query() |
rag_parent_hybrid 按 parent_id 精确查询 |
| 日志 | write_logger / server_logger |
与现有文件一致 |
| Trace | @auto_trace(generate_if_missing=True) |
装饰器 |
| 配置 | config_handler |
读取 hybrid_search 权重等 |
| MilvusConfig | core.construction_write.component.milvus |
复用已有 MilvusManager 初始化 |
| VectorManager 实例 | MilvusVectorManager() |
通过编写服务轻量向量检索适配器获取实例 |
项目中有两套 Milvus 访问方式,开发时需明确区分:
| 客户端类 | 底层库 | 适用场景 | 本功能中使用的方法 |
|---|---|---|---|
MilvusVectorManager |
langchain-milvus (Milvus) |
向量相似度搜索(dense + BM25) | hybrid_search() — 子表 top 30 |
MilvusManager |
pymilvus (MilvusClient) |
精确条件查询(filter 过滤) | condition_query() — 父表按 parent_id 查询 |
注意:不要使用
MilvusVectorManager.hybrid_search(),该方法内部会调用 rerank 模型进行二次重排序,不符合本需求"只按 parent_id 频次排序"的语义,且会引入不必要的 LLM 调用延迟。
MilvusVectorManager.hybrid_search() 返回的 metadata 是 doc.metadata 原样返回(见 milvus_vector.py:517)。入库时经过 _process_metadata 处理:
list 类型字段(如 hierarchy)被转为 " > " 连接的字符串dict 类型字段被序列化为 JSON 字符串None 被替换为 ""因此 parent_id、chapter_title、file_name、document_id 等基础字段都是直接的字符串/整数值,可直接通过 r['metadata'].get('parent_id') 访问,无需二次 JSON 解析。
请求中的 chapter_id 与向量库字段关系需要确认:
chapter_id 字段chapter_id 可能是业务层的章节标识,与向量库的 document_id 或 pk 无关chapter_id 仅作为请求回传字段,不参与向量库检索过滤。如果后续需要按章节过滤,需确认 chapter_id 与向量库哪个字段对应子表 hybrid_search 返回的 similarity = 1 / (1 + score),按 parent_id 取最大值后转换为百分比:
# 记录每个 parent_id 对应的最大子表相似度
pid_max_similarity[pid] = max(similarity for r in results_with_same_pid)
# 最终
similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2)
父表通过 condition_query 不返回向量距离,所以不使用父表单独计算相似度,而是继承子表召回时的相似度分数。
| 场景 | HTTP 状态码 | 处理 |
|---|---|---|
| 参数缺失 | 400 | {"code": 400, "message": "参数缺失: search_text"} |
| 向量模型异常 | 500 | 记录日志,返回错误信息 |
| Milvus 连接异常 | 500 | 记录日志,返回错误信息 |
| 无匹配结果 | 200 | {"code": 200, "message": "success", "data": []} |
| 优先级 | 步骤 | 说明 |
|---|---|---|
| P0 | 步骤1 | 定义 Pydantic 请求/响应模型 |
| P0 | 步骤2 | chapter_level_1、chapter_level_2 精确匹配过滤 |
| P0 | 步骤3 | 子表混合搜索(hybrid_search)+ 章节字段过滤 + parent_id 排序 + 父表查询 |
| P0 | 步骤4 | 去重逻辑 + 边界情况处理 |
| P0 | 步骤5 | POST 接口 + health 辅助接口 |
| P1 | 步骤6 | 路由注册到应用入口 |
| P1 | 待确认项 | chapter_id 与向量库字段映射关系(见 6.4 节) |
| P2 | 错误处理完善 | Milvus 异常兜底、空结果处理 |
| P2 | 接口测试 | 验证完整检索链路 + 各边界场景 |