| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :cattle_farm_views.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/10 17:32
- '''
- import json
- from typing import Optional
- from fastapi import Depends, Response, Header
- from sse_starlette import EventSourceResponse
- from starlette.responses import JSONResponse
- from agent.agent_mcp import client
- from generate.model_generate import XiwuzcModelGenerateClient
- from logger.loggering import server_logger
- from schemas.cattle_farm import CattleFarm
- from utils import yaml_utils
- from utils.common import return_json, handler_err
- from views import cattle_router, get_operation_id
- from agent.intent import intent_identify_client
def get_token(authorization: Optional[str] = Header(default=None)):
    """Return the Bearer token from the Authorization header (optional).

    A missing header yields None; a header with no space yields an empty
    token, since everything after the first space is returned.
    """
    if authorization is None:
        return None
    # "Bearer <token>" -> keep only the part after the first space
    return authorization.partition(" ")[2]
def get_tenant_id(tenant_id: Optional[str] = Header(None, alias="X-lq-TENANT-ID")):
    """Return the tenant id from the ``X-lq-TENANT-ID`` header (optional).

    NOTE: annotation was ``Optional[str] | None`` — a redundant union
    (Optional already includes None), collapsed to ``Optional[str]``.
    """
    return tenant_id
- # 路由
@cattle_router.post("/chat", response_model=CattleFarm)
async def chat_endpoint(
        param: CattleFarm,
        token: str = Depends(get_token),
        tenant_id: str = Depends(get_tenant_id),
        trace_id: str = Depends(get_operation_id)):
    """Run the agent for the requested business scene and return JSON.

    When ``businessScene`` is absent, an LLM intent-recognition step picks
    the scene from the user input. On any failure a code-100500 JSON body
    is returned instead of an HTTP error.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields out of the validated payload.
        input_data = param.input
        session_id = param.config.sessionId
        business_scene = param.businessScene
        context = param.context
        supplementInfo = param.supplementInfo
        # Auth/tenant values forwarded to downstream services.
        header_info = {
            "token": token,
            "tenantId": tenant_id,
        }
        # Fall back to LLM intent recognition when no scene was supplied.
        if business_scene is None:
            business_scene = await intent_identify_client.recognize_intent(
                trace_id=trace_id, config=param.config, input=input_data)
            server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}")
        # Resolve the per-scene prompt and expected result data type.
        business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(
            trace_id=trace_id, business_scene=business_scene)
        final_result_data_type = task_prompt_info["final_result_data_type"]
        server_logger.info(
            trace_id=trace_id,
            msg=f"session_id:{session_id}, business_scene:{business_scene},final_result_data_type:{final_result_data_type} ,input_data: {input_data}",
            log_type="queryex")
        # Execute the query through the MCP agent client (non-streaming).
        output = await client.handle_query(
            trace_id, business_scene, task_prompt_info, input_data,
            context, supplementInfo, header_info, param.config)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="queryex")
        return JSONResponse(
            return_json(business_scene=business_scene, data={"output": output},
                        data_type=final_result_data_type, trace_id=trace_id))
    except Exception as err:
        # The former ValueError and generic branches were identical — merged.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
@cattle_router.post("/stream", response_class=Response)
async def chat_agent(param: CattleFarm,
                     token: str = Depends(get_token),
                     tenant_id: str = Depends(get_tenant_id),
                     trace_id: str = Depends(get_operation_id)):
    """Agent feedback for a scene as an SSE stream.

    Emits incremental chunks as ``message`` events, then a final
    ``message_end`` event carrying the cached full result; failures inside
    the stream become an ``error`` event rather than an HTTP error.

    Fixes vs. previous revision: the duplicated ``f"{param}"`` info log was
    removed, and the unused ``user_role`` / ``business_scene_enum`` locals
    (plus the function-local enum import feeding the latter) were dropped —
    this endpoint always uses the common-model flow below.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields out of the validated payload.
        input_data = param.input
        session_id = param.config.sessionId
        business_scene = param.businessScene
        context = param.context
        supplementInfo = param.supplementInfo
        # Auth/tenant values forwarded to downstream services.
        header_info = {
            "token": token,
            "tenantId": tenant_id,
        }
        # Always the common-model flow: empty task prompt, plain-text result
        # (no per-scene prompt lookup on this endpoint).
        task_prompt_info = {"task_prompt": ""}
        final_result_data_type = "text"
        server_logger.info(
            trace_id=trace_id,
            msg=f"session_id:{session_id}, business_scene:{business_scene},final_result_data_type:{final_result_data_type} ,input_data: {input_data}",
            log_type="queryex")

        # SSE generator: message chunks, then message_end, or error.
        async def event_generator():
            try:
                # Stream chunks from the agent client as they arrive.
                async for chunk in client.handle_query_stream(
                        trace_id=trace_id,
                        config_param=param.config,
                        business_scene=business_scene,
                        task_prompt_info=task_prompt_info,
                        input_query=input_data,
                        context=context,
                        supplement_info=supplementInfo,
                        header_info=header_info
                ):
                    server_logger.debug(trace_id=trace_id, msg=f"{chunk}")
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "code": 0,
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                            "dataType": final_result_data_type,
                            "business_scene": business_scene,
                        }, ensure_ascii=False)
                    }
                # The full result was cached by the client while streaming.
                result_data = await client.get_redis_result_cache_data(trace_id=trace_id)
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": json.dumps(result_data, ensure_ascii=False),
                        "code": 0,
                        "trace_id": trace_id,
                        "dataType": "text",
                        "business_scene": business_scene,
                    }, ensure_ascii=False),
                }
            except Exception as e:
                # Surface stream-time failures as an SSE "error" event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "message": str(e),
                        "code": 1,
                        "dataType": "text",
                        "business_scene": business_scene,
                    }, ensure_ascii=False)
                }
            # No cleanup: the agent client is a shared singleton.

        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except Exception as err:
        # Failure before streaming started -> plain JSON 500.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
@cattle_router.post("/generate/ai_stream", response_class=Response)
async def chat_stream(param: CattleFarm,
                      token: str = Depends(get_token),
                      tenant_id: str = Depends(get_tenant_id),
                      trace_id: str = Depends(get_operation_id)):
    """Stream model-generated feedback for a scene over SSE.

    Chunks are emitted as ``message`` events, followed by a terminal
    ``message_end`` event; generation failures become an ``error`` event.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields out of the validated payload.
        input_data = param.input
        session_id = param.config.sessionId
        business_scene = param.businessScene
        context = param.context
        supplementInfo = param.supplementInfo
        # Fall back to LLM intent recognition when no scene was supplied.
        if business_scene is None:
            # BUGFIX: recognize_intent is a coroutine (it is awaited in /chat)
            # but was called synchronously here, leaving a coroutine object in
            # business_scene. The endpoint is now async and awaits it with the
            # same keyword arguments as /chat.
            business_scene = await intent_identify_client.recognize_intent(
                trace_id=trace_id, config=param.config, input=input_data)
            server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}")
        # Resolve the per-scene system prompt.
        business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(
            trace_id=trace_id, business_scene=business_scene)
        server_logger.info(
            trace_id=trace_id,
            msg=f"session_id:{session_id}, business_scene:{business_scene} , business_scene_enum:{business_scene_enum} ,input_data: {input_data}",
            log_type="queryex")
        xwzc_generate_client = XiwuzcModelGenerateClient()

        # SSE generator: message chunks, then message_end, or error.
        async def event_generator():
            try:
                # NOTE(review): this iterates a synchronous generator inside an
                # async generator, blocking the event loop between chunks —
                # consider an async client or a thread offload.
                for chunk in xwzc_generate_client.get_model_generate_stream(
                        task_prompt_info, session_id, input_data, context, supplementInfo):
                    yield {
                        "event": "message",
                        "data": json.dumps({
                            "output": chunk,
                            "completed": False,
                            "trace_id": trace_id,
                            "dataType": business_scene_enum.data_type
                        })
                    }
                yield {
                    "event": "message_end",
                    "data": json.dumps({
                        "completed": True,
                        "message": "Stream completed",
                        "code": 0,
                        "trace_id": trace_id
                    }),
                }
            except Exception as e:
                # Surface generation failures as an SSE "error" event.
                yield {
                    "event": "error",
                    "data": json.dumps({
                        "trace_id": trace_id,
                        "msg": str(e),
                        "code": 1,
                        "dataType": "text"
                    })
                }

        return EventSourceResponse(
            event_generator(),
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive"
            }
        )
    except Exception as err:
        # Failure before streaming started -> plain JSON 500.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex")
        return JSONResponse(
            return_json(code=1, msg=f"{err}", trace_id=trace_id),
            status_code=500
        )
@cattle_router.post("/generate/tools/execute", response_model=CattleFarm)
async def chat_generate_tools_endpoint(
        param: CattleFarm,
        token: str = Depends(get_token),
        tenant_id: str = Depends(get_tenant_id),
        trace_id: str = Depends(get_operation_id)):
    """Execute a model tool call for the given scene and return the result.

    On any failure a code-100500 JSON body is returned instead of an HTTP
    error, mirroring the /chat endpoint.
    """
    try:
        server_logger.info(trace_id=trace_id, msg=f"{param}")
        # Pull request fields out of the validated payload.
        input_data = param.input
        session_id = param.config.sessionId
        business_scene = param.businessScene
        context = param.context
        supplementInfo = param.supplementInfo
        # Auth/tenant values forwarded to downstream services.
        header_info = {
            "token": token,
            "tenantId": tenant_id,
        }
        # Fall back to LLM intent recognition when no scene was supplied.
        if business_scene is None:
            # BUGFIX: recognize_intent is a coroutine (it is awaited in /chat)
            # but was called synchronously here, leaving a coroutine object in
            # business_scene. Await it with the same kwargs as /chat.
            business_scene = await intent_identify_client.recognize_intent(
                trace_id=trace_id, config=param.config, input=input_data)
            server_logger.info(trace_id=trace_id, msg=f"使用意图识别:business_scene={business_scene}")
        # Resolve the per-scene prompt and enum (enum supplies data_type).
        business_scene_enum, task_prompt_info = yaml_utils.get_business_scene_prompt(
            trace_id=trace_id, business_scene=business_scene)
        server_logger.info(
            trace_id=trace_id,
            msg=f"session_id:{session_id}, business_scene:{business_scene} , business_scene_enum:{business_scene_enum} ,input_data: {input_data}",
            log_type="queryex/tools2")
        xwzc_generate_client = XiwuzcModelGenerateClient()
        # Run the tool-call flow to completion (non-streaming).
        output = await xwzc_generate_client.get_model_tools_call(
            trace_id, session_id, task_prompt_info, input_data,
            context, supplementInfo, header_info)
        server_logger.debug(trace_id=trace_id, msg=f"【result】: {output}", log_type="queryex/tools2")
        return JSONResponse(
            return_json(data={"output": output}, data_type=business_scene_enum.data_type,
                        trace_id=trace_id))
    except Exception as err:
        # The former ValueError and generic branches were identical — merged.
        handler_err(server_logger, trace_id=trace_id, err=err, err_name="queryex/tools2")
        return JSONResponse(return_json(code=100500, msg=f"{err}", trace_id=trace_id))
|