| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- # !/usr/bin/python
- # -*- coding: utf-8 -*-
- '''
- @Project :
- @File :workflow_graph.py
- @IDE :Cursor
- @Author :LINGMIN
- @Date :2025/08/10 18:00
- '''
- from foundation.agent.workflow.test_cus_state import TestCusState
- from foundation.agent.workflow.test_workflow_node import TestWorkflowNode
- from langgraph.graph import START, StateGraph, END
- from langgraph.checkpoint.memory import MemorySaver
- from foundation.logger.loggering import server_logger
- from typing import AsyncGenerator
- import time
- from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
- from foundation.utils.common import return_json, handler_err
- import json
- from foundation.schemas.test_schemas import TestForm, FormConfig
- class TestWorkflowGraph:
- """
- 工作流图
- """
- def __init__(self):
- self.workflow_node = TestWorkflowNode()
- self.checkpoint_saver = MemorySaver()
- self.app = self.init_workflow_graph()
- # 将生成的图片保存到文件
- self.write_graph()
- def init_workflow_graph(self):
- """
- 初始化工作流图
- 使用 graph.get_state 和 get_state_history 检查状态。
- 启用 debug=True 查看详细日志。
- 使用 graph.get_graph().to_dot() 可视化状态图。
- """
- # 构建工作流图 创建状态图 , state_update_method="merge"
- workflow = StateGraph(TestCusState)
- ######分支2、代理Agent supervisor_agent ##################################
- # 节点: 代理 agent 节点
- workflow.add_node("supervisor_agent", self.workflow_node.supervisor_agent)
- # agent节点1: 纯生成类问题
- workflow.add_node("chat_box_generate", self.workflow_node.chat_box_generate)
- # agent节点2:
- workflow.add_node("common_agent", self.workflow_node.common_agent_node)
- ###### 节点分支线条 ##################################
- # 固定问题识别
- workflow.add_edge(START, "supervisor_agent")
- # 在图状态中填充 ‘next’字段,路由到具体的某个节点或结束图的运行,从来指定如何执行接下来的任务。
- workflow.add_conditional_edges(source="supervisor_agent",
- path=lambda state: state["route_next"],
- # 显式映射每个返回值到目标节点
- path_map={
- "chat_box_generate": "chat_box_generate",
- "common_agent": "common_agent",
-
- }
- )
- supervisor_members_list = ["chat_box_generate" , "common_agent"]
- # 每个子代理 在完成后总是向主管 “汇报”
- for agent_member in supervisor_members_list:
- workflow.add_edge(agent_member, END) # 直接结束
- #workflow.add_edge(agent_member, "supervisor_agent") # 回到路由 继续 判断执行
-
- #编译图
- app = workflow.compile(checkpointer=self.checkpoint_saver)
- #print(app.get_graph().draw_ascii())
- server_logger.info(f"【图工作流构建完成】app={app}")
- return app
- async def handle_query_stream(self, param: TestForm, trace_id: str)-> AsyncGenerator[str, None]:
- """
- 根据场景获取智能体反馈 (SSE流式响应)
- """
- try:
- # 提取参数
- user_input = param.input
- session_id = param.config.session_id
- context = param.context
-
- human_messages = [HumanMessage(content=user_input)]
- # 完整的初始状态
- initial_state = {
- "messages": human_messages,
- "session_id": session_id, # 会话id
- "trace_id": trace_id, # 日志链路跟踪id
- "task_prompt_info": {},
- "context": context , # 上下文数据
- "user_input": user_input,
- }
- # 唯一的任务 ID(模拟 session_id / thread_id)
- config = {"configurable": {"thread_id": session_id},
- "runnable_kwargs":{"recursion_limit": 50}
- }
- server_logger.info("======================== 启动新任务 ===========================") #, interrupt_before=["user_confirm_task_planning"]
- full_response = []
- buffer = []
- last_flush_time = time.time()
- events = self.app.astream_events(initial_state,
- config=config ,
- version="v1", # 确保使用正确版本
- stream_mode="values" # 或者 "updates"
- )
- # 流式处理事件
- async for event in events:
- #server_logger.info(trace_id=trace_id, msg=f"→ 事件类型: {event['event']}")
- #server_logger.info(trace_id=trace_id, msg=f"→ 事件数据: {event['data']}")
-
- # 处理聊天模型流式输出
- if event['event'] == 'on_chat_model_stream':
- if 'chunk' in event['data']:
- chunk = event['data']['chunk']
- if hasattr(chunk, 'content'):
- content = chunk.content
- full_response.append(content)
-
- # 缓冲管理策略
- buffer.append(content)
- current_time = time.time()
-
- # 刷新条件
- should_flush = (
- len(buffer) >= 3 or # 达到最小块数
- (current_time - last_flush_time) > 0.5 or # 超时
- any(content.endswith(('.', '。', '!', '?', '\n', ';', ';', '?', '!')) for content in buffer) # 自然断点
- )
-
- if should_flush:
- combined = ''.join(buffer)
- yield combined
-
- buffer.clear()
- last_flush_time = current_time
-
- # 也可以处理其他类型的事件
- # elif event['event'] == 'on_chain_stream':
- # server_logger.info(trace_id=trace_id, msg=f"链式处理: {event['data']}")
-
- # elif event['event'] == 'on_tool_stream':
- # server_logger.info(trace_id=trace_id, msg=f"工具调用: {event['data']}")
-
- # 处理剩余缓冲内容
- if buffer:
- yield ''.join(buffer)
-
- # 将完整响应添加到历史并进行压缩
- if full_response:
- full_text = "".join(full_response)
- server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}", log_type="graph/stream")
-
- except Exception as e:
- handler_err(server_logger, trace_id=trace_id, err=e, err_name='graph/stream')
- yield json.dumps({"error": f"系统错误: {str(e)}"})
- def write_graph(self):
- """
- 将图写入文件
- """
- #
- graph_png = self.app.get_graph().draw_mermaid_png()
- with open("build_graph_app.png", "wb") as f:
- f.write(graph_png)
- server_logger.info(f"【图工作流写入文件完成】")
- # 实例化
- test_workflow_graph = TestWorkflowGraph()
|