# !/usr/bin/ python
# -*- coding: utf-8 -*-
'''
@Project    : lq-agent-api
@File       :__init__.py.py
@IDE        :PyCharm
@Author     :
@Date       :2025/7/10 16:30
'''
from contextlib import redirect_stdout
from io import StringIO
import sys

from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import HumanMessage

from logger.loggering import server_logger
from utils.utils import get_models
from function.function_call import FunctionCall


class XiwuzcAgent:
    """Xiwuzc assistant: a ReAct agent combined with function-call tools.

    The agent is built once in ``__init__`` and then queried through
    ``handle_query`` (streaming) or ``handle_invoke_query`` (single call).
    Both pass ``session_id`` as the ``thread_id`` of the run config, which is
    the key the ``MemorySaver`` checkpointer is configured with.
    """

    def __init__(self) -> None:
        """Build the agent executor eagerly so the instance is ready to use."""
        self.init_agent()

    def init_agent(self) -> None:
        """Create the models, the tool list and the ReAct agent executor.

        Sets ``self.llm``, ``self.chat`` and ``self.agent_executor``.
        """
        # get_models() yields three objects; the third one is not used here.
        llm, chat, _embed = get_models()
        self.llm = llm
        self.chat = chat

        # Tools exposed to the agent (bound methods of FunctionCall).
        function_call = FunctionCall()
        tools = [
            function_call.query_recently_cattle_farm_ambient_info,
            function_call.query_recently_cattle_temperature,
            function_call.query_recently_cattle_eat_water,
        ]

        # NOTE(review): this prompt is built but NOT passed to
        # create_react_agent below (the `prompt=prompt` argument was disabled
        # in the original code), so the system prompt currently has no effect
        # on the agent. Confirm whether that is intentional before wiring it in.
        system_prompt = self.create_sys_prompt()
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("placeholder", "{messages}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

        self.agent_executor = create_react_agent(
            self.llm,
            tools=tools,
            checkpointer=MemorySaver(),
        )

    def handle_query(self, input_query, session_id):
        """Run a query through the agent in streaming mode.

        Every streamed event's latest message is pretty-printed into the
        server log as it arrives.

        Args:
            input_query: Text sent to the agent as a ``HumanMessage``.
            session_id: Used as the ``thread_id`` of the run config.

        Returns:
            The ``content`` of the last message of the last streamed event,
            or ``None`` when the stream produced no events.

        Raises:
            Exception: Any error raised while streaming is logged and
                re-raised unchanged.
        """
        config = {"configurable": {"thread_id": session_id}}
        try:
            events = self.agent_executor.stream(
                {"messages": [HumanMessage(content=input_query)]},
                config=config,
                stream_mode="values",
            )

            final_result = None
            for event in events:
                # Only the newest message of each step is of interest.
                message = event["messages"][-1]
                final_result = message.content
                log_content = self.get_pretty_message_str(message)
                server_logger.info("\n" + log_content.strip())

            server_logger.info("=" * 50)
            server_logger.info(f"最终结果: \n {final_result}")
            server_logger.info("=" * 50)
            return final_result
        except Exception as e:
            server_logger.error(f"处理查询时出错: {e}")
            raise

    def handle_invoke_query(self, input_query, session_id):
        """Run a query through the agent with a single, non-streaming call.

        Args:
            input_query: Text sent to the agent as a ``HumanMessage``.
            session_id: Used as the ``thread_id`` of the run config.

        Returns:
            The ``content`` of the last message in the invocation result.

        Raises:
            Exception: Any error raised during invocation is logged and
                re-raised unchanged.
        """
        config = {"configurable": {"thread_id": session_id}}
        try:
            result = self.agent_executor.invoke(
                {"messages": [HumanMessage(content=input_query)]},
                config=config,
                stream_mode="values",
            )
            server_logger.info(f"result={result}")
            for presult in result["messages"]:
                server_logger.info(f'【agent】: {presult}')
            server_logger.info("=" * 50)

            final_content = result["messages"][-1].content
            # Log text kept verbatim (including its spelling) so existing log
            # consumers are unaffected.
            server_logger.info(f"final_result_conent={final_content}")
            return final_content
        except Exception as e:
            server_logger.error(f"处理查询时出错: {e}")
            raise

    def get_pretty_message_str(self, message) -> str:
        """Return what ``message.pretty_print()`` writes to stdout, as a string.

        ``redirect_stdout`` restores the previously active ``sys.stdout`` even
        if ``pretty_print()`` raises, so a failure here cannot leave the
        process with a hijacked stdout. The return value of ``pretty_print()``
        is deliberately not logged: the call is made only for its printed
        output.

        Args:
            message: Object exposing a ``pretty_print()`` method.

        Returns:
            The captured stdout text.
        """
        captured_output = StringIO()
        with redirect_stdout(captured_output):
            message.pretty_print()
        return captured_output.getvalue()

    @staticmethod
    def create_sys_prompt() -> str:
        """Return the system prompt text for the agricultural assistant."""
        # Split across adjacent literals for readability only; the pieces
        # concatenate to the same string.
        system_prompt = (
            " 你是一个农业智能专家,请根据提供的数据信息和规则信息分析是否存在异常并进行建议。 "
            "请严格按照以下步骤操作: "
            "1. 检查可用工具 "
            "2. 必要时调用工具获取数据 "
            "3. 结合数据进行分析 "
            "4. 给出专业建议 "
            "注意: "
            "- 必须通过工具获取最新数据后再分析 "
            "- 保持回答专业且简洁 "
        )
        return system_prompt