#!/usr/bin/python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : agent_mcp.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/21 10:12
'''
import json
import threading
import time
from typing import Dict, Optional, AsyncGenerator

from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langgraph.prebuilt import create_react_agent

from foundation.observability.logger.loggering import server_logger
from foundation.utils.common import handler_err
from foundation.ai.models import get_models
from foundation.utils.yaml_utils import get_system_prompt_config
from foundation.ai.agent.base_agent import BaseAgent
from foundation.schemas.test_schemas import TestForm
# from foundation.agent.function.test_function import test_function


class TestAgentClient(BaseAgent):
    """
    Xiwuzc assistant + MCP (with full session management), tuned for this scenario.
    A session-locking mechanism ensures that only one client can use a given
    session at any time.
    """
    # Singleton instance and the lock that guards its creation
    _instance = None
    _singleton_lock = threading.Lock()

    def __new__(cls):
        """Thread-safe singleton via double-checked locking."""
        if cls._instance is None:
            with cls._singleton_lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialize()
        return cls._instance

    def _initialize(self):
        """Initialize models and session management."""
        llm, chat, embed = get_models()
        self.llm = llm
        self.chat = chat
        self.embed = embed
        self.agent_executor = None
        self.initialized = False
        self.psutil_available = True
        # Fixed system prompt
        self.system_prompt = get_system_prompt_config()["system_prompt"]
        # Cleanup task
        self.cleanup_task = None
        server_logger.info("client initialized")

    async def init_agent(self):
        """Initialize agent_executor (only needs to run once)."""
        if self.initialized:
            return

        server_logger.info(f"system_prompt: {self.system_prompt}")

        # Prompt template built around the fixed system prompt
        prompt = ChatPromptTemplate.from_messages([
            ("system", self.system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            ("placeholder", "{agent_scratchpad}")
        ])

        # Create the agent (no MemorySaver). The dedicated tool set plus the
        # private knowledge-base retrieval tool is left out until the
        # test_function module is wired in.
        self.agent_executor = create_react_agent(
            self.llm,
            tools=[],  # TODO: test_function.query_info, test_function.execute, test_function.handle
            prompt=prompt
        )

        self.initialized = True
        server_logger.info("agent initialized")

    async def handle_query(self, trace_id: str, task_prompt_info: dict, input_query: str,
                           context=None, config_param: TestForm = None):
        try:
            # Make sure the agent is initialized
            if not self.initialized:
                await self.init_agent()

            session_id = config_param.session_id
            try:
                # Build the input message
                input_message, input_summary_context = self.get_input_context(
                    trace_id=trace_id,
                    task_prompt_info=task_prompt_info,
                    input_query=input_query,
                    context=context
                )
                # Message sent to the model
                input_human_message = HumanMessage(content=input_message)
                # Message kept for conversation-history summarization
                input_human_summary_message = HumanMessage(content=input_summary_context)

                # Fetch history messages
                history_messages = []
                # Assemble the full message list
                all_messages = list(history_messages) + [input_human_message]

                # Execution context
                config = RunnableConfig(
                    configurable={"thread_id": session_id},
                    recursion_limit=15
                )

                # Run the agent
                events = self.agent_executor.astream(
                    {"messages": all_messages},
                    config=config,
                    stream_mode="values"
                )

                # Collect the result
                full_response = []
                async for event in events:
                    if isinstance(event["messages"][-1], AIMessage):
                        chunk = event["messages"][-1].content
                        full_response.append(chunk)
                    log_content = self.get_pretty_message_str(event["messages"][-1])
                    server_logger.info("\n" + log_content.strip(), trace_id=trace_id)

                if full_response:
                    full_text = "".join(full_response)
                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
                    full_text = self.clean_json_output(full_text)
                    return full_text
            finally:
                # Make sure the session lock is released
                pass
        except PermissionError as e:
            # The session is locked by another device
            return str(e)
        except Exception as e:
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='agent/chat')
            return f"System error: {str(e)}"
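
    # Illustrative helper: encapsulates the buffer-flush heuristic that
    # handle_query_stream applies below. The name `_should_flush` is local to
    # this file, not a LangChain/LangGraph API.
    @staticmethod
    def _should_flush(buffer: list, last_flush_time: float, chunk: str) -> bool:
        """Return True when the streaming buffer should be flushed.

        Flush on any of three conditions:
        1. at least 3 chunks are buffered,
        2. more than 0.5s has passed since the last flush,
        3. the latest chunk ends at a natural break (sentence-ending
           punctuation, optionally followed by one space, or a newline).
        """
        if len(buffer) >= 3:
            return True
        if time.time() - last_flush_time > 0.5:
            return True
        return any(chunk.endswith((c, f"{c} ")) for c in ('.', '。', '!', '?', '\n', ';', ';'))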
    async def handle_query_stream(
            self,
            trace_id: str,
            task_prompt_info: dict,
            input_query: str,
            context: Optional[str] = None,
            header_info: Optional[Dict] = None,
            config_param: TestForm = None,
    ) -> AsyncGenerator[str, None]:
        """Stream the query response (with tuned buffer management)."""
        try:
            # Make sure the agent is initialized
            if not self.initialized:
                await self.init_agent()

            session_id = config_param.session_id
            try:
                # Build the input message
                input_message, input_summary_context = self.get_input_context(
                    trace_id=trace_id,
                    task_prompt_info=task_prompt_info,
                    input_query=input_query,
                    context=context
                )
                server_logger.info(trace_id=trace_id, msg=f"input_context: {input_message}")
                # Message sent to the model
                input_human_message = HumanMessage(content=input_message)
                # Message kept for conversation-history summarization
                input_human_summary_message = HumanMessage(content=input_summary_context)

                # Fetch history messages
                history_messages = []
                # Assemble the full message list
                all_messages = list(history_messages) + [input_human_message]

                # Execution context
                config = RunnableConfig(
                    configurable={"thread_id": session_id},
                    recursion_limit=15
                )

                # Streaming execution
                events = self.agent_executor.astream_events(
                    {"messages": all_messages},
                    config=config,
                    version="v2"
                )

                full_response = []
                buffer = []
                last_flush_time = time.time()

                # Walk the event stream
                async for event in events:
                    # Only log on relevant event types
                    self.log_stream_pretty_message(trace_id=trace_id, event=event)
                    if 'chunk' in event['data'] and "on_chat_model_stream" in event['event']:
                        chunk = event['data']['chunk'].content
                        full_response.append(chunk)

                        # Buffering strategy: flush on chunk count, timeout,
                        # or a natural sentence break (see _should_flush above)
                        buffer.append(chunk)
                        if self._should_flush(buffer, last_flush_time, chunk):
                            # Merge and emit the buffered content
                            yield ''.join(buffer)
                            # Reset the buffer
                            buffer.clear()
                            last_flush_time = time.time()

                # Emit any remaining buffered content
                if buffer:
                    yield ''.join(buffer)

                # Append the full response to history and compress it
                if full_response:
                    full_text = "".join(full_response)
                    server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
            finally:
                # Make sure the session lock is released
                pass
        except PermissionError as e:
            yield json.dumps({"error": str(e)})
        except Exception as e:
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='test_stream')
            yield json.dumps({"error": f"System error: {str(e)}"})


test_agent_client = TestAgentClient()
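
# Minimal usage sketch. TestForm's exact fields live in
# foundation.schemas.test_schemas; the only assumption made here is that it
# accepts the `session_id` the handlers above read from config_param.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        form = TestForm(session_id="demo-session")  # assumed field name
        answer = await test_agent_client.handle_query(
            trace_id="trace-demo-001",
            task_prompt_info={},
            input_query="Hello, what can you help me with?",
            config_param=form,
        )
        print(answer)

    asyncio.run(_demo())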