test_views.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : test_views.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/10 17:32
'''
  10. import json
  11. from typing import Optional
  12. from fastapi import Depends, Response, Header
  13. from sse_starlette import EventSourceResponse
  14. from starlette.responses import JSONResponse
  15. from foundation.agent.test_agent import test_agent_client
  16. from foundation.agent.generate.model_generate import test_generate_model_client
  17. from foundation.logger.loggering import server_logger
  18. from foundation.schemas.test_schemas import TestForm
  19. from foundation.utils.common import return_json, handler_err
  20. from views import test_router, get_operation_id
  21. from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
  22. from file_processors.pdf_processor import PDFProcessor
  23. from file_processors.bfp_pdf_processor import BfpPDFProcessor
  24. from file_processors.pdf_mineru_md import BfpPDFMineruMdProcessor
  25. from file_processors.bfp_md_processor import BfpMarkdownProcessor
  26. from foundation.models.silicon_flow import SiliconFlowAPI
  27. from foundation.rag.vector.pg_vector_mananger import PGVectorManager
  28. from foundation.rag.vector.pg_vector import PGVectorDB
  29. from foundation.rag.vector.milvus_vector import MilvusVectorManager
  30. @test_router.post("/generate/chat", response_model=TestForm)
  31. async def generate_chat_endpoint(
  32. param: TestForm,
  33. trace_id: str = Depends(get_operation_id)):
  34. """
  35. 生成类模型
  36. """
  37. try:
  38. server_logger.info(trace_id=trace_id, msg=f"{param}")
  39. print(trace_id)
  40. # 从字典中获取input
  41. input_query = param.input
  42. session_id = param.config.session_id
  43. context = param.context
  44. header_info = {
  45. }
  46. task_prompt_info = {"task_prompt": ""}
  47. output = test_generate_model_client.get_model_generate_invoke(trace_id , task_prompt_info,
  48. input_query, context)
  49. # 直接执行
  50. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  51. # 返回字典格式的响应
  52. return JSONResponse(
  53. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  54. except ValueError as err:
  55. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  56. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  57. except Exception as err:
  58. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  59. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  60. @test_router.post("/generate/stream", response_model=TestForm)
  61. async def generate_stream_endpoint(
  62. param: TestForm,
  63. trace_id: str = Depends(get_operation_id)):
  64. """
  65. 生成类模型
  66. """
  67. try:
  68. server_logger.info(trace_id=trace_id, msg=f"{param}")
  69. # 从字典中获取input
  70. input_query = param.input
  71. session_id = param.config.session_id
  72. context = param.context
  73. header_info = {
  74. }
  75. task_prompt_info = {"task_prompt": ""}
  76. # 创建 SSE 流式响应
  77. async def event_generator():
  78. try:
  79. # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
  80. for chunk in test_generate_model_client.get_model_generate_stream(trace_id , task_prompt_info,
  81. input_query, context):
  82. # 发送数据块
  83. yield {
  84. "event": "message",
  85. "data": json.dumps({
  86. "output": chunk,
  87. "completed": False,
  88. }, ensure_ascii=False)
  89. }
  90. # 获取缓存数据
  91. result_data = {}
  92. # 发送结束事件
  93. yield {
  94. "event": "message_end",
  95. "data": json.dumps({
  96. "completed": True,
  97. "message": json.dumps(result_data, ensure_ascii=False),
  98. "code": 0,
  99. "trace_id": trace_id,
  100. }, ensure_ascii=False),
  101. }
  102. except Exception as e:
  103. # 错误处理
  104. yield {
  105. "event": "error",
  106. "data": json.dumps({
  107. "trace_id": trace_id,
  108. "message": str(e),
  109. "code": 1
  110. }, ensure_ascii=False)
  111. }
  112. finally:
  113. # 不需要关闭客户端,因为它是单例
  114. pass
  115. # 返回 SSE 响应
  116. return EventSourceResponse(
  117. event_generator(),
  118. headers={
  119. "Cache-Control": "no-cache",
  120. "Connection": "keep-alive"
  121. }
  122. )
  123. except ValueError as err:
  124. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  125. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  126. except Exception as err:
  127. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  128. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  129. # 路由
  130. @test_router.post("/agent/chat", response_model=TestForm)
  131. async def chat_endpoint(
  132. param: TestForm,
  133. trace_id: str = Depends(get_operation_id)):
  134. """
  135. 根据场景获取智能体反馈
  136. """
  137. try:
  138. server_logger.info(trace_id=trace_id, msg=f"{param}")
  139. # 验证参数
  140. # 从字典中获取input
  141. input_data = param.input
  142. session_id = param.config.session_id
  143. context = param.context
  144. header_info = {
  145. }
  146. task_prompt_info = {"task_prompt": ""}
  147. # stream 流式执行
  148. output = await test_agent_client.handle_query(trace_id , task_prompt_info, input_data, context, param.config)
  149. # 直接执行
  150. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  151. # 返回字典格式的响应
  152. return JSONResponse(
  153. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  154. except ValueError as err:
  155. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  156. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  157. except Exception as err:
  158. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  159. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  160. @test_router.post("/agent/stream", response_class=Response)
  161. async def chat_agent_stream(param: TestForm,
  162. trace_id: str = Depends(get_operation_id)):
  163. """
  164. 根据场景获取智能体反馈 (SSE流式响应)
  165. """
  166. try:
  167. server_logger.info(trace_id=trace_id, msg=f"{param}")
  168. # 提取参数
  169. input_data = param.input
  170. context = param.context
  171. header_info = {
  172. }
  173. task_prompt_info = {"task_prompt": ""}
  174. # 如果business_scene为None,则使用大模型进行意图识别
  175. server_logger.info(trace_id=trace_id, msg=f"{param}")
  176. # 创建 SSE 流式响应
  177. async def event_generator():
  178. try:
  179. # 流式处理查询
  180. async for chunk in test_agent_client.handle_query_stream(
  181. trace_id=trace_id,
  182. config_param=param.config,
  183. task_prompt_info=task_prompt_info,
  184. input_query=input_data,
  185. context=context,
  186. header_info=header_info
  187. ):
  188. server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
  189. # 发送数据块
  190. yield {
  191. "event": "message",
  192. "data": json.dumps({
  193. "code": 0,
  194. "output": chunk,
  195. "completed": False,
  196. "trace_id": trace_id,
  197. }, ensure_ascii=False)
  198. }
  199. # 获取缓存数据
  200. result_data = {}
  201. # 发送结束事件
  202. yield {
  203. "event": "message_end",
  204. "data": json.dumps({
  205. "completed": True,
  206. "message": json.dumps(result_data, ensure_ascii=False),
  207. "code": 0,
  208. "trace_id": trace_id,
  209. }, ensure_ascii=False),
  210. }
  211. except Exception as e:
  212. # 错误处理
  213. yield {
  214. "event": "error",
  215. "data": json.dumps({
  216. "trace_id": trace_id,
  217. "message": str(e),
  218. "code": 1
  219. }, ensure_ascii=False)
  220. }
  221. finally:
  222. # 不需要关闭客户端,因为它是单例
  223. pass
  224. # 返回 SSE 响应
  225. return EventSourceResponse(
  226. event_generator(),
  227. headers={
  228. "Cache-Control": "no-cache",
  229. "Connection": "keep-alive"
  230. }
  231. )
  232. except Exception as err:
  233. # 初始错误处理
  234. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
  235. return JSONResponse(
  236. return_json(code=1, msg=f"{err}", trace_id=trace_id),
  237. status_code=500
  238. )
  239. @test_router.post("/graph/stream", response_class=Response)
  240. async def chat_graph_stream(param: TestForm,
  241. trace_id: str = Depends(get_operation_id)):
  242. """
  243. 根据场景获取智能体反馈 (SSE流式响应)
  244. """
  245. try:
  246. server_logger.info(trace_id=trace_id, msg=f"{param}")
  247. # request_param = {
  248. # "input": param.input,
  249. # "config": param.config,
  250. # "context": param.context
  251. # }
  252. # 创建 SSE 流式响应
  253. async def event_generator():
  254. try:
  255. # 流式处理查询
  256. async for chunk in test_workflow_graph.handle_query_stream(
  257. param=param,
  258. trace_id=trace_id,
  259. ):
  260. server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
  261. # 发送数据块
  262. yield {
  263. "event": "message",
  264. "data": json.dumps({
  265. "code": 0,
  266. "output": chunk,
  267. "completed": False,
  268. "trace_id": trace_id,
  269. "dataType": "text"
  270. }, ensure_ascii=False)
  271. }
  272. # 发送结束事件
  273. yield {
  274. "event": "message_end",
  275. "data": json.dumps({
  276. "completed": True,
  277. "message": "Stream completed",
  278. "code": 0,
  279. "trace_id": trace_id,
  280. "dataType": "text"
  281. }, ensure_ascii=False),
  282. }
  283. except Exception as e:
  284. # 错误处理
  285. yield {
  286. "event": "error",
  287. "data": json.dumps({
  288. "trace_id": trace_id,
  289. "msg": str(e),
  290. "code": 1,
  291. "dataType": "text"
  292. }, ensure_ascii=False)
  293. }
  294. finally:
  295. # 不需要关闭客户端,因为它是单例
  296. pass
  297. # 返回 SSE 响应
  298. return EventSourceResponse(
  299. event_generator(),
  300. headers={
  301. "Cache-Control": "no-cache",
  302. "Connection": "keep-alive"
  303. }
  304. )
  305. except Exception as err:
  306. # 初始错误处理
  307. handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
  308. return JSONResponse(
  309. return_json(code=1, msg=f"{err}", trace_id=trace_id),
  310. status_code=500
  311. )
  312. @test_router.post("/data/governance", response_model=TestForm)
  313. async def generate_chat_endpoint(
  314. param: TestForm,
  315. trace_id: str = Depends(get_operation_id)):
  316. """
  317. 生成类模型
  318. """
  319. try:
  320. server_logger.info(trace_id=trace_id, msg=f"{param}")
  321. print(trace_id)
  322. # 从字典中获取input
  323. input_query = param.input
  324. session_id = param.config.session_id
  325. context = param.context
  326. header_info = {
  327. }
  328. task_prompt_info = {"task_prompt": ""}
  329. output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info,
  330. input_query, context)
  331. # 直接执行
  332. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  333. # 返回字典格式的响应
  334. return JSONResponse(
  335. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  336. except ValueError as err:
  337. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  338. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  339. except Exception as err:
  340. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  341. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  342. @test_router.post("/data/pdf/governance", response_model=TestForm)
  343. async def pdf_governance_endpoint(
  344. param: TestForm,
  345. trace_id: str = Depends(get_operation_id)):
  346. """
  347. 生成类模型
  348. """
  349. try:
  350. server_logger.info(trace_id=trace_id, msg=f"{param}")
  351. print(trace_id)
  352. # 从字典中获取input
  353. input_query = param.input
  354. session_id = param.config.session_id
  355. context = param.context
  356. header_info = {
  357. }
  358. task_prompt_info = {"task_prompt": ""}
  359. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  360. file_directory= "test/pdf_files"
  361. # 初始化知识问答处理
  362. pdf_processor = PDFProcessor(directory=file_directory)
  363. file_data = pdf_processor.process_pdfs_group()
  364. server_logger.info(trace_id=trace_id, msg=f"【result】: {file_data}", log_type="agent/chat")
  365. output = None
  366. #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
  367. # 直接执行
  368. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  369. # 返回字典格式的响应
  370. return JSONResponse(
  371. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  372. except ValueError as err:
  373. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  374. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  375. except Exception as err:
  376. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  377. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  378. @test_router.post("/data/bfp/governance", response_model=TestForm)
  379. async def bfp_governance_endpoint(
  380. param: TestForm,
  381. trace_id: str = Depends(get_operation_id)):
  382. """
  383. 编制依据文档切分处理
  384. """
  385. try:
  386. server_logger.info(trace_id=trace_id, msg=f"{param}")
  387. print(trace_id)
  388. # 从字典中获取input
  389. input_query = param.input
  390. session_id = param.config.session_id
  391. context = param.context
  392. header_info = {
  393. }
  394. task_prompt_info = {"task_prompt": ""}
  395. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  396. file_directory= "test/bfp_files"
  397. # 初始化知识问答处理
  398. pdf_processor = BfpPDFProcessor(directory=file_directory)
  399. file_data = pdf_processor.process_pdfs_group()
  400. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
  401. output = None
  402. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  403. # 返回字典格式的响应
  404. return JSONResponse(
  405. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  406. except ValueError as err:
  407. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  408. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  409. except Exception as err:
  410. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  411. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  412. @test_router.post("/data/embedding/test", response_model=TestForm)
  413. async def embedding_test_endpoint(
  414. param: TestForm,
  415. trace_id: str = Depends(get_operation_id)):
  416. """
  417. embedding模型测试
  418. """
  419. try:
  420. server_logger.info(trace_id=trace_id, msg=f"{param}")
  421. print(trace_id)
  422. # 从字典中获取input
  423. input_query = param.input
  424. session_id = param.config.session_id
  425. context = param.context
  426. header_info = {
  427. }
  428. task_prompt_info = {"task_prompt": ""}
  429. text = input_query
  430. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  431. from foundation.models.silicon_flow import SiliconFlowAPI
  432. base_api_platform = SiliconFlowAPI()
  433. embedding = base_api_platform.get_embeddings([text])[0]
  434. embed_dim = len(embedding)
  435. server_logger.info(trace_id=trace_id, msg=f"【result】: {embed_dim}")
  436. output = f"embed_dim={embed_dim},embedding:{embedding}"
  437. #output = test_generate_model_client.get_model_data_governance_invoke(trace_id , task_prompt_info, input_query, context)
  438. # 直接执行
  439. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="embedding")
  440. # 返回字典格式的响应
  441. return JSONResponse(
  442. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  443. except ValueError as err:
  444. handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
  445. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  446. except Exception as err:
  447. handler_err(server_logger, trace_id=trace_id, err=err, err_name="embedding")
  448. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  449. @test_router.post("/data/pgvector/test", response_model=TestForm)
  450. async def pgvector_test_endpoint(
  451. param: TestForm,
  452. trace_id: str = Depends(get_operation_id)):
  453. """
  454. pg_vector 向量检索测试
  455. """
  456. try:
  457. server_logger.info(trace_id=trace_id, msg=f"{param}")
  458. print(trace_id)
  459. # 从字典中获取input
  460. input_query = param.input
  461. session_id = param.config.session_id
  462. context = param.context
  463. header_info = {
  464. }
  465. task_prompt_info = {"task_prompt": ""}
  466. output = "success"
  467. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  468. client = SiliconFlowAPI()
  469. # 初始化数据库管理器
  470. # 1、原始测试
  471. # db_manager = PGVectorManager(client)
  472. # db_manager.db_test(query_text=input_query)
  473. # 2、抽象测试
  474. pg_vector_db = PGVectorDB(base_api_platform=client)
  475. output = pg_vector_db.similarity_cosine_search(param={"table_name": "test_documents"}, query_text=input_query)
  476. # 直接执行
  477. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  478. # 返回字典格式的响应
  479. return JSONResponse(
  480. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  481. except ValueError as err:
  482. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  483. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  484. except Exception as err:
  485. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  486. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  487. @test_router.post("/data/milvus/test", response_model=TestForm)
  488. async def pgvector_test_endpoint(
  489. param: TestForm,
  490. trace_id: str = Depends(get_operation_id)):
  491. """
  492. milvus 向量检索测试
  493. """
  494. try:
  495. server_logger.info(trace_id=trace_id, msg=f"{param}")
  496. print(trace_id)
  497. # 从字典中获取input
  498. input_query = param.input
  499. session_id = param.config.session_id
  500. context = param.context
  501. header_info = {
  502. }
  503. task_prompt_info = {"task_prompt": ""}
  504. output = "success"
  505. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  506. client = SiliconFlowAPI()
  507. # 初始化数据库管理器
  508. # 1、原始测试
  509. db_manager = MilvusVectorManager(client)
  510. db_manager.db_test(query_text=input_query)
  511. # 2、抽象测试
  512. # pg_vector_db = MilvusVectorManager(base_api_platform=client)
  513. # output = pg_vector_db.similarity_cosine_search(param={"table_name": "test_documents"}, query_text=input_query)
  514. # 直接执行
  515. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  516. # 返回字典格式的响应
  517. return JSONResponse(
  518. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  519. except ValueError as err:
  520. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  521. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  522. except Exception as err:
  523. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  524. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  525. @test_router.post("/data/bfp/pdf_md", response_model=TestForm)
  526. async def bfp_md_indb_endpoint(
  527. param: TestForm,
  528. trace_id: str = Depends(get_operation_id)):
  529. """
  530. 编制依据文档切分处理 和 入库处理
  531. """
  532. try:
  533. server_logger.info(trace_id=trace_id, msg=f"{param}")
  534. print(trace_id)
  535. # 从字典中获取input
  536. input_query = param.input
  537. session_id = param.config.session_id
  538. context = param.context
  539. header_info = {
  540. }
  541. task_prompt_info = {"task_prompt": ""}
  542. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  543. #file_directory= "test/bfp_files"
  544. file_directory= "test/test"
  545. output_directory = "test/bfp_md_files"
  546. # 初始化知识问答处理
  547. pdf_processor = BfpPDFMineruMdProcessor(directory=file_directory , output_directory=output_directory)
  548. pdf_processor.process_tqdm_pdfs_group()
  549. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/pdf_md")
  550. output = None
  551. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  552. # 返回字典格式的响应
  553. return JSONResponse(
  554. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  555. except ValueError as err:
  556. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/pdf_md")
  557. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  558. except Exception as err:
  559. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/pdf_md")
  560. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  561. @test_router.post("/data/bfp/indb", response_model=TestForm)
  562. async def bfp_indb_endpoint(
  563. param: TestForm,
  564. trace_id: str = Depends(get_operation_id)):
  565. """
  566. 编制依据文档切分处理 和 入库处理
  567. """
  568. try:
  569. server_logger.info(trace_id=trace_id, msg=f"{param}")
  570. print(trace_id)
  571. # 从字典中获取input
  572. input_query = param.input
  573. session_id = param.config.session_id
  574. context = param.context
  575. header_info = {
  576. }
  577. task_prompt_info = {"task_prompt": ""}
  578. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  579. file_directory= "test/bfp_files"
  580. # 初始化知识问答处理
  581. pdf_processor = BfpPDFProcessor(directory=file_directory)
  582. file_data_list , total_chunks = pdf_processor.get_pdfs_group_data()
  583. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/governance")
  584. output = None
  585. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  586. client = SiliconFlowAPI()
  587. # 抽象测试
  588. pg_vector_db = PGVectorDB(base_api_platform=client)
  589. for file_data in file_data_list:
  590. #file_data = file_data[0:5]
  591. # 数据标准化处理
  592. documents = pg_vector_db.document_standard(file_data)
  593. pg_vector_db.add_tqdm_batch_documents(param={"table_name": "tv_basis_of_preparation"}, documents=documents)
  594. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  595. # 返回字典格式的响应
  596. return JSONResponse(
  597. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  598. except ValueError as err:
  599. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  600. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  601. except Exception as err:
  602. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/governance")
  603. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  604. @test_router.post("/data/bfp/batch/indb", response_model=TestForm)
  605. async def bfp_batch_indb_endpoint(
  606. param: TestForm,
  607. trace_id: str = Depends(get_operation_id)):
  608. """
  609. 编制依据文档 批量切分和入库处理
  610. """
  611. try:
  612. server_logger.info(trace_id=trace_id, msg=f"{param}")
  613. # 从字典中获取input
  614. input_query = param.input
  615. session_id = param.config.session_id
  616. context = param.context
  617. header_info = {
  618. }
  619. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  620. client = SiliconFlowAPI()
  621. # 抽象测试
  622. pg_vector_db = PGVectorDB(base_api_platform=client)
  623. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  624. file_directory= "test/bfp_files"
  625. # 初始化知识问答处理
  626. pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=pg_vector_db)
  627. pdf_processor.process_tqdm_pdfs_group(key_name="table_name")
  628. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/batch/indb")
  629. output = "success"
  630. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  631. # 返回字典格式的响应
  632. return JSONResponse(
  633. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  634. except ValueError as err:
  635. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
  636. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  637. except Exception as err:
  638. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/batch/indb")
  639. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  640. @test_router.post("/data/bfp/search", response_model=TestForm)
  641. async def bfp_search_endpoint(
  642. param: TestForm,
  643. trace_id: str = Depends(get_operation_id)):
  644. """
  645. 编制依据文档切分处理 和 入库处理
  646. """
  647. try:
  648. server_logger.info(trace_id=trace_id, msg=f"{param}")
  649. print(trace_id)
  650. # 从字典中获取input
  651. input_query = param.input
  652. session_id = param.config.session_id
  653. context = param.context
  654. header_info = {
  655. }
  656. task_prompt_info = {"task_prompt": ""}
  657. top_k = int(session_id)
  658. output = None
  659. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  660. client = SiliconFlowAPI()
  661. # 抽象测试
  662. pg_vector_db = PGVectorDB(base_api_platform=client)
  663. output = pg_vector_db.retriever(param={"table_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
  664. # 返回字典格式的响应
  665. return JSONResponse(
  666. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  667. except ValueError as err:
  668. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
  669. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  670. except Exception as err:
  671. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/search")
  672. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  673. @test_router.post("/data/bfp/milvus/batch/indb", response_model=TestForm)
  674. async def bfp_batch_indb_endpoint(
  675. param: TestForm,
  676. trace_id: str = Depends(get_operation_id)):
  677. """
  678. 编制依据文档 批量切分和入库处理
  679. """
  680. try:
  681. server_logger.info(trace_id=trace_id, msg=f"{param}")
  682. # 从字典中获取input
  683. input_query = param.input
  684. session_id = param.config.session_id
  685. context = param.context
  686. header_info = {
  687. }
  688. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  689. client = SiliconFlowAPI()
  690. # 抽象测试
  691. vector_db = MilvusVectorManager(base_api_platform=client)
  692. #file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  693. file_directory= "test/bfp_files"
  694. # 初始化知识问答处理
  695. pdf_processor = BfpPDFProcessor(directory=file_directory, base_vector=vector_db)
  696. pdf_processor.process_tqdm_pdfs_group(key_name="collection_name")
  697. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/milvus/batch/indb")
  698. output = "success"
  699. #server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  700. # 返回字典格式的响应
  701. return JSONResponse(
  702. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  703. except ValueError as err:
  704. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
  705. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  706. except Exception as err:
  707. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/batch/indb")
  708. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  709. @test_router.post("/data/bfp/milvus/search", response_model=TestForm)
  710. async def bfp_search_endpoint(
  711. param: TestForm,
  712. trace_id: str = Depends(get_operation_id)):
  713. """
  714. 编制依据文档切分处理 和 入库处理
  715. """
  716. try:
  717. server_logger.info(trace_id=trace_id, msg=f"{param}")
  718. print(trace_id)
  719. # 从字典中获取input
  720. input_query = param.input
  721. session_id = param.config.session_id
  722. context = param.context
  723. header_info = {
  724. }
  725. task_prompt_info = {"task_prompt": ""}
  726. top_k = int(session_id)
  727. output = None
  728. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  729. client = SiliconFlowAPI()
  730. # 抽象测试
  731. vector_db = MilvusVectorManager(base_api_platform=client)
  732. output = vector_db.retriever(param={"collection_name": "tv_basis_of_preparation"}, query_text=input_query , top_k=top_k)
  733. # 返回字典格式的响应
  734. return JSONResponse(
  735. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  736. except ValueError as err:
  737. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
  738. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  739. except Exception as err:
  740. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/milvus/search")
  741. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  742. @test_router.post("/data/bfp/md/milvus/batch/indb", response_model=TestForm)
  743. async def bfp_md_batch_indb_endpoint(
  744. param: TestForm,
  745. trace_id: str = Depends(get_operation_id)):
  746. """
  747. 编制依据文档 批量切分和入库处理
  748. """
  749. try:
  750. server_logger.info(trace_id=trace_id, msg=f"{param}")
  751. # 从字典中获取input
  752. input_query = param.input
  753. session_id = param.config.session_id
  754. context = param.context
  755. header_info = {
  756. }
  757. # 初始化客户端(需提前设置环境变量 SILICONFLOW_API_KEY)
  758. client = SiliconFlowAPI()
  759. # 抽象测试
  760. vector_db = MilvusVectorManager(base_api_platform=client)
  761. # file_directory= "I:/wangxun_dev_workspace/lq_workspace/LQDataGovernance/test/pdf_files"
  762. file_directory = "test/bfp_md_files"
  763. # 初始化知识问答处理
  764. processor = BfpMarkdownProcessor(directory=file_directory, base_vector=vector_db)
  765. processor.process_tqdm_pdfs_group(key_name="collection_name")
  766. server_logger.info(trace_id=trace_id, msg=f"【result】: ", log_type="bfp/md/milvus/batch/indb")
  767. output = "success"
  768. # server_logger.debug(trace_id=trace_id, ms g=f"【result】: {output}", log_type="agent/chat")
  769. # 返回字典格式的响应
  770. return JSONResponse(
  771. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  772. except ValueError as err:
  773. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/md/milvus/batch/indb")
  774. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  775. except Exception as err:
  776. handler_err(server_logger, trace_id=trace_id, err=err, err_name="bfp/md/milvus/batch/indb")
  777. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))