  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :cattle_farm_views.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/10 17:32
  9. '''
  10. import json
  11. from typing import Optional
  12. from fastapi import Depends, Response, Header
  13. from sse_starlette import EventSourceResponse
  14. from starlette.responses import JSONResponse
  15. from agent.test_agent import test_agent_client
  16. from agent.generate.model_generate import test_generate_model_client
  17. from logger.loggering import server_logger
  18. from schemas.test_schemas import TestForm
  19. from utils.common import return_json, handler_err
  20. from views import test_router, get_operation_id
  21. @test_router.post("/generate/chat", response_model=TestForm)
  22. async def generate_chat_endpoint(
  23. param: TestForm,
  24. trace_id: str = Depends(get_operation_id)):
  25. """
  26. 生成类模型
  27. """
  28. try:
  29. server_logger.info(trace_id=trace_id, msg=f"{param}")
  30. # 从字典中获取input
  31. input_query = param.input
  32. session_id = param.config.session_id
  33. context = param.context
  34. header_info = {
  35. }
  36. task_prompt_info = {"task_prompt": ""}
  37. output = test_generate_model_client.get_model_generate_invoke(trace_id , task_prompt_info,
  38. input_query, context)
  39. # 直接执行
  40. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  41. # 返回字典格式的响应
  42. return JSONResponse(
  43. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  44. except ValueError as err:
  45. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  46. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  47. except Exception as err:
  48. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  49. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  50. @test_router.post("/generate/stream", response_model=TestForm)
  51. async def generate_stream_endpoint(
  52. param: TestForm,
  53. trace_id: str = Depends(get_operation_id)):
  54. """
  55. 生成类模型
  56. """
  57. try:
  58. server_logger.info(trace_id=trace_id, msg=f"{param}")
  59. # 从字典中获取input
  60. input_query = param.input
  61. session_id = param.config.session_id
  62. context = param.context
  63. header_info = {
  64. }
  65. task_prompt_info = {"task_prompt": ""}
  66. # 创建 SSE 流式响应
  67. async def event_generator():
  68. try:
  69. # 流式处理查询 trace_id, task_prompt_info: dict, input_query, context=None
  70. for chunk in test_generate_model_client.get_model_generate_stream(trace_id , task_prompt_info,
  71. input_query, context):
  72. # 发送数据块
  73. yield {
  74. "event": "message",
  75. "data": json.dumps({
  76. "output": chunk,
  77. "completed": False,
  78. }, ensure_ascii=False)
  79. }
  80. # 获取缓存数据
  81. result_data = {}
  82. # 发送结束事件
  83. yield {
  84. "event": "message_end",
  85. "data": json.dumps({
  86. "completed": True,
  87. "message": json.dumps(result_data, ensure_ascii=False),
  88. "code": 0,
  89. "trace_id": trace_id,
  90. }, ensure_ascii=False),
  91. }
  92. except Exception as e:
  93. # 错误处理
  94. yield {
  95. "event": "error",
  96. "data": json.dumps({
  97. "trace_id": trace_id,
  98. "message": str(e),
  99. "code": 1
  100. }, ensure_ascii=False)
  101. }
  102. finally:
  103. # 不需要关闭客户端,因为它是单例
  104. pass
  105. # 返回 SSE 响应
  106. return EventSourceResponse(
  107. event_generator(),
  108. headers={
  109. "Cache-Control": "no-cache",
  110. "Connection": "keep-alive"
  111. }
  112. )
  113. except ValueError as err:
  114. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  115. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  116. except Exception as err:
  117. handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
  118. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  119. # 路由
  120. @test_router.post("/agent/chat", response_model=TestForm)
  121. async def chat_endpoint(
  122. param: TestForm,
  123. trace_id: str = Depends(get_operation_id)):
  124. """
  125. 根据场景获取智能体反馈
  126. """
  127. try:
  128. server_logger.info(trace_id=trace_id, msg=f"{param}")
  129. # 验证参数
  130. # 从字典中获取input
  131. input_data = param.input
  132. session_id = param.config.session_id
  133. context = param.context
  134. header_info = {
  135. }
  136. task_prompt_info = {"task_prompt": ""}
  137. # stream 流式执行
  138. output = await test_agent_client.handle_query(trace_id , task_prompt_info, input_data, context, param.config)
  139. # 直接执行
  140. server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
  141. # 返回字典格式的响应
  142. return JSONResponse(
  143. return_json(data={"output": output}, data_type="text", trace_id=trace_id))
  144. except ValueError as err:
  145. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  146. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  147. except Exception as err:
  148. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
  149. return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
  150. @test_router.post("/agent/stream", response_class=Response)
  151. async def chat_agent_stream(param: TestForm,
  152. trace_id: str = Depends(get_operation_id)):
  153. """
  154. 根据场景获取智能体反馈 (SSE流式响应)
  155. """
  156. try:
  157. server_logger.info(trace_id=trace_id, msg=f"{param}")
  158. # 提取参数
  159. input_data = param.input
  160. context = param.context
  161. header_info = {
  162. }
  163. task_prompt_info = {"task_prompt": ""}
  164. # 如果business_scene为None,则使用大模型进行意图识别
  165. server_logger.info(trace_id=trace_id, msg=f"{param}")
  166. # 创建 SSE 流式响应
  167. async def event_generator():
  168. try:
  169. # 流式处理查询
  170. async for chunk in test_agent_client.handle_query_stream(
  171. trace_id=trace_id,
  172. config_param=param.config,
  173. task_prompt_info=task_prompt_info,
  174. input_query=input_data,
  175. context=context,
  176. header_info=header_info
  177. ):
  178. server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
  179. # 发送数据块
  180. yield {
  181. "event": "message",
  182. "data": json.dumps({
  183. "code": 0,
  184. "output": chunk,
  185. "completed": False,
  186. "trace_id": trace_id,
  187. }, ensure_ascii=False)
  188. }
  189. # 获取缓存数据
  190. result_data = {}
  191. # 发送结束事件
  192. yield {
  193. "event": "message_end",
  194. "data": json.dumps({
  195. "completed": True,
  196. "message": json.dumps(result_data, ensure_ascii=False),
  197. "code": 0,
  198. "trace_id": trace_id,
  199. }, ensure_ascii=False),
  200. }
  201. except Exception as e:
  202. # 错误处理
  203. yield {
  204. "event": "error",
  205. "data": json.dumps({
  206. "trace_id": trace_id,
  207. "message": str(e),
  208. "code": 1
  209. }, ensure_ascii=False)
  210. }
  211. finally:
  212. # 不需要关闭客户端,因为它是单例
  213. pass
  214. # 返回 SSE 响应
  215. return EventSourceResponse(
  216. event_generator(),
  217. headers={
  218. "Cache-Control": "no-cache",
  219. "Connection": "keep-alive"
  220. }
  221. )
  222. except Exception as err:
  223. # 初始错误处理
  224. handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
  225. return JSONResponse(
  226. return_json(code=1, msg=f"{err}", trace_id=trace_id),
  227. status_code=500
  228. )