| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :__init__.py.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/10 16:30
- '''
- from langgraph.prebuilt import create_react_agent
- from langgraph.checkpoint.memory import MemorySaver
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.messages import HumanMessage
- from logger.loggering import server_logger
- from utils.utils import get_models
- from function.function_call import FunctionCall
- from io import StringIO
- import sys
class XiwuzcAgent:
    """Xiwuzc intelligent assistant: a ReAct agent with function calling.

    Wraps a LangGraph ReAct agent that is given three cattle-farm query tools
    and a fixed system prompt. Conversation state is checkpointed per
    ``session_id`` (used as the LangGraph ``thread_id``).
    """

    def __init__(self):
        """Build the agent executor as part of construction."""
        self.init_agent()

    def init_agent(self):
        """Create the models, tool list, prompt and agent executor.

        Sets ``self.llm``, ``self.chat`` and ``self.agent_executor``.
        """
        # get_models() yields three objects; only the first two are kept.
        llm, chat, _embed = get_models()
        self.llm = llm
        self.chat = chat

        # Tools the agent is allowed to call.
        function_call = FunctionCall()
        tools = [
            function_call.query_recently_cattle_farm_ambient_info,
            function_call.query_recently_cattle_temperature,
            function_call.query_recently_cattle_eat_water,
        ]

        # System prompt followed by the running message history.
        # NOTE(review): "agent_scratchpad" is kept from the original prompt;
        # confirm whether create_react_agent ever supplies that variable.
        system_prompt = self.create_sys_prompt()
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("placeholder", "{messages}"),
            ("placeholder", "{agent_scratchpad}"),
        ])

        self.agent_executor = create_react_agent(
            self.llm,
            tools=tools,
            prompt=prompt,
            checkpointer=MemorySaver(),
        )

    def handle_query(self, input_query, session_id):
        """Run a query through the agent in streaming mode and log each step.

        Args:
            input_query: Text of the user's question.
            session_id: Identifier passed to the agent as ``thread_id``.

        Returns:
            The ``content`` of the last message of the last streamed event,
            or ``None`` if the stream produced no events.

        Raises:
            Exception: Any error from the agent is logged and re-raised.
        """
        config = {"configurable": {"thread_id": session_id}}
        try:
            events = self.agent_executor.stream(
                {"messages": [HumanMessage(content=input_query)]},
                config=config,
                stream_mode="values",
            )
            final_result = None
            for event in events:
                # The newest message of each event is the latest agent step.
                message = event["messages"][-1]
                final_result = message.content
                log_content = self.get_pretty_message_str(message)
                server_logger.info("\n" + log_content.strip())

            server_logger.info("=" * 50)
            server_logger.info(f"最终结果: \n {final_result}")
            server_logger.info("=" * 50)
            return final_result
        except Exception as e:
            server_logger.error(f"处理查询时出错: {e}")
            raise

    def handle_invoke_query(self, input_query, session_id):
        """Run a query through the agent in a single, non-streaming call.

        Args:
            input_query: Text of the user's question.
            session_id: Identifier passed to the agent as ``thread_id``.

        Returns:
            The ``content`` of the last message in the agent's result.

        Raises:
            Exception: Any error from the agent is logged and re-raised.
        """
        config = {"configurable": {"thread_id": session_id}}
        try:
            result = self.agent_executor.invoke(
                {"messages": [HumanMessage(content=input_query)]},
                config=config,
                stream_mode="values",
            )
            server_logger.info(f"result={result}")
            for presult in result["messages"]:
                server_logger.info(f'【agent】: {presult}')
            server_logger.info("=" * 50)
            final_result_content = result["messages"][-1].content
            server_logger.info(f"final_result_conent={final_result_content}")
            return final_result_content
        except Exception as e:
            server_logger.error(f"处理查询时出错: {e}")
            raise

    def get_pretty_message_str(self, message):
        """Return what ``message.pretty_print()`` writes to stdout, as a string.

        stdout is redirected only for the duration of the call and is always
        restored to whatever stream was active beforehand, even if
        ``pretty_print()`` raises.

        Args:
            message: Any object exposing a ``pretty_print()`` method.

        Returns:
            The captured text.
        """
        from contextlib import redirect_stdout

        captured_output = StringIO()
        with redirect_stdout(captured_output):
            # pretty_print() is called for its printed output only; its
            # return value is not logged.
            message.pretty_print()
        return captured_output.getvalue()

    @staticmethod
    def create_sys_prompt():
        """Return the fixed system prompt (Chinese) used by the agent."""
        system_prompt = """
        你是一个农业智能专家,请根据提供的数据信息和规则信息分析是否存在异常并进行建议。
        请严格按照以下步骤操作:
        1. 检查可用工具
        2. 必要时调用工具获取数据
        3. 结合数据进行分析
        4. 给出专业建议
        注意:
        - 必须通过工具获取最新数据后再分析
        - 保持回答专业且简洁
        """
        return system_prompt
|