#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Test/experimental FastAPI routes: generation-model chat & SSE streaming,
agent chat & streaming, workflow-graph streaming, and a set of data-governance
utilities (PDF/Markdown splitting, embedding checks, pgvector / Milvus
ingestion and retrieval).

@Project : lq-agent-api
@File    : cattle_farm_views.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/10 17:32
"""
import json
from typing import Optional

from fastapi import Depends, Response, Header
from sse_starlette import EventSourceResponse
from starlette.responses import JSONResponse

from foundation.agent.test_agent import test_agent_client
from foundation.agent.generate.model_generate import test_generate_model_client
from foundation.logger.loggering import server_logger
from foundation.schemas.test_schemas import TestForm
from foundation.utils.common import return_json, handler_err
from views import test_router, get_operation_id
from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
from file_processors.pdf_processor import PDFProcessor
from file_processors.bfp_pdf_processor import BfpPDFProcessor
from file_processors.pdf_mineru_md import BfpPDFMineruMdProcessor
from file_processors.bfp_md_processor import BfpMarkdownProcessor
from foundation.models.silicon_flow import SiliconFlowAPI
from foundation.rag.vector.pg_vector_mananger import PGVectorManager
from foundation.rag.vector.pg_vector import PGVectorDB
from foundation.rag.vector.milvus_vector import MilvusVectorManager

# Standard SSE response headers shared by every streaming endpoint below.
_SSE_HEADERS = {"Cache-Control": "no-cache", "Connection": "keep-alive"}


@test_router.post("/generate/chat", response_model=TestForm)
async def generate_chat_endpoint(param: TestForm,
                                 trace_id: str = Depends(get_operation_id)):
    """Invoke the generation model synchronously and return its full output."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        output = test_generate_model_client.get_model_generate_invoke(
            trace_id, task_prompt_info, input_query, context)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}",
                            log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # FIX: err_name previously said "generate/stream" (copy-paste error).
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/generate/stream", response_model=TestForm)
async def generate_stream_endpoint(param: TestForm,
                                   trace_id: str = Depends(get_operation_id)):
    """Stream generation-model output to the client as SSE events."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}

        async def event_generator():
            """Yield one `message` event per chunk, then a `message_end` event."""
            try:
                # NOTE: the client exposes a synchronous generator here
                # (`for`, not `async for`) — unlike the agent endpoints.
                for chunk in test_generate_model_client.get_model_generate_stream(
                        trace_id, task_prompt_info, input_query, context):
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "output": chunk,
                            "completed": False,
                        }, ensure_ascii=False),
                    }
                # Cached payload placeholder; currently always empty.
                result_data = {}
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Surface stream-time failures as an `error` event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1,
                    }, ensure_ascii=False),
                }
            # No cleanup needed: the model client is a singleton.

        return EventSourceResponse(event_generator(), headers=dict(_SSE_HEADERS))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/agent/chat", response_model=TestForm)
async def chat_endpoint(param: TestForm,
                        trace_id: str = Depends(get_operation_id)):
    """Run the agent for the given scene and return its final answer."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_data = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        output = await test_agent_client.handle_query(
            trace_id, task_prompt_info, input_data, context, param.config)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}",
                            log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/agent/stream", response_class=Response)
async def chat_agent_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """Run the agent for the given scene and stream its output as SSE."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_data = param.input
        context = param.context
        header_info = {}
        task_prompt_info = {"task_prompt": ""}

        async def event_generator():
            """Forward agent chunks as `message` events, then `message_end`."""
            try:
                async for chunk in test_agent_client.handle_query_stream(
                        trace_id=trace_id,
                        config_param=param.config,
                        task_prompt_info=task_prompt_info,
                        input_query=input_data,
                        context=context,
                        header_info=header_info):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                        }, ensure_ascii=False),
                    }
                result_data = {}
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1,
                    }, ensure_ascii=False),
                }
            # No cleanup needed: the agent client is a singleton.

        return EventSourceResponse(event_generator(), headers=dict(_SSE_HEADERS))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id), status_code=500)


@test_router.post("/graph/stream", response_class=Response)
async def chat_graph_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """Stream workflow-graph output as SSE (message / message_end / error)."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")

        async def event_generator():
            try:
                async for chunk in test_workflow_graph.handle_query_stream(
                        param=param, trace_id=trace_id):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                            "dataType": "text",
                        }, ensure_ascii=False),
                    }
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": "Stream completed",
                        "code": 0,
                        "trace_id": trace_id,
                        "dataType": "text",
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # NOTE: this endpoint's error payload uses "msg" where the
                # others use "message" — preserved for client compatibility.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "msg": str(e),
                        "code": 1,
                        "dataType": "text",
                    }, ensure_ascii=False),
                }
            # No cleanup needed: the workflow graph is a singleton.

        return EventSourceResponse(event_generator(), headers=dict(_SSE_HEADERS))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id), status_code=500)


@test_router.post("/data/governance", response_model=TestForm)
async def data_governance_endpoint(param: TestForm,
                                   trace_id: str = Depends(get_operation_id)):
    """Invoke the data-governance model synchronously and return its output.

    FIX: was also named `generate_chat_endpoint`, silently shadowing the
    `/generate/chat` handler's function name at module level.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        output = test_generate_model_client.get_model_data_governance_invoke(
            trace_id, task_prompt_info, input_query, context)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}",
                            log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="data/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/pdf/governance", response_model=TestForm)
async def pdf_governance_endpoint(param: TestForm,
                                  trace_id: str = Depends(get_operation_id)):
    """Split and group the PDFs under `test/pdf_files` (processing test)."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Relative path; resolved against the process working directory.
        file_directory = "test/pdf_files"
        pdf_processor = PDFProcessor(directory=file_directory)
        file_data = pdf_processor.process_pdfs_group()
        server_logger.info(trace_id=trace_id, msg=f"【result】: {file_data}",
                           log_type="agent/chat")
        # The grouped data is only logged; the endpoint returns no payload.
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pdf/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/governance", response_model=TestForm)
async def bfp_governance_endpoint(param: TestForm,
                                  trace_id: str = Depends(get_operation_id)):
    """Split the "basis of preparation" PDFs under `test/bfp_files`."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        file_directory = "test/bfp_files"
        pdf_processor = BfpPDFProcessor(directory=file_directory)
        pdf_processor.process_pdfs_group()
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/governance")
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/embedding/test", response_model=TestForm)
async def embedding_test_endpoint(param: TestForm,
                                  trace_id: str = Depends(get_operation_id)):
    """Embed the input text and return its dimension plus the raw vector."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        text = param.input
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        base_api_platform = SiliconFlowAPI()
        embedding = base_api_platform.get_embeddings([text])[0]
        embed_dim = len(embedding)
        server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
        output = f"embed_dim={embed_dim},embedding:{embedding}"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/pgvector/test", response_model=TestForm)
async def pgvector_test_endpoint(param: TestForm,
                                 trace_id: str = Depends(get_operation_id)):
    """Run a cosine-similarity search against pgvector (smoke test)."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        output = pg_vector_db.similarity_cosine_search(
            param={"table_name": "test_documents"}, query_text=input_query)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # FIX: err_name previously said "bfp/governance" (copy-paste error).
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pgvector/test")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/milvus/test", response_model=TestForm)
async def milvus_test_endpoint(param: TestForm,
                               trace_id: str = Depends(get_operation_id)):
    """Run the Milvus manager's built-in search test (smoke test).

    FIX: was also named `pgvector_test_endpoint`, shadowing the
    `/data/pgvector/test` handler's function name.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        db_manager = MilvusVectorManager(client)
        # db_test only exercises the connection/search path; no result payload.
        db_manager.db_test(query_text=input_query)
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # FIX: err_name previously said "bfp/governance" (copy-paste error).
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="milvus/test")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/pdf_md", response_model=TestForm)
async def bfp_md_indb_endpoint(param: TestForm,
                               trace_id: str = Depends(get_operation_id)):
    """Convert PDFs under `test/test` to Markdown via MinerU."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        file_directory = "test/test"
        output_directory = "test/bfp_md_files"
        pdf_processor = BfpPDFMineruMdProcessor(
            directory=file_directory, output_directory=output_directory)
        pdf_processor.process_tqdm_pdfs_group()
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/pdf_md")
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/pdf_md")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/indb", response_model=TestForm)
async def bfp_indb_endpoint(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """Split the BFP PDFs and ingest the chunks into pgvector."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        file_directory = "test/bfp_files"
        pdf_processor = BfpPDFProcessor(directory=file_directory)
        file_data_list, total_chunks = pdf_processor.get_pdfs_group_data()
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/governance")
        output = None
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        for file_data in file_data_list:
            # Normalize each file's chunks, then batch-insert with progress bar.
            documents = pg_vector_db.document_standard(file_data)
            pg_vector_db.add_tqdm_batch_documents(
                param={"table_name": "tv_basis_of_preparation"}, documents=documents)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/batch/indb", response_model=TestForm)
async def bfp_batch_indb_endpoint(param: TestForm,
                                  trace_id: str = Depends(get_operation_id)):
    """Batch-split BFP PDFs and ingest directly into pgvector."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        file_directory = "test/bfp_files"
        pdf_processor = BfpPDFProcessor(directory=file_directory,
                                        base_vector=pg_vector_db)
        # pgvector keys its target by "table_name" (Milvus uses "collection_name").
        pdf_processor.process_tqdm_pdfs_group(key_name="table_name")
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/search", response_model=TestForm)
async def bfp_search_endpoint(param: TestForm,
                              trace_id: str = Depends(get_operation_id)):
    """Retrieve top-k chunks for the query from the pgvector BFP table."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # NOTE(review): config.session_id is repurposed to carry top_k here —
        # raises ValueError (-> 100500) if it isn't an integer string. Confirm
        # this contract with the caller.
        top_k = int(param.config.session_id)
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        output = pg_vector_db.retriever(
            param={"table_name": "tv_basis_of_preparation"},
            query_text=input_query, top_k=top_k)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/milvus/batch/indb", response_model=TestForm)
async def bfp_milvus_batch_indb_endpoint(param: TestForm,
                                         trace_id: str = Depends(get_operation_id)):
    """Batch-split BFP PDFs and ingest directly into Milvus.

    FIX: was also named `bfp_batch_indb_endpoint`, shadowing the
    `/data/bfp/batch/indb` handler's function name.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        file_directory = "test/bfp_files"
        pdf_processor = BfpPDFProcessor(directory=file_directory,
                                        base_vector=vector_db)
        # Milvus keys its target by "collection_name" (pgvector uses "table_name").
        pdf_processor.process_tqdm_pdfs_group(key_name="collection_name")
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/milvus/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err,
                    err_name="bfp/milvus/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/milvus/search", response_model=TestForm)
async def bfp_milvus_search_endpoint(param: TestForm,
                                     trace_id: str = Depends(get_operation_id)):
    """Retrieve top-k chunks for the query from the Milvus BFP collection.

    FIX: was also named `bfp_search_endpoint`, shadowing the
    `/data/bfp/search` handler's function name.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # NOTE(review): config.session_id is repurposed to carry top_k here —
        # same contract as /data/bfp/search; confirm with the caller.
        top_k = int(param.config.session_id)
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        output = vector_db.retriever(
            param={"collection_name": "tv_basis_of_preparation"},
            query_text=input_query, top_k=top_k)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err,
                    err_name="bfp/milvus/search")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))


@test_router.post("/data/bfp/md/milvus/batch/indb", response_model=TestForm)
async def bfp_md_batch_indb_endpoint(param: TestForm,
                                     trace_id: str = Depends(get_operation_id)):
    """Batch-split BFP Markdown files and ingest directly into Milvus."""
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Requires the SILICONFLOW_API_KEY environment variable to be set.
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        file_directory = "test/bfp_md_files"
        processor = BfpMarkdownProcessor(directory=file_directory,
                                         base_vector=vector_db)
        processor.process_tqdm_pdfs_group(key_name="collection_name")
        server_logger.info(trace_id=trace_id, msg="【result】: ",
                           log_type="bfp/md/milvus/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err,
                    err_name="bfp/md/milvus/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))