# !/usr/bin/ python # -*- coding: utf-8 -*- ''' @Project : lq-agent-api @File :cattle_farm_views.py @IDE :PyCharm @Author : @Date :2025/7/10 17:32 ''' import json from typing import Optional from fastapi import Depends, Response, Header from sse_starlette import EventSourceResponse from starlette.responses import JSONResponse from agent.agent_mcp import client from generate.model_generate import XiwuzcModelGenerateClient from logger.loggering import server_logger from schemas.cattle_farm import CattleFarm from utils import yaml_utils from utils.common import return_json, handler_err from views import cattle_router, get_operation_id from agent.intent import intent_identify_client def get_token(authorization: Optional[str] = Header(default=None)): """提取 Bearer Token (非必填)""" if authorization is None: return None scheme, _, token = authorization.partition(" ") return token def get_tenant_id(tenant_id: Optional[str] | None = Header(None, alias="X-lq-TENANT-ID")): """处理租户ID""" return tenant_id # 路由 @cattle_router.post("/chat", response_model=CattleFarm) async def chat_endpoint( param: CattleFarm, token: str = Depends(get_token), tenant_id: str = Depends(get_tenant_id), trace_id: str = Depends(get_operation_id)): """ 根据场景获取智能体反馈 """ try: server_logger.info(trace_id=trace_id, msg=f"{param}") # 验证参数 # 从字典中获取input input_data = param.input session_id = param.config.sessionId business_scene = param.businessScene context = param.context supplementInfo = param.supplementInfo header_info = { "token": token, "tenantId": tenant_id, } # 如果business_scene为None,则使用大模型进行意图识别 if business_scene is None: business_scene = await intent_identify_client.recognize_intent(trace_id=trace_id , config=param.config , input=input_data) server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}") business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(trace_id=trace_id, business_scene=business_scene) final_result_data_type = task_prompt_info["final_result_data_type"] server_logger.info(trace_id=trace_id, msg=f"session_id:{session_id}, business_scene:{business_scene},final_result_data_type:{final_result_data_type} ,input_data: {input_data}", log_type="queryex") # stream 流式执行 output = await client.handle_query(trace_id , business_scene , task_prompt_info, input_data, context, supplementInfo, header_info , param.config) # 直接执行 server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="queryex") # 返回字典格式的响应 return JSONResponse( return_json(business_scene=business_scene, data={"output": output}, data_type=final_result_data_type, trace_id=trace_id)) except ValueError as err: handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex") return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id)) except Exception as err: handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex") return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id)) @cattle_router.post("/stream", response_class=Response) async def chat_agent(param: CattleFarm, token: str = Depends(get_token), tenant_id: str = Depends(get_tenant_id), trace_id: str = Depends(get_operation_id)): """ 根据场景获取智能体反馈 (SSE流式响应) """ try: server_logger.info(trace_id=trace_id, msg=f"{param}") # 提取参数 input_data = param.input session_id = param.config.sessionId user_role = param.config.userRole business_scene = param.businessScene context = param.context supplementInfo = param.supplementInfo header_info = { "token": token, "tenantId": tenant_id, } # 如果business_scene为None,则使用大模型进行意图识别 # 获取任务提示信息 from enums.common_enums import BusinessSceneEnum business_scene_enum, task_prompt_info = BusinessSceneEnum.COMMON_MODEL_QUERY , {"task_prompt": ""} final_result_data_type = "text" server_logger.info(trace_id=trace_id, msg=f"session_id:{session_id}, business_scene:{business_scene},final_result_data_type:{final_result_data_type} ,input_data: {input_data}", log_type="queryex") server_logger.info(trace_id=trace_id, msg=f"{param}") # 创建 SSE 流式响应 async def event_generator(): try: # 流式处理查询 async for chunk in client.handle_query_stream( trace_id=trace_id, config_param=param.config, business_scene=business_scene, task_prompt_info=task_prompt_info, input_query=input_data, context=context, supplement_info=supplementInfo, header_info=header_info ): server_logger.debug(trace_id=trace_id, msg=f"{chunk}") # 发送数据块 yield { "event": "message", "data": json.dumps({ "code": 0, "output": chunk, "completed": False, "trace_id": trace_id, "dataType": final_result_data_type, "business_scene": business_scene, }, ensure_ascii=False) } # 获取缓存数据 result_data = await client.get_redis_result_cache_data(trace_id=trace_id) # 发送结束事件 yield { "event": "message_end", "data": json.dumps({ "completed": True, "message": json.dumps(result_data, ensure_ascii=False), "code": 0, "trace_id": trace_id, "dataType": "text", "business_scene": business_scene, }, ensure_ascii=False), } except Exception as e: # 错误处理 yield { "event": "error", "data": json.dumps({ "trace_id": trace_id, "message": str(e), "code": 1, "dataType": "text", "business_scene": business_scene, }, ensure_ascii=False) } finally: # 不需要关闭客户端,因为它是单例 pass # 返回 SSE 响应 return EventSourceResponse( event_generator(), headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) except Exception as err: # 初始错误处理 handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex") return JSONResponse( return_json(code=1, msg=f"{err}", trace_id=trace_id), status_code=500 ) @cattle_router.post("/generate/ai_stream", response_class=Response) def chat_stream(param: CattleFarm, token: str = Depends(get_token), tenant_id: str = Depends(get_tenant_id), trace_id: str = Depends(get_operation_id)): try: server_logger.info(trace_id=trace_id, msg=f"{param}") # 提取参数 input_data = param.input session_id = param.config.sessionId business_scene = param.businessScene context = param.context supplementInfo = param.supplementInfo header_info = { "token": token, "tenantId": tenant_id, } # 如果business_scene为None,则使用大模型进行意图识别 if business_scene is None: business_scene = intent_identify_client.recognize_intent(input_data) server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}") # 获取系统提示 business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(trace_id=trace_id, business_scene=business_scene) server_logger.info(trace_id=trace_id, msg=f"session_id:{session_id}, business_scene:{business_scene} , business_scene_enum:{business_scene_enum} ,input_data: {input_data}", log_type="queryex") xwzc_generate_client = XiwuzcModelGenerateClient() # 创建 SSE 流式响应 async def event_generator(): try: # 流式处理查询 for chunk in xwzc_generate_client.get_model_generate_stream(task_prompt_info, session_id, input_data, context, supplementInfo): # 发送数据块 yield { "event": "message", "data": json.dumps({ "output": chunk, "completed": False, "trace_id": trace_id, "dataType": business_scene_enum.data_type }) } # 发送结束事件 yield { "event": "message_end", "data": json.dumps({ "completed": True, "message": "Stream completed", "code": 0, "trace_id": trace_id }), } except Exception as e: # 错误处理 yield { "event": "error", "data": json.dumps({ "trace_id": trace_id, "msg": str(e), "code": 1, "dataType": "text" }) } # 返回 SSE 响应 return EventSourceResponse( event_generator(), headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } ) except Exception as err: # 初始错误处理 handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex") return JSONResponse( return_json(code=1, msg=f"{err}", trace_id=trace_id), status_code=500 ) @cattle_router.post("/generate/tools/execute", response_model=CattleFarm) async def chat_generate_tools_endpoint( param: CattleFarm, token: str = Depends(get_token), tenant_id: str = Depends(get_tenant_id), trace_id: str = Depends(get_operation_id)): """ 工具调用 """ try: server_logger.info(trace_id=trace_id, msg=f"{param}") # 验证参数 # 从字典中获取input input_data = param.input session_id = param.config.sessionId business_scene = param.businessScene context = param.context supplementInfo = param.supplementInfo header_info = { "token": token, "tenantId": tenant_id, } # 如果business_scene为None,则使用大模型进行意图识别 if business_scene is None: business_scene = intent_identify_client.recognize_intent(input_data) server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}") business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(trace_id=trace_id, business_scene=business_scene) server_logger.info(trace_id=trace_id, msg=f"session_id:{session_id}, business_scene:{business_scene} , business_scene_enum:{business_scene_enum} ,input_data: {input_data}", log_type="queryex/tools2") xwzc_generate_client = XiwuzcModelGenerateClient() # stream 流式执行 output = await xwzc_generate_client.get_model_tools_call(trace_id, session_id, task_prompt_info, input_data, context, supplementInfo, header_info) # 直接执行 server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="queryex/tools2") # 返回字典格式的响应 return JSONResponse( return_json(data={"output": output}, data_type=business_scene_enum.data_type, trace_id=trace_id)) except ValueError as err: handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex/tools2") return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id)) except Exception as err: handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex/tools2") return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))