| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910 |
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : cattle_farm_views.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/10 17:32
'''
- import json
- from typing import Optional
- from fastapi import Depends, Response, Header
- from sse_starlette import EventSourceResponse
- from starlette.responses import JSONResponse
- from foundation.agent.test_agent import test_agent_client
- from foundation.agent.generate.model_generate import test_generate_model_client
- from foundation.logger.loggering import server_logger
- from foundation.schemas.test_schemas import TestForm
- from foundation.utils.common import return_json, handler_err
- from views import test_router, get_operation_id
- from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
- from file_processors.pdf_processor import PDFProcessor
- from file_processors.bfp_pdf_processor import BfpPDFProcessor
- from file_processors.pdf_mineru_md import BfpPDFMineruMdProcessor
- from file_processors.bfp_md_processor import BfpMarkdownProcessor
- from foundation.models.silicon_flow import SiliconFlowAPI
- from foundation.rag.vector.pg_vector_mananger import PGVectorManager
- from foundation.rag.vector.pg_vector import PGVectorDB
- from foundation.rag.vector.milvus_vector import MilvusVectorManager
@test_router.post("/generate/chat", response_model=TestForm)
async def generate_chat_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Invoke the generation model once and return its output.

    Args:
        param: Request form carrying ``input``, ``config`` and ``context``.
        trace_id: Per-request operation id injected by dependency.

    Returns:
        JSONResponse with the model output, or an error payload
        (code 100500) when invocation fails.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields off the form (session id is not used here).
        input_query = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        output = test_generate_model_client.get_model_generate_invoke(
            trace_id, task_prompt_info, input_query, context)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
        # Wrap the result in the standard response envelope.
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # The ValueError branch was identical to the generic one, so both are
        # handled here; err_name fixed from the copy-pasted "generate/stream".
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/generate/stream", response_model=TestForm)
async def generate_stream_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Stream generation-model output to the client as Server-Sent Events.

    Emits one "message" event per model chunk, then a terminal
    "message_end" event; mid-stream failures become an "error" event.
    Failures before the stream starts fall through to a plain JSON
    error response (code 100500).
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields off the form.
        input_query = param.input
        session_id = param.config.session_id  # NOTE(review): unused in this handler
        context = param.context
        header_info = {
        }
        task_prompt_info = {"task_prompt": ""}
        # Build the SSE event stream.
        async def event_generator():
            try:
                # The model client exposes a synchronous generator; each chunk
                # is forwarded to the client as one SSE "message" event.
                for chunk in test_generate_model_client.get_model_generate_stream(trace_id , task_prompt_info,
                                       input_query, context):
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "output": chunk,
                            "completed": False,
                        }, ensure_ascii=False)
                    }
                # Placeholder for cached result data (currently always empty).
                result_data = {}
                # Terminal event marking successful completion.
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Report mid-stream failures as an SSE "error" event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1
                    }, ensure_ascii=False)
                }
            finally:
                # No client shutdown needed: the model client is a singleton.
                pass
        # Return the SSE response with proxy caching disabled.
        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except ValueError as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
# Agent routes
@test_router.post("/agent/chat", response_model=TestForm)
async def chat_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Run the agent once for the given scene input and return its reply.

    Args:
        param: Request form carrying ``input``, ``config`` and ``context``.
        trace_id: Per-request operation id injected by dependency.

    Returns:
        JSONResponse with the agent output, or an error payload
        (code 100500) when handling fails.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_data = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        # Single-shot (non-streaming) agent invocation.
        output = await test_agent_client.handle_query(
            trace_id, task_prompt_info, input_data, context, param.config)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed to one.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/agent/stream", response_class=Response)
async def chat_agent_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """
    Stream agent feedback for a scene to the client as SSE.

    Emits one "message" event per agent chunk, then a terminal
    "message_end" event; mid-stream failures become an "error" event.
    Failures before the stream starts return a JSON error with HTTP 500.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")

        # Extract request parameters.
        input_data = param.input
        context = param.context
        header_info = {

        }
        task_prompt_info = {"task_prompt": ""}
        # If business_scene is None, the model performs intent recognition.
        # NOTE(review): this logs the same param a second time (see above).
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Build the SSE event stream.
        async def event_generator():
            try:
                # Stream the query through the agent client chunk by chunk.
                async for chunk in test_agent_client.handle_query_stream(
                    trace_id=trace_id,
                    config_param=param.config,
                    task_prompt_info=task_prompt_info,
                    input_query=input_data,
                    context=context,
                    header_info=header_info
                ):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    # Forward each chunk as one SSE "message" event.
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                        }, ensure_ascii=False)
                    }
                # Placeholder for cached result data (currently always empty).
                result_data = {}
                # Terminal event marking successful completion.
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Report mid-stream failures as an SSE "error" event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1
                    }, ensure_ascii=False)
                }
            finally:
                # No client shutdown needed: the agent client is a singleton.
                pass
        # Return the SSE response with proxy caching disabled.
        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except Exception as err:
        # Pre-stream error handling: log and answer with HTTP 500.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
@test_router.post("/graph/stream", response_class=Response)
async def chat_graph_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """
    Stream workflow-graph output to the client as SSE.

    Delegates the whole form to ``test_workflow_graph.handle_query_stream``
    and relays its chunks as "message" events, closing with "message_end";
    mid-stream failures become an "error" event. Pre-stream failures return
    a JSON error with HTTP 500.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # request_param = {
        #     "input": param.input,
        #     "config": param.config,
        #     "context": param.context
        # }
        # Build the SSE event stream.
        async def event_generator():
            try:
                # Stream the query through the workflow graph chunk by chunk.
                async for chunk in test_workflow_graph.handle_query_stream(
                    param=param,
                    trace_id=trace_id,
                ):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    # Forward each chunk as one SSE "message" event.
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                            "dataType": "text"
                        }, ensure_ascii=False)
                    }
                # Terminal event marking successful completion.
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": "Stream completed",
                        "code": 0,
                        "trace_id": trace_id,
                        "dataType": "text"
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Report mid-stream failures as an SSE "error" event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "msg": str(e),
                        "code": 1,
                        "dataType": "text"
                    }, ensure_ascii=False)
                }
            finally:
                # No client shutdown needed: the graph client is a singleton.
                pass
        # Return the SSE response with proxy caching disabled.
        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )

    except Exception as err:
        # Pre-stream error handling: log and answer with HTTP 500.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
@test_router.post("/data/governance", response_model=TestForm)
async def data_governance_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Invoke the data-governance model once and return its output.

    Renamed from ``generate_chat_endpoint``: the original redefined the
    handler name already used for /generate/chat at module level.

    Returns:
        JSONResponse with the model output, or an error payload
        (code 100500) when invocation fails.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        context = param.context
        task_prompt_info = {"task_prompt": ""}
        output = test_generate_model_client.get_model_data_governance_invoke(
            trace_id, task_prompt_info, input_query, context)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # err_name fixed from the copy-pasted "generate/stream".
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="data/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/data/pdf/governance", response_model=TestForm)
async def pdf_governance_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Process the PDFs under ``test/pdf_files`` in groups and log the result.

    The request body is only logged; the processed file data is not
    returned — ``output`` is always ``None`` on success.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Relative path: resolved against the service working directory.
        file_directory = "test/pdf_files"
        # Run the grouped PDF processing pipeline.
        pdf_processor = PDFProcessor(directory=file_directory)
        file_data = pdf_processor.process_pdfs_group()
        server_logger.info(trace_id=trace_id, msg=f"【result】: {file_data}", log_type="agent/chat")
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # err_name fixed from the copy-pasted "generate/stream".
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pdf/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/governance", response_model=TestForm)
async def bfp_governance_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Split the "basis of preparation" PDF documents under ``test/bfp_files``.

    Runs the grouped BFP PDF processor for its side effects only;
    ``output`` is always ``None`` on success.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Relative path: resolved against the service working directory.
        file_directory = "test/bfp_files"
        # Run the grouped BFP PDF processing pipeline.
        pdf_processor = BfpPDFProcessor(directory=file_directory)
        file_data = pdf_processor.process_pdfs_group()
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/data/embedding/test", response_model=TestForm)
async def embedding_test_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Embed ``param.input`` via SiliconFlow and return dimension + vector.

    Requires the SILICONFLOW_API_KEY environment variable to be set.

    Returns:
        JSONResponse whose output is "embed_dim=...,embedding:..." on
        success, or an error payload (code 100500) on failure.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        text = param.input
        # SiliconFlowAPI is imported at module level; the previous
        # function-local re-import was redundant and has been removed.
        base_api_platform = SiliconFlowAPI()
        embedding = base_api_platform.get_embeddings([text])[0]
        embed_dim = len(embedding)
        server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
        output = f"embed_dim={embed_dim},embedding:{embedding}"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/data/pgvector/test", response_model=TestForm)
async def pgvector_test_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Run a cosine-similarity search against pgvector for ``param.input``.

    Requires the SILICONFLOW_API_KEY environment variable to be set.

    Returns:
        JSONResponse with the search result, or an error payload
        (code 100500) on failure.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # Embedding client used by the vector store for query vectors.
        client = SiliconFlowAPI()
        # 1) Raw-manager variant kept for reference:
        # db_manager = PGVectorManager(client)
        # db_manager.db_test(query_text=input_query)
        # 2) Abstracted vector-store interface.
        pg_vector_db = PGVectorDB(base_api_platform=client)
        output = pg_vector_db.similarity_cosine_search(
            param={"table_name": "test_documents"}, query_text=input_query)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # err_name fixed from the copy-pasted "bfp/governance".
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="pgvector/test")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/milvus/test", response_model=TestForm)
async def milvus_test_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Run the Milvus vector-store smoke test with ``param.input``.

    Renamed from ``pgvector_test_endpoint``: the original redefined the
    handler name already used for /data/pgvector/test at module level.

    Returns:
        JSONResponse with output "success", or an error payload
        (code 100500) on failure.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        output = "success"
        # Embedding client used by the vector store for query vectors.
        client = SiliconFlowAPI()
        # Raw-manager test variant.
        db_manager = MilvusVectorManager(client)
        db_manager.db_test(query_text=input_query)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # err_name fixed from the copy-pasted "bfp/governance".
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="milvus/test")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/pdf_md", response_model=TestForm)
async def bfp_md_indb_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Convert BFP PDFs under ``test/test`` to Markdown via MinerU.

    Writes Markdown output to ``test/bfp_md_files``; runs purely for its
    side effects — ``output`` is always ``None`` on success.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Relative paths: resolved against the service working directory.
        file_directory = "test/test"
        output_directory = "test/bfp_md_files"
        # PDF -> Markdown conversion with a tqdm progress bar.
        pdf_processor = BfpPDFMineruMdProcessor(
            directory=file_directory, output_directory=output_directory)
        pdf_processor.process_tqdm_pdfs_group()
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/pdf_md")
        output = None
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/pdf_md")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/indb", response_model=TestForm)
async def bfp_indb_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Split BFP PDFs under ``test/bfp_files`` and insert them into pgvector.

    Each file group is standardized into documents and batch-inserted into
    the ``tv_basis_of_preparation`` table; ``output`` is always ``None``
    on success.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Relative path: resolved against the service working directory.
        file_directory = "test/bfp_files"
        # Split the PDFs into grouped chunk data.
        pdf_processor = BfpPDFProcessor(directory=file_directory)
        file_data_list, total_chunks = pdf_processor.get_pdfs_group_data()
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
        output = None

        # Embedding client used by the vector store for document vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        for file_data in file_data_list:
            # Standardize, then batch-insert with a tqdm progress bar.
            documents = pg_vector_db.document_standard(file_data)
            pg_vector_db.add_tqdm_batch_documents(
                param={"table_name": "tv_basis_of_preparation"}, documents=documents)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/batch/indb", response_model=TestForm)
async def bfp_batch_indb_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Batch-split BFP PDFs under ``test/bfp_files`` and insert into pgvector.

    The processor owns the whole split-and-insert loop; the vector store is
    passed in and addressed by ``table_name``. Returns output "success".
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Embedding client used by the vector store for document vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        # Relative path: resolved against the service working directory.
        file_directory = "test/bfp_files"
        # Split + insert in one pass, with a tqdm progress bar.
        pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=pg_vector_db)
        pdf_processor.process_tqdm_pdfs_group(key_name="table_name")
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/search", response_model=TestForm)
async def bfp_search_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Retrieve BFP documents from pgvector for ``param.input``.

    Returns:
        JSONResponse with the retriever results from the
        ``tv_basis_of_preparation`` table, or an error payload
        (code 100500) on failure.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # HACK: session_id is repurposed to carry top_k for this test route —
        # a non-numeric session id raises ValueError. TODO confirm intent.
        top_k = int(param.config.session_id)

        # Embedding client used by the vector store for query vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        pg_vector_db = PGVectorDB(base_api_platform=client)
        output = pg_vector_db.retriever(
            param={"table_name": "tv_basis_of_preparation"},
            query_text=input_query, top_k=top_k)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/milvus/batch/indb", response_model=TestForm)
async def bfp_milvus_batch_indb_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Batch-split BFP PDFs under ``test/bfp_files`` and insert into Milvus.

    Renamed from ``bfp_batch_indb_endpoint``: the original redefined the
    handler name already used for /data/bfp/batch/indb at module level.
    Returns output "success".
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Embedding client used by the vector store for document vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        # Relative path: resolved against the service working directory.
        file_directory = "test/bfp_files"
        # Split + insert in one pass, with a tqdm progress bar; Milvus is
        # addressed by collection_name rather than table_name.
        pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=vector_db)
        pdf_processor.process_tqdm_pdfs_group(key_name="collection_name")
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/milvus/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/data/bfp/milvus/search", response_model=TestForm)
async def bfp_milvus_search_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Retrieve BFP documents from Milvus for ``param.input``.

    Renamed from ``bfp_search_endpoint``: the original redefined the
    handler name already used for /data/bfp/search at module level.

    Returns:
        JSONResponse with the retriever results from the
        ``tv_basis_of_preparation`` collection, or an error payload
        (code 100500) on failure.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        # HACK: session_id is repurposed to carry top_k for this test route —
        # a non-numeric session id raises ValueError. TODO confirm intent.
        top_k = int(param.config.session_id)

        # Embedding client used by the vector store for query vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        output = vector_db.retriever(
            param={"collection_name": "tv_basis_of_preparation"},
            query_text=input_query, top_k=top_k)
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/data/bfp/md/milvus/batch/indb", response_model=TestForm)
async def bfp_md_batch_indb_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Batch-split BFP Markdown files under ``test/bfp_md_files`` and insert
    them into Milvus.

    The Markdown processor owns the split-and-insert loop; the vector
    store is addressed by ``collection_name``. Returns output "success".
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Embedding client used by the vector store for document vectors
        # (requires SILICONFLOW_API_KEY).
        client = SiliconFlowAPI()
        vector_db = MilvusVectorManager(base_api_platform=client)
        # Relative path: resolved against the service working directory.
        file_directory = "test/bfp_md_files"
        # Split + insert in one pass, with a tqdm progress bar.
        processor = BfpMarkdownProcessor(directory=file_directory, base_vector=vector_db)
        processor.process_tqdm_pdfs_group(key_name="collection_name")
        server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/md/milvus/batch/indb")
        output = "success"
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # ValueError and the generic handler were identical; collapsed.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/md/milvus/batch/indb")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
|