| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
#!/usr/bin/python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :agent_mcp.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/21 10:12
- '''
- import json
- from langgraph.prebuilt import create_react_agent
- from sqlalchemy.sql.functions import user
- from foundation.logger.loggering import server_logger
- from foundation.utils.common import handler_err
- from foundation.utils.utils import get_models
- from foundation.utils.yaml_utils import system_prompt_config
- import threading
- import time
- from typing import Dict, List, Optional, AsyncGenerator, Any, OrderedDict
- from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
- from langchain_core.runnables import RunnableConfig
- from foundation.agent.base_agent import BaseAgent
- from foundation.schemas.test_schemas import FormConfig
- from foundation.agent.function.test_funciton import test_funtion
class TestAgentClient(BaseAgent):
    """
    Xiwuzc assistant + MCP agent client (thread-safe singleton).

    Wraps a LangGraph ReAct agent and exposes two entry points:

    * ``handle_query``        -- run the agent once, return the full answer text.
    * ``handle_query_stream`` -- run the agent and yield buffered text chunks.

    NOTE(review): the per-session locking described in the original design is
    not implemented yet — the old ``try/finally: pass`` hooks were placeholders
    and have been removed until real lock acquisition/release exists.
    """

    # Singleton instance and the lock guarding its creation.
    _instance = None
    _singleton_lock = threading.Lock()

    # Maximum graph recursion depth for a single agent run.
    _RECURSION_LIMIT = 15

    # Natural sentence/clause terminators used as stream flush points.
    _BREAKPOINTS = ('.', '。', '!', '?', '\n', ';', ';')

    def __new__(cls):
        """Thread-safe singleton via double-checked locking."""
        if cls._instance is None:
            with cls._singleton_lock:
                # Re-check under the lock: another thread may have won the race.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """One-time setup of models, system prompt, and bookkeeping state."""
        llm, chat, embed = get_models()
        self.llm = llm
        self.chat = chat
        self.embed = embed
        self.agent_executor = None      # built lazily by init_agent()
        self.initialized = False
        self.psutil_available = True
        # Fixed system prompt loaded from the YAML config.
        self.system_prompt = system_prompt_config["system_prompt"]
        # Reserved for a future background cleanup task.
        self.cleanup_task = None
        server_logger.info(" client initialized")

    async def init_agent(self):
        """Build the ReAct agent executor. Idempotent: only runs once."""
        if self.initialized:
            return
        server_logger.info(f"系统提示词 system_prompt:{self.system_prompt}")
        # Prompt template with the fixed system prompt up front.
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            ("placeholder", "{agent_scratchpad}")
        ])
        # ReAct agent over the dedicated tool set; no MemorySaver is attached.
        self.agent_executor = create_react_agent(
            self.llm,
            tools=[test_funtion.query_info, test_funtion.execute, test_funtion.handle],
            prompt=prompt
        )
        self.initialized = True
        server_logger.info(" agent initialized")

    def _build_run_input(self, trace_id, task_prompt_info, input_query, context, session_id):
        """Build the (messages, config) pair for one agent run (shared by both handlers)."""
        input_message, _summary_context = self.get_input_context(
            trace_id=trace_id,
            task_prompt_info=task_prompt_info,
            input_query=input_query,
            context=context
        )
        server_logger.info(trace_id=trace_id, msg=f"input_context: {input_message}")
        # History retrieval is not wired up yet; each run starts from the new turn.
        all_messages = [HumanMessage(content=input_message)]
        # FIX: recursion_limit is a top-level RunnableConfig key. The previous
        # `runnable_kwargs={"recursion_limit": 15}` is not a RunnableConfig
        # field and was silently ignored, so runs had the default limit.
        config = RunnableConfig(
            configurable={"thread_id": session_id},
            recursion_limit=self._RECURSION_LIMIT
        )
        return all_messages, config

    async def handle_query(self, trace_id: str, task_prompt_info: dict, input_query, context=None,
                           config_param: FormConfig = None):
        """Run the agent once and return the cleaned answer text.

        Args:
            trace_id: Request trace id for logging.
            task_prompt_info: Task prompt configuration dict.
            input_query: The user's query.
            context: Optional extra context string.
            config_param: Form config carrying the ``session_id``.

        Returns:
            The cleaned full response text; ``""`` when the agent produced no
            AI message; an error string on failure (never raises to callers).
        """
        try:
            if not self.initialized:
                await self.init_agent()

            session_id = config_param.session_id
            all_messages, config = self._build_run_input(
                trace_id, task_prompt_info, input_query, context, session_id)

            events = self.agent_executor.astream(
                {"messages": all_messages},
                config=config,
                stream_mode="values"
            )
            full_response = []
            async for event in events:
                last = event["messages"][-1]
                if isinstance(last, AIMessage):
                    full_response.append(last.content)
                    log_content = self.get_pretty_message_str(last)
                    server_logger.info("\n" + log_content.strip(), trace_id=trace_id)
            # FIX: the original fell off the end (implicitly returning None)
            # when no AIMessage was produced; return an empty string instead.
            if not full_response:
                return ""
            full_text = "".join(full_response)
            server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
            return self.clean_json_output(full_text)
        except PermissionError as e:
            # Session is locked by another device.
            return str(e)
        except Exception as e:
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='agent/chat')
            return f"系统错误: {str(e)}"

    async def handle_query_stream(
            self,
            trace_id: str,
            task_prompt_info: dict,
            input_query: str,
            context: Optional[str] = None,
            header_info: Optional[Dict] = None,
            config_param: FormConfig = None,
    ) -> AsyncGenerator[str, None]:
        """Stream the agent's answer as buffered text chunks.

        Chunks are buffered and flushed when any of these holds:
        at least 3 pending chunks, >0.5 s since the last flush, or the latest
        chunk ends at a natural sentence breakpoint.

        Yields:
            Text chunks; on failure, a single JSON object ``{"error": ...}``.
        """
        try:
            if not self.initialized:
                await self.init_agent()

            session_id = config_param.session_id
            all_messages, config = self._build_run_input(
                trace_id, task_prompt_info, input_query, context, session_id)

            # NOTE(review): astream_events is called without an explicit
            # `version=` — relies on the installed langchain-core defaulting
            # to the v2 event schema; confirm against the pinned version.
            events = self.agent_executor.astream_events(
                {"messages": all_messages},
                config=config,
                stream_mode="values"
            )
            full_response = []
            buffer = []
            last_flush_time = time.time()
            async for event in events:
                # Pretty-print only the relevant event types.
                self.log_stream_pretty_message(trace_id=trace_id, event=event)

                if 'chunk' in event['data'] and "on_chat_model_stream" in event['event']:
                    chunk = event['data']['chunk'].content
                    full_response.append(chunk)
                    buffer.append(chunk)
                    now = time.time()
                    # Flush on size, timeout, or a natural breakpoint.
                    if (len(buffer) >= 3
                            or (now - last_flush_time) > 0.5
                            or any(chunk.endswith((c, f"{c} ")) for c in self._BREAKPOINTS)):
                        yield ''.join(buffer)
                        buffer.clear()
                        last_flush_time = now
            # Drain whatever is still buffered after the stream ends.
            if buffer:
                yield ''.join(buffer)
            if full_response:
                full_text = "".join(full_response)
                server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
        except PermissionError as e:
            yield json.dumps({"error": str(e)})
        except Exception as e:
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='test_stream')
            yield json.dumps({"error": f"系统错误: {str(e)}"})
# Module-level singleton; importing this module constructs (or reuses) the client.
test_agent_client = TestAgentClient()
|