# 施工方案编写模块 - API 总览与新增功能步骤 ## 一、模块架构 ``` views/construction_write/ ← API 视图层(路由入口) ├── outline_views.py ← 大纲生成(SSE)+ 上下文生成(SSE)+ 查询接口 ├── regenerate_views.py ← 重新生成大纲(SSE) ├── content_completion.py ← 上下文内容生成(SSE) ├── task_cancel_views.py ← 任务取消 + 任务状态查询 ├── similar_plan_recommend.py ← 相似片段检索 API 路由 └── 相似度推荐.md ← 原始需求文档 core/construction_write/ ← 核心业务层 ├── workflows/outline_workflow.py ← LangGraph 工作流(含 recommend_similar_fragments 节点) ├── component/outline_generator.py ← 各节点业务实现 ├── component/similar_fragment_service.py ← 相似片段检索业务编排 └── component/state_models.py ← 状态模型定义 ``` **统一路由前缀**: `/sgbx`,标签: `["施工方案编写"]` --- ## 二、现有 API 清单 | 接口 | 方法 | 文件 | 响应类型 | 说明 | |------|------|------|----------|------| | `/sgbx/generating_outline` | POST | outline_views.py | SSE | 流式大纲生成(Celery 异步任务 + 进度轮询) | | `/sgbx/regenerate_outline` | POST | regenerate_views.py | SSE | 流式重新生成大纲(基于已有任务调整) | | `/sgbx/content_completion` | POST | content_completion.py | SSE | 流式上下文内容生成(续写/扩写/润色/补全) | | `/sgbx/task_cancel` | POST | task_cancel_views.py | JSON | 取消正在执行的生成任务 | | `/sgbx/task_status` | GET | task_cancel_views.py | JSON | 查询任务状态 | | `/sgbx/active_tasks` | GET | outline_views.py | JSON | 获取活跃任务列表 | | `/sgbx/context_generate` | POST | outline_views.py | SSE | 上下文生成(简化版,内置于 outline_views) | | `/sgbx/context_generate_health` | GET | outline_views.py | JSON | 上下文生成健康检查 | | `/sgbx/context_generate_modes` | GET | outline_views.py | JSON | 获取生成模式列表 | | `/sgbx/context_generate_api_status` | GET | outline_views.py | JSON | API 配置状态 | | `/sgbx/content_completion_health` | GET | content_completion.py | JSON | 内容生成健康检查 | | `/sgbx/content_completion_modes` | GET | content_completion.py | JSON | 获取内容生成模式列表 | | `/sgbx/content_completion_api_status` | GET | content_completion.py | JSON | 内容生成 API 状态 | --- ## 三、通用模式总结 开发新接口时应遵循以下模式: ### 3.1 请求/响应模型 - 使用 Pydantic `BaseModel` + `Field` 定义,带 `description` 和 `example` - 统一响应格式:`{"code": int, "message": str, "data": ...}` ### 3.2 SSE 流式接口(长耗时操作) ``` 请求 → 生成 callback_task_id → 建立 SSE 连接 → 提交 Celery 任务 → 轮询进度 → 返回结果 ``` - 事件类型: `connected` → `processing` → `completed`/`failed`/`cancelled` - 使用 `unified_sse_manager` 管理连接,`progress_manager` 管理进度 - 使用 `workflow_manager` 提交和查询 Celery 任务 - 支持 Redis `terminate:{task_id}` 取消机制 ### 3.3 简单 POST 接口(即时返回) - 使用 `@auto_trace` 装饰器 - 直接返回 Pydantic 响应模型 ### 3.4 Health/Modes/Status 辅助接口 - 每个功能模块都有对应的 `_health`、`_modes`、`_api_status` 接口 ### 3.5 API 调用配置 - 项目使用阿里云 DashScope (qwen3-30b-a3b-instruct-2507) - `CustomAPIConfig` 类封装地址、Key、模型 - 全局 HTTP 连接池 (`aiohttp.TCPConnector`) + Redis 连接池 --- ## 四、新增功能:相似片段检索接口 ### 4.1 需求来源 文件: [相似度推荐.md](相似度推荐.md) ### 4.2 涉及数据表 本项目 Milvus 向量库中父/子文档集合命名规范已在代码中确立: | 业务表 | Milvus Collection | 用途 | 代码出处 | |--------|-------------------|------|---------| | `t_kngs_construction_plan_parent`(父表) | `rag_parent_hybrid` | 父文档集合,`text` 为最终返回的相似片段内容 | 编写服务相似片段检索模块 | | `t_kngs_construction_plan_child`(子表) | `rag_children_hybrid` | 子文档集合,用于初步召回检索 | `foundation/ai/rag/retrieval/retrieval.py:228` | **向量库**: Milvus,数据库名 `lq_db`(默认配置 `MILVUS_DB=lq_db`) 两表共享相同字段结构:`pk`(主键)、`text`(内容)、`dense`(向量列)、`sparse`(BM25)、`chapter_title`(章节标题)、`parent_id`(父段ID)、`file_name`(文件名称)、`document_id`(文档ID) 等。 > **parent_id 关联关系**: 子表的 `parent_id` 字段对应父表的 `parent_id` 字段(非 pk),通过 `parent_id == xxx` 进行条件查询。已在 `parent_tool.py` 的 `fetch_parent_chunks_by_parent_id()` 函数中验证此关联方式。 ### 4.3 检索流程 ``` 用户输入: 一级标题 + 二级标题 + chapter_id + project_id + search_text ↓ 1. 子表混合搜索 top 30(hybrid_search 内部自动向量化 + BM25) ↓ 2. 按子表 `chapter_level_1`、`chapter_level_2` 字段精确匹配过滤 ↓ 3. 按 parent_id 统计频次,重复多的排最前 ↓ 4. 通过高频 parent_id 关联父表查询(condition_query) ↓ 5. 返回 top 5,包含: 一级标题、二级标题、chapter_id、project_id、text、file_name、相似度百分比 ``` > **project_id 说明**: 仅作为透传字段(前端传入 → 接口回传),告知前端该推荐是针对哪个方案的,不参与任何向量库检索或过滤逻辑。 --- ## 五、开发步骤 ### 步骤1:定义 Pydantic 模型 **文件**: `views/construction_write/similar_plan_recommend.py` #### 请求模型 ```python class SimilarFragmentSearchRequest(BaseModel): title_level_1: Optional[str] = Field(None, description="一级标题展示文本", example="施工工艺技术") title_level_2: Optional[str] = Field(None, description="二级标题展示文本", example="主要施工方法概述") chapter_level_1: str = Field(..., description="一级章节类型,需与向量库字段匹配", example="technology") chapter_level_2: str = Field(..., description="二级章节类型,需与向量库字段匹配", example="MethodsOverview") chapter_id: str = Field(..., description="当前章节ID") project_id: str = Field(..., description="方案ID") sgbx_code: Optional[str] = Field(None, description="施工编写章节编码") search_text: str = Field(..., description="用户输入的检索信息") ``` #### 响应模型 ```python class SimilarFragmentItem(BaseModel): chapter_level_1: str = Field(..., description="一级标题(从父表 chapter_title 解析或 chapter_level_1 字段获取)") chapter_level_2: str = Field(..., description="二级标题(从父表 chapter_title 解析或 chapter_level_2 字段获取)") chapter_id: str = Field(..., description="请求传入的章节ID,原样回传(不参与检索)") project_id: str = Field(..., description="请求传入的方案ID,原样回传(不参与检索)") text: str = Field(..., description="相似片段内容(父表按 pk 排序后拼接的完整 text)") file_name: str = Field(..., description="文件名称(父表 file_name 字段)") similarity_percent: float = Field(..., description="相似度百分比(来自子表混合搜索的 similarity,按 parent_id 取最大值)", ge=0.0, le=100.0) class SimilarFragmentSearchResponse(BaseModel): code: int message: str data: List[SimilarFragmentItem] = Field(default_factory=list) ``` ### 步骤2:实现章节字段精确匹配 **文件**: `views/construction_write/similar_plan_recommend.py` **函数**: `chapter_fields_match(row, level1, level2) -> bool` 当前接口以向量库字段 `chapter_level_1`、`chapter_level_2` 为准,要求与请求中的同名字段精确相等。 匹配逻辑: 1. 读取子表召回结果 metadata 中的 `chapter_level_1` 2. 读取子表召回结果 metadata 中的 `chapter_level_2` 3. 两者分别与请求参数 `chapter_level_1`、`chapter_level_2` 去空白后精确相等才保留 4. 若向量库集合缺少这两个字段,则不回退到无章节过滤,直接返回空结果并记录日志 返回值:是否匹配,用于保证推荐片段与当前章节类型一致。 --- ### 步骤3:实现检索核心逻辑 **文件**: `views/construction_write/similar_plan_recommend.py` 复用项目已有的向量检索组件: ```python from foundation.database.base.vector.milvus_vector import MilvusVectorManager from core.construction_write.component.milvus import MilvusManager, MilvusConfig ``` #### 3.1 子表混合搜索 **重要**:需求仅要求"向量化后检索30条,按 parent_id 频次排序",**不需要 LLM 重排序**。应使用 `MilvusVectorManager.hybrid_search()`,而非 `MilvusVectorManager.hybrid_search()`。后者内部会调用 rerank 模型,引入不必要的延迟。 ```python # 获取 MilvusVectorManager 单例(项目已有全局实例) from foundation.database.base.vector.milvus_vector import MilvusVectorManager vector_manager = MilvusVectorManager() results = vector_manager.hybrid_search( param={'collection_name': 'rag_children_hybrid'}, query_text=search_text, top_k=30, ranker_type="weighted", dense_weight=0.7, sparse_weight=0.3, ) ``` **返回结果结构**(`hybrid_search` 直接格式化后的 dict): ```python { 'id': 12345, # doc.metadata.get('pk') 'text_content': '段落内容...', # doc.page_content 'metadata': { # LangChain Document.metadata,扁平dict 'pk': 12345, 'parent_id': 'xxx-xxx-xxx', 'chapter_title': '第八章验收要求->8.3 验收内容', 'file_name': 'xxx.docx', 'document_id': 'doc-001', # 对应上传文档ID 'chapter_level_1': '验收要求', # 入库时单独存储的字段 'chapter_level_2': '8.3 验收内容', # 注意:如果入库时某个字段值是dict类型,_process_metadata会将其序列化为JSON字符串 }, 'similarity': 0.85, # similarity = 1 / (1 + score) 'distance': 0.15, # 混合搜索加权得分 } ``` > **metadata 字段说明**:`hybrid_search` 返回的 `metadata` 是 `doc.metadata` 原样返回(见 `milvus_vector.py:517`)。入库时经过 `_process_metadata` 处理,dict 类型值会被序列化为 JSON 字符串,list 类型 hierarchy 会被转为 `" > "` 连接的字符串。因此 `parent_id`、`chapter_title`、`file_name`、`document_id` 等基础字段都是**直接的字符串/整数值**,无需二次 JSON 解析。 #### 3.2 章节字段过滤 + parent_id 排序 ```python # 1. 过滤:只保留 chapter_level_1/chapter_level_2 与请求参数精确匹配的结果 filtered = [] for r in results: meta = r['metadata'] if ( meta.get('chapter_level_1', '').strip() == level1.strip() and meta.get('chapter_level_2', '').strip() == level2.strip() ): filtered.append(r) # 3. 边界处理:若 filtered 为空,直接返回空结果,不能回退到全部结果 if not filtered: logger.warning("章节字段匹配无结果") return [] # 4. 按 parent_id 统计频次,重复多的排最前 from collections import Counter pid_counts = Counter(r['metadata'].get('parent_id', '') for r in filtered) # 5. 取 top parent_id 列表(覆盖足够多的父表记录,如取 15 个) top_pids = [pid for pid, _ in pid_counts.most_common(15)] ``` #### 3.3 父表查询 父表集合名: `rag_parent_hybrid` 由于父表查询是基于 `parent_id` 的精确条件查询(非向量搜索),应使用 `MilvusManager.condition_query()` 方法(基于 `pymilvus.MilvusClient`): ```python from core.construction_write.component.milvus import MilvusManager, MilvusConfig milvus_mgr = MilvusManager(MilvusConfig()) # 自动读取配置连接 lq_db ``` **查询策略**:遍历 `top_pids`,对每个 `parent_id` 查询父表所有记录,拼接 `text` 字段为完整片段。 ```python parent_results = [] for pid in top_pids: rows = milvus_mgr.condition_query( collection_name="rag_parent_hybrid", filter=f"parent_id == '{pid}'", output_fields=["pk", "text", "parent_id", "file_name", "chapter_title", "chapter_level_1", "chapter_level_2"], limit=1000, # 足够大以获取该 parent_id 下的所有片段 ) if not rows: continue # 按 pk 排序(与 parent_tool.py 中一致) sorted_rows = sorted(rows, key=lambda x: x.get('pk', 0)) # 拼接 text 为完整片段 full_text = "\n".join(r.get('text', '') for r in sorted_rows) # 从首条记录提取 chapter_title 和元数据 first_row = sorted_rows[0] chapter_title = first_row.get('chapter_title', '') file_name = first_row.get('file_name', '') # 解析一级/二级标题 matched, parsed_l1, parsed_l2 = match_chapter_title("", "", chapter_title) parent_results.append({ 'text': full_text, 'file_name': file_name, 'chapter_title': chapter_title, 'chapter_level_1': parsed_l1 or first_row.get('chapter_level_1', ''), 'chapter_level_2': parsed_l2 or first_row.get('chapter_level_2', ''), 'parent_id': pid, 'parent_count': pid_counts.get(pid, 0), # 子表中的出现次数 }) # 按 parent_count 降序排列,取前 5 条 parent_results.sort(key=lambda x: x['parent_count'], reverse=True) final_results = parent_results[:5] ``` #### 3.4 相似度百分比计算 父表通过 `condition_query` 查询不返回向量距离/相似度分数,因此相似度来自**步骤 3.1 子表混合搜索的 similarity 字段**: ```python # 在步骤 3.2 中记录每个 parent_id 对应的最大子表相似度 pid_max_similarity = {} for r in filtered: pid = r['metadata'].get('parent_id', '') sim = r.get('similarity', 0.0) if pid not in pid_max_similarity or sim > pid_max_similarity[pid]: pid_max_similarity[pid] = sim # 在组装最终结果时使用 similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2) ``` --- ### 步骤4:去重与边界情况处理 #### 4.1 父表去重 父表按单个 `parent_id` 查询可能返回多条记录(同一 `parent_id` 对应多个 chunk)。上述步骤 3.3 中已通过 **按 pk 排序后拼接 text** 的方式合并为一条完整片段,天然去重。 若同一 `parent_id` 在 `top_pids` 中出现多次(不可能,因为 Counter 的 key 唯一),无需额外处理。 #### 4.2 边界情况 | 场景 | 处理策略 | |------|---------| | 子表检索结果为空 | 返回空列表,记录日志 `{"code": 200, "message": "未找到相似片段", "data": []}` | | 章节字段过滤后为空 | 返回空列表,不回退到无章节限制,避免推荐无关章节内容 | | 父表查询结果为空 | 返回空列表,记录日志 `{"code": 200, "message": "标题匹配但无父表记录", "data": []}` | | 父表记录不足 5 条 | 返回实际条数,不做填充 | | 搜索文本为空或过短(< 3字) | 参数校验拦截,返回 400 `{"code": 400, "message": "检索信息过短"}` | | 搜索文本超长 | 不截断,交由向量化模型处理(模型自带限制) | --- ### 步骤5:组装 API 接口 #### 5.1 创建路由 ```python similar_fragment_router = APIRouter(prefix="/sgbx", tags=["施工方案编写"]) ``` #### 5.2 POST 接口 ``` POST /sgbx/similar_fragment_search ``` **完整处理流程**: 1. 参数校验(`chapter_level_1`、`chapter_level_2`、`search_text` 必填,`search_text` 长度 ≥ 3) 2. `@auto_trace` 装饰器生成 trace_id 3. 子表混合搜索 top 30(调用 `MilvusVectorManager.hybrid_search()`,内部自动向量化) 4. `chapter_level_1`、`chapter_level_2` 精确匹配过滤 + parent_id 频次排序 5. 父表条件查询 + text 拼接(调用 `MilvusManager.condition_query()`) 6. 计算相似度百分比 + 去重 + 截取 top 5 7. 组装返回字段(chapter_id、project_id 原样回传) + 返回 `SimilarFragmentSearchResponse` **日志规范**(复用项目已有 `write_logger`): ```python logger.info(f"[{trace_id}] 相似片段检索: 一级标题={level1}, 二级标题={level2}, 检索文本={search_text[:30]}...") logger.info(f"[{trace_id}] 子表召回 {len(results)} 条, 章节字段过滤后 {len(filtered)} 条, 父表最终返回 {len(final_results)} 条") ``` #### 5.3 辅助接口 ``` GET /sgbx/similar_fragment_search_health ``` 返回示例: ```python { "status": "healthy", "vector_db": "Milvus (lq_db)", "child_collection": "rag_children_hybrid", "parent_collection": "rag_parent_hybrid", "model": "qwen3-30b-a3b-instruct-2507" } ``` --- ### 步骤6:路由注册 参照现有模块的路由注册方式,在应用入口文件中 include: ```python from views.construction_write.similar_plan_recommend import similar_fragment_router app.include_router(similar_fragment_router) ``` --- ## 六、关键技术细节 ### 6.1 复用组件映射 | 功能 | 已有组件 | 复用方式 | |------|---------|---------| | 文本向量化 | `MilvusVectorManager.text_to_vector()` | 已封装在 `hybrid_search()` 内部 | | 子表混合检索 | `MilvusVectorManager.hybrid_search()` | rag_children_hybrid top 30(无重排序) | | 父表条件查询 | `MilvusManager.condition_query()` | rag_parent_hybrid 按 parent_id 精确查询 | | 日志 | `write_logger` / `server_logger` | 与现有文件一致 | | Trace | `@auto_trace(generate_if_missing=True)` | 装饰器 | | 配置 | `config_handler` | 读取 hybrid_search 权重等 | | MilvusConfig | `core.construction_write.component.milvus` | 复用已有 MilvusManager 初始化 | | VectorManager 实例 | `MilvusVectorManager()` | 通过编写服务轻量向量检索适配器获取实例 | ### 6.2 两种 Milvus 客户端区分 项目中有两套 Milvus 访问方式,开发时需明确区分: | 客户端类 | 底层库 | 适用场景 | 本功能中使用的方法 | |---------|--------|---------|-----------------| | `MilvusVectorManager` | langchain-milvus (`Milvus`) | 向量相似度搜索(dense + BM25) | `hybrid_search()` — 子表 top 30 | | `MilvusManager` | pymilvus (`MilvusClient`) | 精确条件查询(filter 过滤) | `condition_query()` — 父表按 parent_id 查询 | > **注意**:不要使用 `MilvusVectorManager.hybrid_search()`,该方法内部会调用 rerank 模型进行二次重排序,不符合本需求"只按 parent_id 频次排序"的语义,且会引入不必要的 LLM 调用延迟。 ### 6.3 metadata 结构说明 `MilvusVectorManager.hybrid_search()` 返回的 `metadata` 是 `doc.metadata` 原样返回(见 `milvus_vector.py:517`)。入库时经过 `_process_metadata` 处理: - `list` 类型字段(如 `hierarchy`)被转为 `" > "` 连接的字符串 - `dict` 类型字段被序列化为 JSON 字符串 - `None` 被替换为 `""` 因此 `parent_id`、`chapter_title`、`file_name`、`document_id` 等基础字段都是**直接的字符串/整数值**,可直接通过 `r['metadata'].get('parent_id')` 访问,无需二次 JSON 解析。 ### 6.4 chapter_id 字段映射 请求中的 `chapter_id` 与向量库字段关系**需要确认**: - 向量库表结构中没有 `chapter_id` 字段 - `chapter_id` 可能是业务层的章节标识,与向量库的 `document_id` 或 `pk` 无关 - **当前方案**:`chapter_id` 仅作为请求回传字段,不参与向量库检索过滤。如果后续需要按章节过滤,需确认 `chapter_id` 与向量库哪个字段对应 ### 6.4 相似度百分比计算 子表 `hybrid_search` 返回的 `similarity = 1 / (1 + score)`,按 parent_id 取最大值后转换为百分比: ```python # 记录每个 parent_id 对应的最大子表相似度 pid_max_similarity[pid] = max(similarity for r in results_with_same_pid) # 最终 similarity_percent = round(pid_max_similarity.get(pid, 0.0) * 100, 2) ``` 父表通过 `condition_query` 不返回向量距离,所以**不使用父表单独计算相似度**,而是继承子表召回时的相似度分数。 ### 6.5 性能要求 - 整个链路(向量化 → 子表检索 → 章节字段过滤 → 父表查询 → 返回)应在 **2秒内** 完成 - 向量化为本地模型调用,约 50-200ms - 子表混合搜索约 100-500ms - 父表条件查询(最多15个 parent_id × 1次查询)约 50-200ms - 应用层处理(过滤、排序、拼接)约 10-50ms ### 6.6 错误处理 | 场景 | HTTP 状态码 | 处理 | |------|------------|------| | 参数缺失 | 400 | `{"code": 400, "message": "参数缺失: search_text"}` | | 向量模型异常 | 500 | 记录日志,返回错误信息 | | Milvus 连接异常 | 500 | 记录日志,返回错误信息 | | 无匹配结果 | 200 | `{"code": 200, "message": "success", "data": []}` | --- ## 七、开发优先级 | 优先级 | 步骤 | 说明 | |--------|------|------| | P0 | 步骤1 | 定义 Pydantic 请求/响应模型 | | P0 | 步骤2 | `chapter_level_1`、`chapter_level_2` 精确匹配过滤 | | P0 | 步骤3 | 子表混合搜索(hybrid_search)+ 章节字段过滤 + parent_id 排序 + 父表查询 | | P0 | 步骤4 | 去重逻辑 + 边界情况处理 | | P0 | 步骤5 | POST 接口 + health 辅助接口 | | P1 | 步骤6 | 路由注册到应用入口 | | P1 | 待确认项 | `chapter_id` 与向量库字段映射关系(见 6.4 节) | | P2 | 错误处理完善 | Milvus 异常兜底、空结果处理 | | P2 | 接口测试 | 验证完整检索链路 + 各边界场景 |