| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- """
- RAG 链路调试 API 端点
- 提供 RAG 检索链路各环节的独立调试端点,支持单步执行和链式执行。
- 移植自 utils_test/RAG_Test/rag_pipeline_web/rag_pipeline_server.py
- 端点:
- - POST /debug/rag/step — 单环节调试
- - POST /debug/rag/chain — 链式执行
- - POST /debug/rag/pipeline — 完整 RAG 链路
- - POST /debug/rag/native — Native RAG
- - POST /debug/rag/parent-child — 父子文档模式
- - POST /debug/rag/professional-review — 专业性审查
- - POST /debug/rag/init — 初始化 Milvus
- - GET /debug/rag/data — 获取最新 pipeline 数据
- """
- import asyncio
- import json
- import logging
- import os
- import time
- from typing import Any, Dict, List, Optional
- from fastapi import APIRouter, HTTPException
- from pydantic import BaseModel, Field
- from core.construction_review.component.ai_review_engine import AIReviewEngine
- from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
- from core.construction_review.component.infrastructure.parent_tool import (
- enhance_with_parent_docs_grouped,
- extract_query_pairs_results,
- )
- from core.base.task_models import TaskFileInfo
- from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
- from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
- from foundation.ai.rag.retrieval.retrieval import retrieval_manager
- from foundation.observability.monitoring.rag import rag_monitor
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Two directory levels above this file's directory. The path is not normalized,
# so it still contains the literal ".." components.
project_root = os.path.join(os.path.dirname(__file__), "..", "..")

# ============ Lazily initialized global instances ============
# Both start as None; _get_milvus() and _get_ai_review_engine() fill them in on first use.
_milvus_manager: Optional[MilvusManager] = None
_ai_review_engine: Optional[AIReviewEngine] = None
def _get_milvus():
    """Return the shared MilvusManager, constructing it on the first call."""
    global _milvus_manager
    if _milvus_manager is not None:
        return _milvus_manager
    # First use: build the manager from a default MilvusConfig and cache it.
    _milvus_manager = MilvusManager(MilvusConfig())
    return _milvus_manager
def _get_ai_review_engine():
    """Return the shared AIReviewEngine, building it from placeholder task info on first use."""
    global _ai_review_engine
    if _ai_review_engine is not None:
        return _ai_review_engine
    # Placeholder task description: the engine is only used for debugging here,
    # so every field is a fixed test value.
    placeholder_task = dict(
        file_id="test_file_id",
        callback_task_id="test_task_id",
        user_id="test_user",
        file_name="test.docx",
        file_type='docx',
        file_content=b'',
        review_config=[],
        review_item_config={},
    )
    _ai_review_engine = AIReviewEngine(TaskFileInfo(placeholder_task))
    return _ai_review_engine
- # ============ Pydantic 模型 ============
class DebugStepRequest(BaseModel):
    """Request body for running a single RAG debug step."""

    # Name of the step to run (required); the description lists the accepted names.
    step: str = Field(..., description="环节名称: query_extract, entity_enhance, multi_stage_recall, hybrid_search, parent_doc_enhance, extract_results")
    # Input text or JSON data for the step; defaults to an empty string.
    content: str = Field(default="", description="输入文本或 JSON 数据")
    # Extra step-specific parameters; defaults to a fresh empty dict.
    params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="额外参数")
class DebugChainRequest(BaseModel):
    """Request body for the chained debug run."""

    # Input text (required).
    content: str = Field(..., description="输入文本")
    # Extra parameters; defaults to a fresh empty dict.
    params: Optional[Dict[str, Any]] = Field(default_factory=dict, description="额外参数")
class RagRequest(BaseModel):
    """Request body shared by the pipeline, native and parent-child RAG endpoints."""

    # Input text (required).
    content: str = Field(..., description="输入文本")
    # Collection to search; defaults to "rag_children_hybrid".
    collection_name: Optional[str] = Field(default="rag_children_hybrid")
    # Forwarded as hybrid_top_k to multi_stage_recall; defaults to 20.
    hybrid_top_k: Optional[int] = Field(default=20)
    # Forwarded as top_k to multi_stage_recall; defaults to 5.
    top_k: Optional[int] = Field(default=5)
    # Forwarded as score_threshold to parent-document enhancement; defaults to 0.3.
    parent_score_threshold: Optional[float] = Field(default=0.3)
    # Forwarded as max_parents_per_pair to parent-document enhancement; defaults to 3.
    max_parents: Optional[int] = Field(default=3)
class ProfessionalReviewRequest(BaseModel):
    """Request body for the professional-review test endpoint."""

    # Content to be reviewed (required).
    content: str = Field(..., description="待审查内容")
    # Which checks to run: "non_parameter", "parameter" or "both" (the default).
    check_type: Optional[str] = Field(default="both", description="non_parameter, parameter, both")
- # ============ 工具函数 ============
- def _serialize_results(results):
- if not results:
- return []
- out = []
- for item in results:
- if not isinstance(item, dict):
- out.append(str(item))
- continue
- d = {}
- for k, v in item.items():
- if k == 'metadata' and isinstance(v, dict):
- d[k] = {mk: str(mv) for mk, mv in v.items()}
- elif isinstance(v, (str, int, float, bool, list, type(None))):
- d[k] = v
- else:
- d[k] = str(v)
- out.append(d)
- return out
- def _serialize_parent_docs(parent_docs):
- out = []
- for p in parent_docs:
- d = {}
- for k, v in p.items():
- if k == 'metadata' and isinstance(v, dict):
- d[k] = {mk: str(mv) for mk, mv in v.items()}
- elif k == 'text_content' and isinstance(v, str):
- d[k] = v[:500] + '...' if len(v) > 500 else v
- elif isinstance(v, (str, int, float, bool, list, type(None))):
- d[k] = v
- else:
- d[k] = str(v)
- out.append(d)
- return out
- def _parse_json_param(content: str, params: dict, key: str):
- if key in params and params[key] is not None:
- return params[key]
- if content:
- try:
- parsed = json.loads(content)
- if isinstance(parsed, list):
- return parsed
- if isinstance(parsed, dict) and key in parsed:
- return parsed[key]
- except (json.JSONDecodeError, TypeError):
- pass
- return None
- # ============ 核心逻辑(同步) ============
- def _debug_step_sync(step_name: str, content: str, params: dict) -> dict:
- start_time = time.time()
- try:
- if step_name == 'query_extract':
- input_summary = {"content_length": len(content)}
- output = query_rewrite_manager.query_extract(content)
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": output, "execution_time": round(time.time() - start_time, 3)}
- elif step_name == 'entity_enhance':
- query_pairs = _parse_json_param(content, params, 'query_pairs')
- if query_pairs is None:
- return {"status": "error", "step": step_name, "error": "请提供 query_pairs(在 params 或 content 中传入 JSON)"}
- input_summary = {"query_pairs_count": len(query_pairs)}
- output = entity_enhance.entities_enhance_retrieval(query_pairs)
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": output, "execution_time": round(time.time() - start_time, 3)}
- elif step_name == 'multi_stage_recall':
- collection_name = params.get('collection_name', 'rag_children_hybrid')
- hybrid_top_k = params.get('hybrid_top_k', 50)
- top_k = params.get('top_k', 10)
- input_summary = {"content_length": len(content), "collection_name": collection_name,
- "hybrid_top_k": hybrid_top_k, "top_k": top_k}
- output = retrieval_manager.multi_stage_recall(
- collection_name=collection_name, query_text=content,
- hybrid_top_k=hybrid_top_k, top_k=top_k)
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": _serialize_results(output), "execution_time": round(time.time() - start_time, 3)}
- elif step_name == 'hybrid_search':
- collection_name = params.get('collection_name', 'rag_children_hybrid')
- top_k = params.get('top_k', 10)
- dense_weight = params.get('dense_weight', 0.7)
- sparse_weight = params.get('sparse_weight', 0.3)
- input_summary = {"content_length": len(content), "collection_name": collection_name,
- "top_k": top_k, "dense_weight": dense_weight, "sparse_weight": sparse_weight}
- output = retrieval_manager.hybrid_search_recall(
- collection_name=collection_name, query_text=content,
- top_k=top_k, dense_weight=dense_weight, sparse_weight=sparse_weight)
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": _serialize_results(output), "execution_time": round(time.time() - start_time, 3)}
- elif step_name == 'parent_doc_enhance':
- mgr = _get_milvus()
- bfp_result_lists = _parse_json_param(content, params, 'bfp_result_lists')
- if bfp_result_lists is None:
- return {"status": "error", "step": step_name, "error": "请提供 bfp_result_lists(在 params 或 content 中传入 JSON)"}
- score_threshold = params.get('score_threshold', 0.3)
- max_parents = params.get('max_parents_per_pair', 3)
- input_summary = {"bfp_lists_count": len(bfp_result_lists),
- "score_threshold": score_threshold, "max_parents_per_pair": max_parents}
- output = enhance_with_parent_docs_grouped(
- mgr, bfp_result_lists,
- score_threshold=score_threshold, max_parents_per_pair=max_parents)
- serialized = {
- "enhanced_count": output.get("enhanced_count", 0),
- "enhanced_pairs": output.get("enhanced_pairs", 0),
- "total_pairs": output.get("total_pairs", 0),
- "parent_docs": _serialize_parent_docs(output.get("parent_docs", [])),
- "enhanced_results_summary": f"{len(output.get('enhanced_results', []))} 个查询对的结果"
- }
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": serialized, "execution_time": round(time.time() - start_time, 3)}
- elif step_name == 'extract_results':
- bfp_result_lists = _parse_json_param(content, params, 'bfp_result_lists')
- query_pairs = params.get('query_pairs', None)
- score_threshold = params.get('score_threshold', 0.5)
- if bfp_result_lists is None:
- return {"status": "error", "step": step_name, "error": "请提供 bfp_result_lists(在 params 或 content 中传入 JSON)"}
- input_summary = {"bfp_lists_count": len(bfp_result_lists),
- "has_query_pairs": query_pairs is not None,
- "score_threshold": score_threshold}
- output = extract_query_pairs_results(bfp_result_lists, query_pairs, score_threshold=score_threshold)
- return {"status": "success", "step": step_name, "input_summary": input_summary,
- "output": _serialize_results(output), "execution_time": round(time.time() - start_time, 3)}
- else:
- return {"status": "error", "step": step_name,
- "error": f"未知环节: {step_name},可选: query_extract, entity_enhance, multi_stage_recall, hybrid_search, parent_doc_enhance, extract_results"}
- except Exception as e:
- logger.exception("[rag_debug] step=%s failed", step_name)
- return {"status": "error", "step": step_name, "error": str(e),
- "execution_time": round(time.time() - start_time, 3)}
def _debug_chain_sync(content: str, params: dict) -> dict:
    """Run query_extract -> entity_enhance -> parent_doc_enhance -> extract_results.

    Each step records its status, duration and a short summary under
    ``steps[<step name>]``. Failures are logged with their traceback, so the
    cause is not reduced to the ``str(e)`` placed in the response.

    Args:
        content: Input text passed to query extraction.
        params: Optional overrides: ``score_threshold`` and
            ``max_parents_per_pair``.

    Returns:
        A dict with ``status`` ("success", "no_results" or "error"), the
        per-step ``steps`` mapping and the overall ``execution_time``.
        A failure in step 1 or 2 aborts the chain with status "error".
        A failure in step 3 falls back to the un-enhanced results. A failure
        in step 4 is recorded in ``steps`` while the overall status stays
        "success", as before.
    """
    chain_start = time.time()
    steps = {}

    def _since(t0):
        return round(time.time() - t0, 3)

    # Step 1: extract query pairs from the input text.
    t0 = time.time()
    try:
        query_pairs = query_rewrite_manager.query_extract(content)
        steps["query_extract"] = {"status": "success", "execution_time": _since(t0),
                                  "output": query_pairs,
                                  "summary": f"提取到 {len(query_pairs) if query_pairs else 0} 个查询对"}
    except Exception as e:
        logger.exception("[rag_debug] chain step=query_extract failed")
        steps["query_extract"] = {"status": "error", "execution_time": _since(t0), "error": str(e)}
        return {"status": "error", "steps": steps, "execution_time": _since(chain_start),
                "error": "query_extract 失败"}
    if not query_pairs:
        return {"status": "no_results", "steps": steps, "execution_time": _since(chain_start),
                "message": "query_extract 未提取到查询对"}

    # Step 2: entity-enhanced retrieval.
    t0 = time.time()
    try:
        bfp_result_lists = entity_enhance.entities_enhance_retrieval(query_pairs)
        total_bfp = sum(len(r) for r in bfp_result_lists) if bfp_result_lists else 0
        steps["entity_enhance"] = {"status": "success", "execution_time": _since(t0),
                                   "summary": f"召回 {total_bfp} 个BFP结果"}
    except Exception as e:
        logger.exception("[rag_debug] chain step=entity_enhance failed")
        steps["entity_enhance"] = {"status": "error", "execution_time": _since(t0), "error": str(e)}
        return {"status": "error", "steps": steps, "execution_time": _since(chain_start),
                "error": "entity_enhance 失败"}
    if not bfp_result_lists:
        return {"status": "no_results", "steps": steps, "execution_time": _since(chain_start),
                "message": "entity_enhance 未召回结果"}

    # Step 3: parent-document enhancement. This is best effort: on failure the
    # chain continues with the un-enhanced retrieval results.
    t0 = time.time()
    try:
        mgr = _get_milvus()
        score_threshold = params.get('score_threshold', 0.3)
        max_parents = params.get('max_parents_per_pair', 3)
        enhancement_result = enhance_with_parent_docs_grouped(
            mgr, bfp_result_lists,
            score_threshold=score_threshold, max_parents_per_pair=max_parents)
        enhanced_results = enhancement_result.get('enhanced_results', bfp_result_lists)
        steps["parent_doc_enhance"] = {
            "status": "success", "execution_time": _since(t0),
            "summary": f"增强 {enhancement_result.get('enhanced_pairs', 0)}/{enhancement_result.get('total_pairs', 0)} 个查询对"}
    except Exception as e:
        logger.warning("[rag_debug] chain step=parent_doc_enhance failed; using un-enhanced results",
                       exc_info=True)
        steps["parent_doc_enhance"] = {"status": "error", "execution_time": _since(t0), "error": str(e)}
        enhanced_results = bfp_result_lists

    # Step 4: keep results above the score threshold.
    # NOTE: this reads the same 'score_threshold' key as step 3 but with a
    # different default (0.5 instead of 0.3).
    t0 = time.time()
    try:
        extract_threshold = params.get('score_threshold', 0.5)
        entity_results = extract_query_pairs_results(enhanced_results, query_pairs,
                                                     score_threshold=extract_threshold)
        steps["extract_results"] = {"status": "success", "execution_time": _since(t0),
                                    "output": _serialize_results(entity_results),
                                    "summary": f"提取 {len(entity_results) if entity_results else 0} 个高分结果"}
    except Exception as e:
        logger.exception("[rag_debug] chain step=extract_results failed")
        steps["extract_results"] = {"status": "error", "execution_time": _since(t0), "error": str(e)}

    return {"status": "success", "steps": steps, "execution_time": _since(chain_start)}
def _rag_enhanced_check_sync(query_content: str) -> dict:
    """Run the full RAG-enhanced check pipeline synchronously.

    Steps: query extraction, entity-enhanced retrieval, parent-document
    enhancement (best effort) and result extraction with a 0.5 score
    threshold. The run is bracketed by ``rag_monitor`` start/end trace calls.

    Args:
        query_content: Text to run through the pipeline.

    Returns:
        A dict with ``status`` ("success", "no_results" or "error") and
        ``trace_id``. On success it also contains ``stage``,
        ``total_execution_time`` (measured seconds, 3 decimals) and
        ``final_result``.
    """
    started = time.time()
    trace_id = f"rag_{int(started * 1000)}"
    rag_monitor.start_trace(trace_id, metadata={
        "content_length": len(query_content),
        "stage": "rag_enhanced_check"
    })
    try:
        query_pairs = query_rewrite_manager.query_extract(query_content)
        if not query_pairs:
            return {"status": "no_results", "trace_id": trace_id, "message": "query_extract 未提取到查询对"}

        bfp_result_lists = entity_enhance.entities_enhance_retrieval(query_pairs)
        if not bfp_result_lists:
            return {"status": "no_results", "trace_id": trace_id, "message": "实体检索未返回结果"}

        mgr = _get_milvus()
        try:
            enhancement_result = enhance_with_parent_docs_grouped(
                mgr, bfp_result_lists, score_threshold=0.3, max_parents_per_pair=3)
            enhanced_results = enhancement_result['enhanced_results']
        except Exception:
            # Best effort: continue with the un-enhanced results, but log the cause.
            logger.warning("[rag_debug] parent doc enhancement failed; using un-enhanced results",
                           exc_info=True)
            enhanced_results = bfp_result_lists

        entity_results = (
            extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.5)
            if enhanced_results else []
        )
        pipeline_data = {
            "trace_id": trace_id,
            "stage": "rag_enhanced_check",
            # Previously hard-coded to 0; now the measured wall-clock duration.
            "total_execution_time": round(time.time() - started, 3),
            "final_result": {
                "retrieval_status": "success" if entity_results else "no_results",
                "entity_results": entity_results,
                "total_entities": len(entity_results) if entity_results else 0
            }
        }
        return {"status": "success", **pipeline_data}
    except Exception as e:
        logger.exception("[rag_debug] pipeline failed")
        return {"status": "error", "trace_id": trace_id, "error": str(e)}
    finally:
        rag_monitor.end_trace(trace_id)
def _native_rag_check_sync(query_content: str, collection_name: str = "rag_children_hybrid",
                           hybrid_top_k: int = 20, top_k: int = 5) -> dict:
    """Run multi-stage recall on one collection and return the serialized hits.

    Returns a dict with ``status`` (always "success"), ``results`` and
    ``total_results``. Exceptions from the retrieval call propagate to the caller.
    """
    recall_kwargs = dict(
        collection_name=collection_name,
        query_text=query_content,
        hybrid_top_k=hybrid_top_k,
        top_k=top_k,
    )
    hits = _serialize_results(retrieval_manager.multi_stage_recall(**recall_kwargs))
    return {"status": "success", "results": hits, "total_results": len(hits)}
def _parent_child_rag_check_sync(query_content: str, hybrid_top_k: int = 20, top_k: int = 5,
                                 parent_score_threshold: float = 0.3, max_parents: int = 3) -> dict:
    """Retrieve child chunks, then enhance them with their parent documents.

    Child chunks come from multi-stage recall on the "rag_children_hybrid"
    collection. Parent enhancement is best effort: if it raises, or returns
    no ``enhanced_results`` entry, the child results are used as the
    enhanced results.

    Args:
        query_content: Query text.
        hybrid_top_k: Forwarded as ``hybrid_top_k`` to multi-stage recall.
        top_k: Forwarded as ``top_k`` to multi-stage recall.
        parent_score_threshold: Forwarded as ``score_threshold`` to the enhancer.
        max_parents: Forwarded as ``max_parents_per_pair`` to the enhancer.

    Returns:
        ``{"status": "no_results", ...}`` when nothing was recalled. Otherwise
        a "success" dict with serialized child results, parent documents,
        enhanced results and their counts.
    """
    child_results = retrieval_manager.multi_stage_recall(
        collection_name="rag_children_hybrid", query_text=query_content,
        hybrid_top_k=hybrid_top_k, top_k=top_k)
    if not child_results:
        return {"status": "no_results", "child_results": [], "parent_results": []}

    mgr = _get_milvus()
    # The enhancer takes one result list per query pair; wrap the single list.
    bfp_formatted = [child_results]
    try:
        enhancement_result = enhance_with_parent_docs_grouped(
            mgr, bfp_formatted, score_threshold=parent_score_threshold, max_parents_per_pair=max_parents)
        # A missing key now falls back to the child results. The old [[]]
        # default was truthy, so the fallback below could never trigger and
        # the response showed zero enhanced results.
        enhanced_results = enhancement_result.get('enhanced_results', bfp_formatted)
        parent_docs = enhancement_result.get('parent_docs', [])
    except Exception:
        logger.warning("[rag_debug] parent doc enhancement failed; using child results",
                       exc_info=True)
        enhanced_results = bfp_formatted
        parent_docs = []

    final_results = enhanced_results[0] if enhanced_results else child_results
    return {
        "status": "success",
        "child_results": _serialize_results(child_results),
        "parent_documents": _serialize_parent_docs(parent_docs),
        "enhanced_results": _serialize_results(final_results),
        "total_children": len(child_results),
        "total_parents": len(parent_docs),
        "total_enhanced": len(final_results)
    }
async def _run_compliance_check(check, trace_id_idx, review_content, text_content,
                                file_name, combined_query):
    """Await one engine compliance check and reduce its result to a plain dict.

    Returns the ``success``/``details``/``error_message``/``execution_time``
    fields of the check result, or ``{"error": <message>}`` if the check raised.
    """
    try:
        outcome = await check(
            trace_id_idx=trace_id_idx,
            review_content=review_content,
            review_references=text_content,
            reference_source=file_name,
            state={"callback_task_id": "test_task_id", "progress_manager": None},
            stage_name="专业性审查测试",
            entity_query=combined_query
        )
        return {
            'success': outcome.success,
            'details': outcome.details,
            'error_message': outcome.error_message,
            'execution_time': outcome.execution_time
        }
    except Exception as e:
        # One failing entity must not abort the review; record and log it.
        logger.exception("[rag_debug] compliance check failed trace=%s", trace_id_idx)
        return {"error": str(e)}


async def _professional_review_async(review_content: str, check_type: str = "both") -> dict:
    """Run the full professional-review test: RAG retrieval, then AI review per entity.

    Steps: query extraction, entity-enhanced retrieval and parent-document
    enhancement, each run in a worker thread; then result extraction with a
    0.5 score threshold; then the engine's non-parameter and/or parameter
    compliance checks for every extracted entity. The run is bracketed by
    ``rag_monitor`` start/end trace calls.

    Args:
        review_content: Text under review.
        check_type: "non_parameter", "parameter" or "both".

    Returns:
        A dict with ``status`` ("success", "no_results" or "error") and
        ``trace_id``. On success it also contains ``check_type``,
        ``rag_summary``, ``review_results`` and ``total_entities_reviewed``.
    """
    engine = _get_ai_review_engine()
    trace_id = f"professional_review_{int(time.time() * 1000)}"
    rag_monitor.start_trace(trace_id, metadata={
        "content_length": len(review_content),
        "check_type": check_type,
        "stage": "professional_review_test"
    })
    try:
        # Step 1: query_extract
        query_pairs = await asyncio.to_thread(query_rewrite_manager.query_extract, review_content)
        if not query_pairs:
            return {"status": "error", "trace_id": trace_id, "error": "查询提取失败"}

        # Step 2: entity enhance
        bfp_result_lists = await asyncio.to_thread(entity_enhance.entities_enhance_retrieval, query_pairs)
        if not bfp_result_lists:
            return {"status": "no_results", "trace_id": trace_id, "message": "未获取到有效的RAG召回结果"}

        # Step 3: parent doc enhancement (best effort)
        mgr = _get_milvus()
        try:
            enhancement_result = await asyncio.to_thread(
                enhance_with_parent_docs_grouped, mgr, bfp_result_lists,
                score_threshold=0.3, max_parents_per_pair=3)
            enhanced_results = enhancement_result['enhanced_results']
        except Exception:
            logger.warning("[rag_debug] parent doc enhancement failed; using un-enhanced results",
                           exc_info=True)
            enhanced_results = bfp_result_lists

        # Step 4: extract results
        entity_results = (
            extract_query_pairs_results(enhanced_results, query_pairs, score_threshold=0.5)
            if enhanced_results else []
        )
        if not entity_results:
            return {"status": "no_results", "trace_id": trace_id, "message": "没有结果通过阈值过滤"}

        # Step 5: AI review for each entity
        review_results = []
        for idx, entity_result in enumerate(entity_results):
            combined_query = entity_result.get('combined_query', '')
            text_content = entity_result.get('text_content', '')
            file_name = entity_result.get('file_name', '')
            trace_id_idx = f"{trace_id}_entity_{idx}"
            entity_review = {
                "entity": entity_result.get('entity', ''),
                "combined_query": combined_query,
                "reference_source": file_name,
                "rag_score": entity_result.get('final_score', 0),
                "non_parameter_result": None,
                "parameter_result": None
            }
            if check_type in ["non_parameter", "both"]:
                entity_review["non_parameter_result"] = await _run_compliance_check(
                    engine.check_non_parameter_compliance, trace_id_idx,
                    review_content, text_content, file_name, combined_query)
            if check_type in ["parameter", "both"]:
                entity_review["parameter_result"] = await _run_compliance_check(
                    engine.check_parameter_compliance, trace_id_idx,
                    review_content, text_content, file_name, combined_query)
            review_results.append(entity_review)

        return {
            "status": "success",
            "trace_id": trace_id,
            "check_type": check_type,
            "rag_summary": {
                "query_pairs_count": len(query_pairs),
                "entity_results_count": len(entity_results),
                "query_pairs": query_pairs,
                "entity_results": entity_results
            },
            "review_results": review_results,
            "total_entities_reviewed": len(review_results)
        }
    except Exception as e:
        logger.exception("[rag_debug] professional_review failed")
        return {"status": "error", "trace_id": trace_id, "error": str(e)}
    finally:
        rag_monitor.end_trace(trace_id)
- # ============ 路由注册 ============
def register_routes(router: APIRouter):
    """Attach all RAG debug endpoints to ``router``.

    Registers POST ``/rag/step``, ``/rag/chain``, ``/rag/pipeline``,
    ``/rag/native``, ``/rag/parent-child``, ``/rag/professional-review`` and
    ``/rag/init``, plus GET ``/rag/data``. Synchronous pipeline functions are
    run in worker threads so they do not block the event loop.

    Raises (inside the endpoints):
        HTTPException: 400 for missing or invalid input, 404 when the
            pipeline data file is absent, 500 when Milvus initialization fails.
    """
    valid_steps = ('query_extract', 'entity_enhance', 'multi_stage_recall',
                   'hybrid_search', 'parent_doc_enhance', 'extract_results')
    valid_check_types = ('non_parameter', 'parameter', 'both')

    def _require_content(content):
        # Shared guard: every text-driven endpoint rejects empty content.
        if not content:
            raise HTTPException(status_code=400, detail="请提供 content 参数")

    @router.post("/rag/step")
    async def debug_step_endpoint(request: DebugStepRequest):
        """Run a single RAG debug step."""
        step_name = request.step
        if step_name not in valid_steps:
            raise HTTPException(status_code=400, detail=f"未知环节: {step_name}")
        return await asyncio.to_thread(_debug_step_sync, step_name, request.content, request.params or {})

    @router.post("/rag/chain")
    async def debug_chain_endpoint(request: DebugChainRequest):
        """Chained run: query_extract -> entity_enhance -> parent_doc_enhance -> extract_results."""
        _require_content(request.content)
        return await asyncio.to_thread(_debug_chain_sync, request.content, request.params or {})

    @router.post("/rag/pipeline")
    async def rag_pipeline_endpoint(request: RagRequest):
        """Full RAG-enhanced check pipeline."""
        _require_content(request.content)
        return await asyncio.to_thread(_rag_enhanced_check_sync, request.content)

    @router.post("/rag/native")
    async def native_rag_endpoint(request: RagRequest):
        """Native RAG: basic recall plus reranking."""
        _require_content(request.content)
        return await asyncio.to_thread(
            _native_rag_check_sync, request.content,
            request.collection_name or "rag_children_hybrid",
            request.hybrid_top_k or 20, request.top_k or 5
        )

    @router.post("/rag/parent-child")
    async def parent_child_rag_endpoint(request: RagRequest):
        """Parent-child mode: child retrieval followed by parent-document enhancement."""
        _require_content(request.content)
        # An explicit 0.0 threshold is meaningful, so only None falls back to
        # the default. The old `or 0.3` silently replaced 0.0 with 0.3.
        threshold = 0.3 if request.parent_score_threshold is None else request.parent_score_threshold
        return await asyncio.to_thread(
            _parent_child_rag_check_sync, request.content,
            request.hybrid_top_k or 20, request.top_k or 5,
            threshold, request.max_parents or 3
        )

    @router.post("/rag/professional-review")
    async def professional_review_endpoint(request: ProfessionalReviewRequest):
        """Full professional-review test (RAG plus AI review)."""
        _require_content(request.content)
        # check_type is Optional: apply the default before validating, so an
        # explicit null means "both" instead of being rejected with 400.
        check_type = "both" if request.check_type is None else request.check_type
        if check_type not in valid_check_types:
            raise HTTPException(status_code=400, detail="check_type 必须是 non_parameter, parameter 或 both")
        return await _professional_review_async(request.content, check_type)

    @router.post("/rag/init")
    async def init_milvus_endpoint():
        """Initialize Milvus."""
        try:
            _get_milvus()
        except Exception as e:
            logger.exception("[rag_debug] Milvus init failed")
            raise HTTPException(status_code=500, detail=str(e)) from e
        return {"status": "ok", "message": "Milvus 初始化成功"}

    @router.get("/rag/data")
    async def get_rag_data():
        """Return the most recently saved pipeline data."""
        data_path = os.path.join(project_root, "temp", "rag_pipeline_server", "rag_pipeline_data.json")
        # Open directly instead of exists-then-open, which could race with a
        # concurrent delete and surface as a 500.
        try:
            with open(data_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            raise HTTPException(status_code=404, detail="数据文件不存在") from None
|