test_views.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :cattle_farm_views.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/10 17:32
  9. '''
import json
from typing import Optional

from fastapi import Depends, Response, Header
from sse_starlette import EventSourceResponse
from starlette.concurrency import iterate_in_threadpool, run_in_threadpool
from starlette.responses import JSONResponse

from foundation.agent.test_agent import test_agent_client
from foundation.agent.generate.model_generate import test_generate_model_client
from foundation.logger.loggering import server_logger
from foundation.schemas.test_schemas import TestForm
from foundation.utils.common import return_json, handler_err
from views import test_router, get_operation_id
from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
from file_processors.pdf_processor import PDFProcessor
from file_processors.bfp_pdf_processor import BfpPDFProcessor
from foundation.models.silicon_flow import SiliconFlowAPI
from foundation.rag.vector.pg_vector_mananger import PGVectorManager
from foundation.rag.vector.pg_vector import PGVectorDB
from foundation.rag.vector.milvus_vector import MilvusVectorManager
  28. @test_router.post("/generate/chat", response_model=TestForm)
  29. async def generate_chat_endpoint(
  30. param: TestForm,
  31. trace_id: str = Depends(get_operation_id)):
  32. """
  33. 生成类模型
  34. """
  35. try:
  36. server_logger.info(trace_id=trace_id, msg=f"{param}")
  37. print(trace_id)
  38. # 从字典中获取input
  39. input_query = param.input
  40. session_id = param.config.session_id
  41. context = param.context
  42. header_info = {
  43. }
  44. task_prompt_info = {"task_prompt": ""}
  45. output = test_generate_model_client.get_model_generate_invoke(trace_id , task_prompt_info,
  46. input_query, context)
  47. # 直接执行
  48. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  49. # 返回字典格式的响应
  50. return JSONResponse(
  51. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  52. except ValueError as err:
  53. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  54. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  55. except Exception as err:
  56. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  57. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  58. @test_router.post("/generate/stream", response_model=TestForm)
  59. async def generate_stream_endpoint(
  60. param: TestForm,
  61. trace_id: str = Depends(get_operation_id)):
  62. """
  63. 生成类模型
  64. """
  65. try:
  66. server_logger.info(trace_id=trace_id, msg=f"{param}")
  67. # 从字典中获取input
  68. input_query = param.input
  69. session_id = param.config.session_id
  70. context = param.context
  71. header_info = {
  72. }
  73. task_prompt_info = {"task_prompt": ""}
  74. # 创建 SSE 流式响应
  75. async def event_generator():
  76. try:
  77. # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
  78. for chunk in test_generate_model_client.get_model_generate_stream(trace_id , task_prompt_info,
  79. input_query, context):
  80. # 发送数据块
  81. yield {
  82. "event": "message",
  83. "data": json.dumps({
  84. "output": chunk,
  85. "completed": False,
  86. }, ensure_ascii=False)
  87. }
  88. # 获取缓存数据
  89. result_data = {}
  90. # 发送结束事件
  91. yield {
  92. "event": "message_end",
  93. "data": json.dumps({
  94. "completed": True,
  95. "message": json.dumps(result_data, ensure_ascii=False),
  96. "code": 0,
  97. "trace_id": trace_id,
  98. }, ensure_ascii=False),
  99. }
  100. except Exception as e:
  101. # 错误处理
  102. yield {
  103. "event": "error",
  104. "data": json.dumps({
  105. "trace_id": trace_id,
  106. "message": str(e),
  107. "code": 1
  108. }, ensure_ascii=False)
  109. }
  110. finally:
  111. # 不需要关闭客户端,因为它是单例
  112. pass
  113. # 返回 SSE 响应
  114. return EventSourceResponse(
  115. event_generator(),
  116. headers={
  117. "Cache-Control": "no-cache",
  118. "Connection": "keep-alive"
  119. }
  120. )
  121. except ValueError as err:
  122. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  123. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  124. except Exception as err:
  125. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  126. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  127. # 路由
  128. @test_router.post("/agent/chat", response_model=TestForm)
  129. async def chat_endpoint(
  130. param: TestForm,
  131. trace_id: str = Depends(get_operation_id)):
  132. """
  133. 根据场景获取智能体反馈
  134. """
  135. try:
  136. server_logger.info(trace_id=trace_id, msg=f"{param}")
  137. # 验证参数
  138. # 从字典中获取input
  139. input_data = param.input
  140. session_id = param.config.session_id
  141. context = param.context
  142. header_info = {
  143. }
  144. task_prompt_info = {"task_prompt": ""}
  145. # stream 流式执行
  146. output = await test_agent_client.handle_query(trace_id , task_prompt_info, input_data, context, param.config)
  147. # 直接执行
  148. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  149. # 返回字典格式的响应
  150. return JSONResponse(
  151. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  152. except ValueError as err:
  153. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  154. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  155. except Exception as err:
  156. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  157. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  158. @test_router.post("/agent/stream", response_class=Response)
  159. async def chat_agent_stream(param: TestForm,
  160. trace_id: str = Depends(get_operation_id)):
  161. """
  162. 根据场景获取智能体反馈 (SSE流式响应)
  163. """
  164. try:
  165. server_logger.info(trace_id=trace_id, msg=f"{param}")
  166. # 提取参数
  167. input_data = param.input
  168. context = param.context
  169. header_info = {
  170. }
  171. task_prompt_info = {"task_prompt": ""}
  172. # 如果business_scene为None,则使用大模型进行意图识别
  173. server_logger.info(trace_id=trace_id, msg=f"{param}")
  174. # 创建 SSE 流式响应
  175. async def event_generator():
  176. try:
  177. # 流式处理查询
  178. async for chunk in test_agent_client.handle_query_stream(
  179. trace_id=trace_id,
  180. config_param=param.config,
  181. task_prompt_info=task_prompt_info,
  182. input_query=input_data,
  183. context=context,
  184. header_info=header_info
  185. ):
  186. server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
  187. # 发送数据块
  188. yield {
  189. "event": "message",
  190. "data": json.dumps({
  191. "code": 0,
  192. "output": chunk,
  193. "completed": False,
  194. "trace_id": trace_id,
  195. }, ensure_ascii=False)
  196. }
  197. # 获取缓存数据
  198. result_data = {}
  199. # 发送结束事件
  200. yield {
  201. "event": "message_end",
  202. "data": json.dumps({
  203. "completed": True,
  204. "message": json.dumps(result_data, ensure_ascii=False),
  205. "code": 0,
  206. "trace_id": trace_id,
  207. }, ensure_ascii=False),
  208. }
  209. except Exception as e:
  210. # 错误处理
  211. yield {
  212. "event": "error",
  213. "data": json.dumps({
  214. "trace_id": trace_id,
  215. "message": str(e),
  216. "code": 1
  217. }, ensure_ascii=False)
  218. }
  219. finally:
  220. # 不需要关闭客户端,因为它是单例
  221. pass
  222. # 返回 SSE 响应
  223. return EventSourceResponse(
  224. event_generator(),
  225. headers={
  226. "Cache-Control": "no-cache",
  227. "Connection": "keep-alive"
  228. }
  229. )
  230. except Exception as err:
  231. # 初始错误处理
  232. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
  233. return JSONResponse(
  234. return_json(code=1, msg=f"{err}", trace_id=trace_id),
  235. status_code=500
  236. )
  237. @test_router.post("/graph/stream", response_class=Response)
  238. async def chat_graph_stream(param: TestForm,
  239. trace_id: str = Depends(get_operation_id)):
  240. """
  241. 根据场景获取智能体反馈 (SSE流式响应)
  242. """
  243. try:
  244. server_logger.info(trace_id=trace_id, msg=f"{param}")
  245. # request_param = {
  246. # "input": param.input,
  247. # "config": param.config,
  248. # "context": param.context
  249. # }
  250. # 创建 SSE 流式响应
  251. async def event_generator():
  252. try:
  253. # 流式处理查询
  254. async for chunk in test_workflow_graph.handle_query_stream(
  255. param=param,
  256. trace_id=trace_id,
  257. ):
  258. server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
  259. # 发送数据块
  260. yield {
  261. "event": "message",
  262. "data": json.dumps({
  263. "code": 0,
  264. "output": chunk,
  265. "completed": False,
  266. "trace_id": trace_id,
  267. "dataType": "text"
  268. }, ensure_ascii=False)
  269. }
  270. # 发送结束事件
  271. yield {
  272. "event": "message_end",
  273. "data": json.dumps({
  274. "completed": True,
  275. "message": "Stream completed",
  276. "code": 0,
  277. "trace_id": trace_id,
  278. "dataType": "text"
  279. }, ensure_ascii=False),
  280. }
  281. except Exception as e:
  282. # 错误处理
  283. yield {
  284. "event": "error",
  285. "data": json.dumps({
  286. "trace_id": trace_id,
  287. "msg": str(e),
  288. "code": 1,
  289. "dataType": "text"
  290. }, ensure_ascii=False)
  291. }
  292. finally:
  293. # 不需要关闭客户端,因为它是单例
  294. pass
  295. # 返回 SSE 响应
  296. return EventSourceResponse(
  297. event_generator(),
  298. headers={
  299. "Cache-Control": "no-cache",
  300. "Connection": "keep-alive"
  301. }
  302. )
  303. except Exception as err:
  304. # 初始错误处理
  305. handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
  306. return JSONResponse(
  307. return_json(code=1, msg=f"{err}", trace_id=trace_id),
  308. status_code=500
  309. )
  310. @test_router.post("/data/governance", response_model=TestForm)
  311. async def generate_chat_endpoint(
  312. param: TestForm,
  313. trace_id: str = Depends(get_operation_id)):
  314. """
  315. 生成类模型
  316. """
  317. try:
  318. server_logger.info(trace_id=trace_id, msg=f"{param}")
  319. print(trace_id)
  320. # 从字典中获取input
  321. input_query = param.input
  322. session_id = param.config.session_id
  323. context = param.context
  324. header_info = {
  325. }
  326. task_prompt_info = {"task_prompt": ""}
  327. output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info,
  328. input_query, context)
  329. # 直接执行
  330. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  331. # 返回字典格式的响应
  332. return JSONResponse(
  333. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  334. except ValueError as err:
  335. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  336. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  337. except Exception as err:
  338. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  339. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  340. @test_router.post("/data/pdf/governance", response_model=TestForm)
  341. async def pdf_governance_endpoint(
  342. param: TestForm,
  343. trace_id: str = Depends(get_operation_id)):
  344. """
  345. 生成类模型
  346. """
  347. try:
  348. server_logger.info(trace_id=trace_id, msg=f"{param}")
  349. print(trace_id)
  350. # 从字典中获取input
  351. input_query = param.input
  352. session_id = param.config.session_id
  353. context = param.context
  354. header_info = {
  355. }
  356. task_prompt_info = {"task_prompt": ""}
  357. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  358. file_directory= "test/pdf_files"
  359. # 初始化知识问答处理
  360. pdf_processor = PDFProcessor(directory=file_directory)
  361. file_data = pdf_processor.process_pdfs_group()
  362. server_logger.info(trace_id=trace_id, msg=f"【result】: {file_data}", log_type="agent/chat")
  363. output = None
  364. #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
  365. # 直接执行
  366. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  367. # 返回字典格式的响应
  368. return JSONResponse(
  369. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  370. except ValueError as err:
  371. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  372. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  373. except Exception as err:
  374. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  375. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  376. @test_router.post("/data/bfp/governance", response_model=TestForm)
  377. async def bfp_governance_endpoint(
  378. param: TestForm,
  379. trace_id: str = Depends(get_operation_id)):
  380. """
  381. 编制依据文档切分处理
  382. """
  383. try:
  384. server_logger.info(trace_id=trace_id, msg=f"{param}")
  385. print(trace_id)
  386. # 从字典中获取input
  387. input_query = param.input
  388. session_id = param.config.session_id
  389. context = param.context
  390. header_info = {
  391. }
  392. task_prompt_info = {"task_prompt": ""}
  393. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  394. file_directory= "test/bfp_files"
  395. # 初始化知识问答处理
  396. pdf_processor = BfpPDFProcessor(directory=file_directory)
  397. file_data = pdf_processor.process_pdfs_group()
  398. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
  399. output = None
  400. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  401. # 返回字典格式的响应
  402. return JSONResponse(
  403. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  404. except ValueError as err:
  405. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  406. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  407. except Exception as err:
  408. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  409. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  410. @test_router.post("/data/embedding/test", response_model=TestForm)
  411. async def embedding_test_endpoint(
  412. param: TestForm,
  413. trace_id: str = Depends(get_operation_id)):
  414. """
  415. embedding模型测试
  416. """
  417. try:
  418. server_logger.info(trace_id=trace_id, msg=f"{param}")
  419. print(trace_id)
  420. # 从字典中获取input
  421. input_query = param.input
  422. session_id = param.config.session_id
  423. context = param.context
  424. header_info = {
  425. }
  426. task_prompt_info = {"task_prompt": ""}
  427. text = input_query
  428. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  429. from foundation.models.silicon_flow import SiliconFlowAPI
  430. base_api_platform = SiliconFlowAPI()
  431. embedding = base_api_platform.get_embeddings([text])[0]
  432. embed_dim = len(embedding)
  433. server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
  434. output = f"embed_dim={embed_dim},embedding:{embedding}"
  435. #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
  436. # 直接执行
  437. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="embedding")
  438. # 返回字典格式的响应
  439. return JSONResponse(
  440. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  441. except ValueError as err:
  442. handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
  443. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  444. except Exception as err:
  445. handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
  446. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  447. @test_router.post("/data/pgvector/test", response_model=TestForm)
  448. async def pgvector_test_endpoint(
  449. param: TestForm,
  450. trace_id: str = Depends(get_operation_id)):
  451. """
  452. pg_vector 向量检索测试
  453. """
  454. try:
  455. server_logger.info(trace_id=trace_id, msg=f"{param}")
  456. print(trace_id)
  457. # 从字典中获取input
  458. input_query = param.input
  459. session_id = param.config.session_id
  460. context = param.context
  461. header_info = {
  462. }
  463. task_prompt_info = {"task_prompt": ""}
  464. output = "success"
  465. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  466. client = SiliconFlowAPI()
  467. # 初始化数据库管理器
  468. # 1、原始测试
  469. # db_manager = PGVectorManager(client)
  470. # db_manager.db_test(query_text=input_query)
  471. # 2、抽象测试
  472. pg_vector_db = PGVectorDB(base_api_platform=client)
  473. output = pg_vector_db.similarity_cosine_search(param={"table_name": "test_documents"}, query_text=input_query)
  474. # 直接执行
  475. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  476. # 返回字典格式的响应
  477. return JSONResponse(
  478. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  479. except ValueError as err:
  480. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  481. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  482. except Exception as err:
  483. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  484. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  485. @test_router.post("/data/milvus/test", response_model=TestForm)
  486. async def pgvector_test_endpoint(
  487. param: TestForm,
  488. trace_id: str = Depends(get_operation_id)):
  489. """
  490. milvus 向量检索测试
  491. """
  492. try:
  493. server_logger.info(trace_id=trace_id, msg=f"{param}")
  494. print(trace_id)
  495. # 从字典中获取input
  496. input_query = param.input
  497. session_id = param.config.session_id
  498. context = param.context
  499. header_info = {
  500. }
  501. task_prompt_info = {"task_prompt": ""}
  502. output = "success"
  503. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  504. client = SiliconFlowAPI()
  505. # 初始化数据库管理器
  506. # 1、原始测试
  507. db_manager = MilvusVectorManager(client)
  508. db_manager.db_test(query_text=input_query)
  509. # 2、抽象测试
  510. # pg_vector_db = MilvusVectorManager(base_api_platform=client)
  511. # output = pg_vector_db.similarity_cosine_search(param={"table_name": "test_documents"}, query_text=input_query)
  512. # 直接执行
  513. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  514. # 返回字典格式的响应
  515. return JSONResponse(
  516. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  517. except ValueError as err:
  518. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  519. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  520. except Exception as err:
  521. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  522. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  523. @test_router.post("/data/bfp/indb", response_model=TestForm)
  524. async def bfp_indb_endpoint(
  525. param: TestForm,
  526. trace_id: str = Depends(get_operation_id)):
  527. """
  528. 编制依据文档切分处理 和 入库处理
  529. """
  530. try:
  531. server_logger.info(trace_id=trace_id, msg=f"{param}")
  532. print(trace_id)
  533. # 从字典中获取input
  534. input_query = param.input
  535. session_id = param.config.session_id
  536. context = param.context
  537. header_info = {
  538. }
  539. task_prompt_info = {"task_prompt": ""}
  540. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  541. file_directory= "test/bfp_files"
  542. # 初始化知识问答处理
  543. pdf_processor = BfpPDFProcessor(directory=file_directory)
  544. file_data_list , total_chunks = pdf_processor.get_pdfs_group_data()
  545. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
  546. output = None
  547. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  548. client = SiliconFlowAPI()
  549. # 抽象测试
  550. pg_vector_db = PGVectorDB(base_api_platform=client)
  551. for file_data in file_data_list:
  552. #file_data = file_data[0:5]
  553. # 数据标准化处理
  554. documents = pg_vector_db.document_standard(file_data)
  555. pg_vector_db.add_tqdm_batch_documents(param={"table_name": "tv_basis_of_preparation"}, documents=documents)
  556. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  557. # 返回字典格式的响应
  558. return JSONResponse(
  559. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  560. except ValueError as err:
  561. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  562. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  563. except Exception as err:
  564. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  565. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  566. @test_router.post("/data/bfp/batch/indb", response_model=TestForm)
  567. async def bfp_batch_indb_endpoint(
  568. param: TestForm,
  569. trace_id: str = Depends(get_operation_id)):
  570. """
  571. 编制依据文档 批量切分和入库处理
  572. """
  573. try:
  574. server_logger.info(trace_id=trace_id, msg=f"{param}")
  575. # 从字典中获取input
  576. input_query = param.input
  577. session_id = param.config.session_id
  578. context = param.context
  579. header_info = {
  580. }
  581. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  582. client = SiliconFlowAPI()
  583. # 抽象测试
  584. pg_vector_db = PGVectorDB(base_api_platform=client)
  585. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  586. file_directory= "test/bfp_files"
  587. # 初始化知识问答处理
  588. pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=pg_vector_db)
  589. pdf_processor.process_tqdm_pdfs_group(key_name="table_name")
  590. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/batch/indb")
  591. output = "success"
  592. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  593. # 返回字典格式的响应
  594. return JSONResponse(
  595. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  596. except ValueError as err:
  597. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
  598. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  599. except Exception as err:
  600. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
  601. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  602. @test_router.post("/data/bfp/search", response_model=TestForm)
  603. async def bfp_search_endpoint(
  604. param: TestForm,
  605. trace_id: str = Depends(get_operation_id)):
  606. """
  607. 编制依据文档切分处理 和 入库处理
  608. """
  609. try:
  610. server_logger.info(trace_id=trace_id, msg=f"{param}")
  611. print(trace_id)
  612. # 从字典中获取input
  613. input_query = param.input
  614. session_id = param.config.session_id
  615. context = param.context
  616. header_info = {
  617. }
  618. task_prompt_info = {"task_prompt": ""}
  619. top_k = int(session_id)
  620. output = None
  621. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  622. client = SiliconFlowAPI()
  623. # 抽象测试
  624. pg_vector_db = PGVectorDB(base_api_platform=client)
  625. output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
  626. # 返回字典格式的响应
  627. return JSONResponse(
  628. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  629. except ValueError as err:
  630. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
  631. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  632. except Exception as err:
  633. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
  634. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  635. @test_router.post("/data/bfp/milvus/batch/indb", response_model=TestForm)
  636. async def bfp_batch_indb_endpoint(
  637. param: TestForm,
  638. trace_id: str = Depends(get_operation_id)):
  639. """
  640. 编制依据文档 批量切分和入库处理
  641. """
  642. try:
  643. server_logger.info(trace_id=trace_id, msg=f"{param}")
  644. # 从字典中获取input
  645. input_query = param.input
  646. session_id = param.config.session_id
  647. context = param.context
  648. header_info = {
  649. }
  650. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  651. client = SiliconFlowAPI()
  652. # 抽象测试
  653. vector_db = MilvusVectorManager(base_api_platform=client)
  654. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  655. file_directory= "test/bfp_files"
  656. # 初始化知识问答处理
  657. pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=vector_db)
  658. pdf_processor.process_tqdm_pdfs_group(key_name="collection_name")
  659. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/milvus/batch/indb")
  660. output = "success"
  661. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  662. # 返回字典格式的响应
  663. return JSONResponse(
  664. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  665. except ValueError as err:
  666. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
  667. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  668. except Exception as err:
  669. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
  670. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  671. @test_router.post("/data/bfp/milvus/search", response_model=TestForm)
  672. async def bfp_search_endpoint(
  673. param: TestForm,
  674. trace_id: str = Depends(get_operation_id)):
  675. """
  676. 编制依据文档切分处理 和 入库处理
  677. """
  678. try:
  679. server_logger.info(trace_id=trace_id, msg=f"{param}")
  680. print(trace_id)
  681. # 从字典中获取input
  682. input_query = param.input
  683. session_id = param.config.session_id
  684. context = param.context
  685. header_info = {
  686. }
  687. task_prompt_info = {"task_prompt": ""}
  688. top_k = int(session_id)
  689. output = None
  690. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  691. client = SiliconFlowAPI()
  692. # 抽象测试
  693. vector_db = MilvusVectorManager(base_api_platform=client)
  694. output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
  695. # 返回字典格式的响应
  696. return JSONResponse(
  697. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  698. except ValueError as err:
  699. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
  700. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  701. except Exception as err:
  702. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
  703. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))