| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :cattle_farm_views.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/10 17:32
- '''
- import json
- from typing import Optional
- from fastapi import Depends, Response, Header
- from sse_starlette import EventSourceResponse
- from starlette.responses import JSONResponse
- from fastapi import Depends, Request, Response, Header
- from foundation.agent.test_agent import test_agent_client
- from foundation.agent.generate.model_generate import test_generate_model_client
- from foundation.logger.loggering import server_logger
- from foundation.schemas.test_schemas import TestForm
- from foundation.utils.common import return_json, handler_err
- from views import test_router, get_operation_id
- from foundation.agent.workflow.test_workflow_graph import test_workflow_graph
- from foundation.base.mysql.async_mysql_base_dao import TestTabDAO
- from foundation.utils.tool_utils import DateTimeEncoder
@test_router.post("/generate/chat", response_model=TestForm)
async def generate_chat_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Run one non-streaming generation request and return the result as JSON.

    Args:
        param: Request form. ``param.input`` and ``param.context`` are
            forwarded to the generation client.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        it carries ``{"output": ...}``; when any exception is raised it
        carries ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # No task-specific prompt is supplied; an empty one is passed through.
        task_prompt_info = {"task_prompt": ""}
        # NOTE(review): this call is not awaited. If the client performs
        # blocking I/O it will stall the event loop for the duration of the
        # call -- consider offloading it to a worker thread. TODO confirm.
        output = test_generate_model_client.get_model_generate_invoke(
            trace_id, task_prompt_info, param.input, param.context)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="generate/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # Every failure is reported under this endpoint's own name and
        # answered with the same error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/generate/stream", response_model=TestForm)
async def generate_stream_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Stream the output of a generation request as Server-Sent Events.

    Args:
        param: Request form. ``param.input`` and ``param.context`` are
            forwarded to the generation client.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the terminal/error events.

    Returns:
        An ``EventSourceResponse`` that emits one ``message`` event per chunk
        (``completed`` false), then a single ``message_end`` event. If the
        stream raises, an ``error`` event with ``code`` 1 is emitted instead.
        If setting up the response itself fails, a ``JSONResponse`` with
        ``code=100500`` is returned.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        input_query = param.input
        context = param.context
        # No task-specific prompt is supplied; an empty one is passed through.
        task_prompt_info = {"task_prompt": ""}

        async def event_generator():
            """Yield SSE event dicts for each chunk, then a terminal event."""
            try:
                # NOTE(review): this is a synchronous iterator consumed inside
                # an async generator; if producing a chunk blocks, the event
                # loop is blocked as well. TODO confirm and offload if needed.
                for chunk in test_generate_model_client.get_model_generate_stream(
                        trace_id, task_prompt_info, input_query, context):
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "output": chunk,
                            "completed": False,
                        }, ensure_ascii=False)
                    }
                # No cached result is attached; an empty object is sent.
                result_data = {}
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Record the failure server-side before telling the client;
                # previously mid-stream errors were only visible to the client.
                handler_err(server_logger, trace_id=trace_id, err=e, err_name="generate/stream")
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1
                    }, ensure_ascii=False)
                }

        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except Exception as err:
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="generate/stream")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
- # 路由
@test_router.post("/agent/chat", response_model=TestForm)
async def chat_endpoint(
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Send one query to the test agent and return its answer as JSON.

    Args:
        param: Request form. ``param.input``, ``param.context`` and
            ``param.config`` are forwarded to the agent client.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        it carries ``{"output": ...}``; when any exception is raised it
        carries ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # No task-specific prompt is supplied; an empty one is passed through.
        task_prompt_info = {"task_prompt": ""}
        # Non-streaming call: the full answer is awaited before responding.
        output = await test_agent_client.handle_query(
            trace_id, task_prompt_info, param.input, param.context, param.config)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="agent/chat")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # All failures share the same logging and error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/chat")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/agent/stream", response_class=Response)
async def chat_agent_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """
    Stream the test agent's answer as Server-Sent Events.

    Args:
        param: Request form. ``param.input``, ``param.context`` and
            ``param.config`` are forwarded to the agent client.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to every emitted event.

    Returns:
        An ``EventSourceResponse`` that emits one ``message`` event per chunk
        (``completed`` false), then a single ``message_end`` event. If the
        stream raises, an ``error`` event with ``code`` 1 is emitted instead.
        If setting up the response itself fails, a ``JSONResponse`` with
        ``code=1`` and HTTP status 500 is returned.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")

        input_data = param.input
        context = param.context
        # No extra header information is forwarded by this test endpoint.
        header_info = {}
        # No task-specific prompt is supplied; an empty one is passed through.
        task_prompt_info = {"task_prompt": ""}

        async def event_generator():
            """Yield SSE event dicts for each agent chunk, then a terminal event."""
            try:
                async for chunk in test_agent_client.handle_query_stream(
                        trace_id=trace_id,
                        config_param=param.config,
                        task_prompt_info=task_prompt_info,
                        input_query=input_data,
                        context=context,
                        header_info=header_info
                ):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                        }, ensure_ascii=False)
                    }
                # No cached result is attached; an empty object is sent.
                result_data = {}
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Record the failure server-side before telling the client;
                # previously mid-stream errors were only visible to the client.
                handler_err(server_logger, trace_id=trace_id, err=e, err_name="agent/stream")
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1
                    }, ensure_ascii=False)
                }

        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except Exception as err:
        # Failure before the stream started: answer with a plain JSON error.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="agent/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
@test_router.post("/graph/stream", response_class=Response)
async def chat_graph_stream(param: TestForm,
                            trace_id: str = Depends(get_operation_id)):
    """
    Stream the test workflow graph's output as Server-Sent Events.

    Args:
        param: Request form, passed to the workflow graph unchanged.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to every emitted event.

    Returns:
        An ``EventSourceResponse`` that emits one ``message`` event per chunk
        (``completed`` false), then a single ``message_end`` event. If the
        stream raises, an ``error`` event with ``code`` 1 is emitted instead;
        note that it reports the text under the ``msg`` key. If setting up
        the response itself fails, a ``JSONResponse`` with ``code=1`` and
        HTTP status 500 is returned.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")

        async def event_generator():
            """Yield SSE event dicts for each graph chunk, then a terminal event."""
            try:
                async for chunk in test_workflow_graph.handle_query_stream(
                        param=param,
                        trace_id=trace_id,
                ):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                            "dataType": "text"
                        }, ensure_ascii=False)
                    }
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": "Stream completed",
                        "code": 0,
                        "trace_id": trace_id,
                        "dataType": "text"
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Record the failure server-side before telling the client;
                # previously mid-stream errors were only visible to the client.
                handler_err(server_logger, trace_id=trace_id, err=e, err_name="graph/stream")
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "msg": str(e),
                        "code": 1,
                        "dataType": "text"
                    }, ensure_ascii=False)
                }

        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )

    except Exception as err:
        # Failure before the stream started: answer with a plain JSON error.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="graph/stream")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
-
@test_router.post("/mysql/add", response_model=TestForm)
async def test_mysql_add(
        request: Request,
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Insert a test row through ``TestTabDAO`` and return the new id.

    Args:
        request: Incoming request; the async DB pool is read from
            ``request.app.state.async_db_pool``.
        param: Request form. ``param.input`` is stored as the name and
            ``param.config.session_id`` as the email; the age is fixed at 18.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        it carries ``{"output": "【test_id】: <id>"}``; when any exception is
        raised it carries ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # The connection pool is shared application state.
        test_tab_dao = TestTabDAO(request.app.state.async_db_pool)
        test_id = await test_tab_dao.insert_user(
            name=param.input, email=param.config.session_id, age=18)
        output = f"【test_id】: {test_id}"
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/add")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # All failures share the same logging and error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/add")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/mysql/get", response_model=TestForm)
async def test_mysql_get(
        request: Request,
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Look up one test row by id through ``TestTabDAO`` and return it as JSON text.

    Args:
        request: Incoming request; the async DB pool is read from
            ``request.app.state.async_db_pool``.
        param: Request form. ``param.input`` is used as the row id.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        ``output`` is the row serialised with ``json.dumps`` using
        ``DateTimeEncoder``; when any exception is raised the body carries
        ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # The connection pool is shared application state.
        test_tab_dao = TestTabDAO(request.app.state.async_db_pool)
        data = await test_tab_dao.get_user_by_id(user_id=param.input)
        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/get")
        # The row is serialised with the project's DateTimeEncoder.
        output = json.dumps(data, cls=DateTimeEncoder, ensure_ascii=False, indent=2)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/get")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # All failures share the same logging and error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/get")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
@test_router.post("/mysql/list", response_model=TestForm)
async def test_mysql_list(
        request: Request,
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Fetch all test rows through ``TestTabDAO`` and return them as JSON text.

    Args:
        request: Incoming request; the async DB pool is read from
            ``request.app.state.async_db_pool``.
        param: Request form; only logged, no field is used for the query.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        ``output`` is the result serialised with ``json.dumps`` using
        ``DateTimeEncoder``; when any exception is raised the body carries
        ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # The connection pool is shared application state.
        test_tab_dao = TestTabDAO(request.app.state.async_db_pool)
        data = await test_tab_dao.get_all_users()
        server_logger.info(trace_id=trace_id, msg=f"【result】: {data}", log_type="/mysql/list")
        # The result is serialised with the project's DateTimeEncoder.
        output = json.dumps(data, cls=DateTimeEncoder, ensure_ascii=False, indent=2)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/list")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # All failures share the same logging and error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/list")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@test_router.post("/mysql/update", response_model=TestForm)
async def test_mysql_update(
        request: Request,
        param: TestForm,
        trace_id: str = Depends(get_operation_id)):
    """
    Update one test row through ``TestTabDAO`` and return the DAO's result.

    Args:
        request: Incoming request; the async DB pool is read from
            ``request.app.state.async_db_pool``.
        param: Request form. ``param.config.session_id`` is used as the row
            id and ``param.input`` as the new name; email and age are fixed
            test values.
        trace_id: Operation id resolved by ``get_operation_id``; attached to
            the log records and to the response payload.

    Returns:
        A ``JSONResponse`` whose body is built by ``return_json``. On success
        ``output`` is whatever ``update_user`` returned; when any exception
        is raised the body carries ``code=100500`` and the exception text.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # The connection pool is shared application state.
        test_tab_dao = TestTabDAO(request.app.state.async_db_pool)
        # Only the name comes from the request; the rest are fixed test values.
        updates = {
            "name": param.input,
            "email": "test_email——upt",
            "age": 22
        }
        output = await test_tab_dao.update_user(user_id=param.config.session_id, **updates)
        server_logger.info(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/update")
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="/mysql/update")
        return JSONResponse(
            return_json(data={"output": output}, data_type="text", trace_id=trace_id))
    except Exception as err:
        # All failures share the same logging and error payload.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="/mysql/update")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
-
|