agent_mcp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # !/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :agent_mcp.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/21 10:12
  9. '''
  10. import json
  11. import trace
  12. from langgraph.prebuilt import create_react_agent
  13. from sqlalchemy.sql.functions import user
  14. from logger.loggering import server_logger
  15. from utils.common import handler_err
  16. from utils.utils import get_models
  17. from utils.yaml_utils import system_prompt_config
  18. from views import mcp_server
  19. import threading
  20. import time
  21. from typing import Dict, List, Optional, AsyncGenerator, Any, OrderedDict
  22. from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
  23. from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
  24. from langchain_core.runnables import RunnableConfig
  25. from agent.base_agent import BaseAgent
  26. from agent.session.session_manager import SessionManager
  27. from function.local_function import get_knowledge_answer
  28. from schemas.cattle_farm import FarmConfig
  29. class XiwuzcAgentMcpClient(BaseAgent):
  30. """
  31. Xiwuzc 智能助手+MCP(带完整会话管理) - 针对场景优化
  32. 添加会话锁定机制,确保同一时间只有一个客户端可以使用特定会话
  33. """
  34. # 单例实例和线程锁
  35. _instance = None
  36. _singleton_lock = threading.Lock()
  37. def __new__(cls):
  38. """线程安全的单例模式实现"""
  39. if cls._instance is None:
  40. with cls._singleton_lock:
  41. if cls._instance is None:
  42. cls._instance = super().__new__(cls)
  43. cls._instance._initialize()
  44. return cls._instance
  45. def _initialize(self):
  46. """初始化模型和会话管理"""
  47. llm, chat, embed = get_models()
  48. self.llm = llm
  49. self.chat = chat
  50. self.embed = embed
  51. self.agent_executor = None
  52. self.initialized = False
  53. self.psutil_available = True
  54. # 固定系统提示词
  55. self.system_prompt = system_prompt_config["system_prompt"]
  56. # 清理任务
  57. self.cleanup_task = None
  58. server_logger.info("mulian client initialized")
  59. async def init_agent(self):
  60. """初始化agent_executor(只需一次)"""
  61. if self.initialized:
  62. return
  63. # 获取部署的模型列表
  64. server_logger.info(f"系统提示词 system_prompt:{self.system_prompt}")
  65. # 创建提示词模板 - 使用固定的系统提示词
  66. prompt = ChatPromptTemplate.from_messages([
  67. ("system", self.system_prompt),
  68. MessagesPlaceholder(variable_name="messages"),
  69. ("placeholder", "{agent_scratchpad}")
  70. ])
  71. # 创建Agent - 不再使用MemorySaver
  72. self.agent_executor = create_react_agent(
  73. self.llm,
  74. tools=[] , # 专用工具集 + 私有知识库检索工具
  75. prompt=prompt
  76. )
  77. self.initialized = True
  78. server_logger.info("mulian agent initialized")
  79. async def handle_query(self, trace_id: str, business_scene: str, task_prompt_info: dict, input_query, context=None,
  80. supplement_info=None, header_info=None , config_param: FarmConfig = None):
  81. try:
  82. # 确保agent已初始化
  83. if not self.initialized:
  84. await self.init_agent()
  85. session_id = config_param.sessionId
  86. user_role = config_param.userRole
  87. # 构建会话管理
  88. session_manager = SessionManager(lock_key_prefix="Chat_", trace_id=trace_id , session_id=session_id)
  89. # 尝试获取会话锁(5秒超时)
  90. if not await session_manager.acquire_session_lock():
  91. return "错误:此会话已在其他设备登录,请使用新会话或等待解锁"
  92. try:
  93. # 构建输入消息
  94. input_message , input_summary_context = self.get_input_context(
  95. trace_id=trace_id,
  96. business_scene=business_scene,
  97. task_prompt_info=task_prompt_info,
  98. input_query=input_query,
  99. context=context,
  100. supplement_info=supplement_info,
  101. header_info=header_info,
  102. config_param={
  103. "session_id": session_id,
  104. "user_role": user_role
  105. }
  106. )
  107. # 用于模型对话使用
  108. input_human_message = HumanMessage(content=input_message)
  109. # 用于对话历史记录摘要
  110. input_human_summary_message = HumanMessage(content=input_summary_context)
  111. # 获取历史消息
  112. history_messages = await session_manager.get_memory_history()
  113. # 构造完整的消息列表
  114. all_messages = list(history_messages) + [input_human_message]
  115. # 配置执行上下文
  116. config = RunnableConfig(
  117. configurable={"thread_id": session_id},
  118. runnable_kwargs={"recursion_limit": 15}
  119. )
  120. # 执行智能体
  121. events = self.agent_executor.astream(
  122. {"messages": all_messages},
  123. config=config,
  124. stream_mode="values"
  125. )
  126. # 处理结果
  127. full_response = []
  128. async for event in events:
  129. if isinstance(event["messages"][-1], AIMessage):
  130. chunk = event["messages"][-1].content
  131. full_response.append(chunk)
  132. log_content = self.get_pretty_message_str(event["messages"][-1])
  133. server_logger.info("\n" + log_content.strip(), trace_id=trace_id)
  134. if full_response:
  135. full_text = "".join(full_response)
  136. server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
  137. # 保存并更新历史会话记录history
  138. await session_manager.save_update_memory_history(history_messages , input_human_summary_message, AIMessage(content=full_text))
  139. full_text = self.clean_json_output(full_text)
  140. return full_text
  141. finally:
  142. # 确保释放会话锁
  143. await session_manager.release_session_lock()
  144. except PermissionError as e:
  145. # 处理会话被其他设备锁定的情况
  146. return str(e)
  147. except Exception as e:
  148. handler_err(server_logger, trace_id=trace_id, err=e, err_name='query')
  149. return f"系统错误: {str(e)}"
  150. async def handle_query_stream(
  151. self,
  152. trace_id: str,
  153. business_scene: str,
  154. task_prompt_info: dict,
  155. input_query: str,
  156. context: Optional[str] = None,
  157. supplement_info: Optional[str] = None,
  158. header_info: Optional[Dict] = None,
  159. config_param: FarmConfig = None,
  160. ) -> AsyncGenerator[str, None]:
  161. """流式处理查询(优化缓冲管理)"""
  162. try:
  163. # 确保agent已初始化
  164. if not self.initialized:
  165. await self.init_agent()
  166. session_id = config_param.sessionId
  167. user_role = config_param.userRole
  168. # 构建会话管理
  169. session_manager = SessionManager(lock_key_prefix="ChatStream_", trace_id=trace_id , session_id=session_id)
  170. # 尝试获取会话锁(5秒超时)
  171. if not await session_manager.acquire_session_lock():
  172. yield json.dumps({"error": "此会话已在其他设备登录"})
  173. return
  174. try:
  175. # 构建输入消息
  176. input_message , input_summary_context = self.get_input_context(
  177. trace_id=trace_id,
  178. business_scene=business_scene,
  179. task_prompt_info=task_prompt_info,
  180. input_query=input_query,
  181. context=context,
  182. supplement_info=supplement_info,
  183. header_info=header_info,
  184. config_param={
  185. "session_id": session_id,
  186. "user_role": user_role
  187. }
  188. )
  189. server_logger.info(trace_id=trace_id, msg=f"input_context: {input_message}")
  190. # 用于模型对话使用
  191. input_human_message = HumanMessage(content=input_message)
  192. # 用于对话历史记录摘要
  193. input_human_summary_message = HumanMessage(content=input_summary_context)
  194. # 获取历史消息
  195. history_messages = await session_manager.get_memory_history()
  196. # 构造完整的消息列表
  197. all_messages = list(history_messages) + [input_human_message]
  198. # 配置执行上下文
  199. config = RunnableConfig(
  200. configurable={"thread_id": session_id},
  201. runnable_kwargs={"recursion_limit": 15}
  202. )
  203. # 流式执行
  204. events = self.agent_executor.astream_events(
  205. {"messages": all_messages},
  206. config=config,
  207. stream_mode="values"
  208. )
  209. full_response = []
  210. buffer = []
  211. last_flush_time = time.time()
  212. # 流式处理事件
  213. async for event in events:
  214. # 只在特定事件类型时打印日志
  215. self.log_stream_pretty_message(trace_id=trace_id, event=event)
  216. if 'chunk' in event['data'] and "on_chat_model_stream" in event['event']:
  217. chunk = event['data']['chunk'].content
  218. full_response.append(chunk)
  219. # 缓冲管理策略
  220. buffer.append(chunk)
  221. current_time = time.time()
  222. # 满足以下任一条件即刷新缓冲区
  223. if (len(buffer) >= 3 or # 达到最小块数
  224. (current_time - last_flush_time) > 0.5 or # 超时
  225. any(chunk.endswith((c, f"{c} ")) for c in
  226. ['.', '。', '!', '?', '\n', ';', ';'])): # 自然断点
  227. # 合并并发送缓冲内容
  228. combined = ''.join(buffer)
  229. yield combined
  230. # 重置缓冲
  231. buffer.clear()
  232. last_flush_time = current_time
  233. # 处理剩余内容
  234. if buffer:
  235. yield ''.join(buffer)
  236. # 将完整响应添加到历史并进行压缩
  237. if full_response:
  238. full_text = "".join(full_response)
  239. server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}")
  240. # 保存并更新历史会话记录history
  241. await session_manager.save_update_memory_history(history_messages , input_human_summary_message, AIMessage(content=full_text))
  242. finally:
  243. # 确保释放会话锁
  244. await session_manager.release_session_lock()
  245. except PermissionError as e:
  246. yield json.dumps({"error": str(e)})
  247. except Exception as e:
  248. handler_err(server_logger, trace_id=trace_id, err=e, err_name='query_stream')
  249. yield json.dumps({"error": f"系统错误: {str(e)}"})
  250. client = XiwuzcAgentMcpClient()