#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
@Project :
@File    : workflow_graph.py
@IDE     : Cursor
@Author  : LINGMIN
@Date    : 2025/08/10 18:00
"""
import json
import time
from typing import AsyncGenerator

from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, StateGraph, END

from foundation.agent.workflow.test_cus_state import TestCusState
from foundation.agent.workflow.test_workflow_node import TestWorkflowNode
from foundation.logger.loggering import server_logger
from foundation.schemas.test_schemas import TestForm, FormConfig
from foundation.utils.common import return_json, handler_err

# Natural sentence boundaries used to flush the streaming buffer early.
# str.endswith accepts a tuple, so one call covers all of them.
_SENTENCE_ENDINGS = ('.', '。', '!', '?', '\n', ';', ';', '?', '!')


class TestWorkflowGraph:
    """Supervisor-style workflow graph.

    Builds a LangGraph ``StateGraph`` in which a ``supervisor_agent`` node
    routes each request to one of two worker agents, compiles it with an
    in-memory checkpointer, and exposes an SSE-friendly streaming entry point.
    """

    def __init__(self):
        self.workflow_node = TestWorkflowNode()
        # In-memory checkpointer: state survives only for the process lifetime.
        self.checkpoint_saver = MemorySaver()
        self.app = self.init_workflow_graph()
        # Persist a PNG rendering of the compiled graph for inspection.
        self.write_graph()

    def init_workflow_graph(self):
        """Build and compile the workflow state graph.

        Debugging tips (kept from the original author's notes):
          - inspect state via ``graph.get_state`` / ``get_state_history``
          - compile with ``debug=True`` for verbose logs
          - visualize via ``graph.get_graph().to_dot()``

        Returns:
            The compiled LangGraph application.
        """
        workflow = StateGraph(TestCusState)

        # --- Nodes -------------------------------------------------------
        # Supervisor: decides which worker agent handles the request.
        workflow.add_node("supervisor_agent", self.workflow_node.supervisor_agent)
        # Worker 1: pure text-generation questions.
        workflow.add_node("chat_box_generate", self.workflow_node.chat_box_generate)
        # Worker 2: general-purpose agent.
        workflow.add_node("common_agent", self.workflow_node.common_agent_node)

        # --- Edges -------------------------------------------------------
        workflow.add_edge(START, "supervisor_agent")
        # The supervisor writes its decision into state["route_next"]; each
        # possible value is mapped explicitly to its target node.
        workflow.add_conditional_edges(
            source="supervisor_agent",
            path=lambda state: state["route_next"],
            path_map={
                "chat_box_generate": "chat_box_generate",
                "common_agent": "common_agent",
            },
        )
        # Each worker terminates the run directly. To make workers report back
        # to the supervisor instead, replace END with "supervisor_agent".
        for agent_member in ("chat_box_generate", "common_agent"):
            workflow.add_edge(agent_member, END)

        app = workflow.compile(checkpointer=self.checkpoint_saver)
        server_logger.info(f"【图工作流构建完成】app={app}")
        return app

    async def handle_query_stream(self, param: TestForm, trace_id: str) -> AsyncGenerator[str, None]:
        """Run the graph for one query and stream the model output (SSE).

        Args:
            param: request form carrying the user input, session config
                and conversation context.
            trace_id: log-correlation id propagated through the graph state.

        Yields:
            Buffered chunks of model output; on failure, a single JSON
            error payload.
        """
        try:
            user_input = param.input
            session_id = param.config.session_id

            initial_state = {
                "messages": [HumanMessage(content=user_input)],
                "session_id": session_id,   # session id
                "trace_id": trace_id,       # log trace id
                "task_prompt_info": {},
                "context": param.context,   # conversation context data
                "user_input": user_input,
            }
            # thread_id keys the checkpointer: one thread per session.
            config = {
                "configurable": {"thread_id": session_id},
                "runnable_kwargs": {"recursion_limit": 50},
            }
            server_logger.info("======================== 启动新任务 ===========================")

            full_response = []
            buffer = []
            last_flush_time = time.time()

            events = self.app.astream_events(
                initial_state,
                config=config,
                version="v1",          # pin the event-schema version
                stream_mode="values",  # alternatively "updates"
            )
            async for event in events:
                # Only chat-model token chunks are forwarded; other event
                # types (on_chain_stream, on_tool_stream, ...) are ignored.
                if event['event'] != 'on_chat_model_stream':
                    continue
                chunk = event['data'].get('chunk')
                if chunk is None or not hasattr(chunk, 'content'):
                    continue

                content = chunk.content
                full_response.append(content)
                buffer.append(content)

                # Flush on enough chunks, on timeout, or at a natural
                # sentence boundary.
                now = time.time()
                should_flush = (
                    len(buffer) >= 3
                    or (now - last_flush_time) > 0.5
                    or any(piece.endswith(_SENTENCE_ENDINGS) for piece in buffer)
                )
                if should_flush:
                    yield ''.join(buffer)
                    buffer.clear()
                    last_flush_time = now

            # Emit whatever is left in the buffer after the stream ends.
            if buffer:
                yield ''.join(buffer)

            # Log the assembled response for tracing.
            if full_response:
                full_text = "".join(full_response)
                server_logger.info(trace_id=trace_id, msg=f"full_response: {full_text}", log_type="graph/stream")
        except Exception as e:
            handler_err(server_logger, trace_id=trace_id, err=e, err_name='graph/stream')
            yield json.dumps({"error": f"系统错误: {str(e)}"})

    def write_graph(self):
        """Render the compiled graph to ``build_graph_app.png``.

        Bug fix: the original left the ``draw_mermaid_png()`` call commented
        out, so ``f.write(graph_png)`` raised ``NameError`` as soon as the
        module-level instance was created. Rendering may require network
        access (mermaid.ink), so failures are logged rather than raised.
        """
        try:
            graph_png = self.app.get_graph().draw_mermaid_png()
            with open("build_graph_app.png", "wb") as f:
                f.write(graph_png)
            server_logger.info("【图工作流写入文件完成】")
        except Exception as e:
            server_logger.info(f"【图工作流写入文件失败】err={e}")


# Module-level singleton instance (side effect: builds the graph and
# writes build_graph_app.png at import time).
test_workflow_graph = TestWorkflowGraph()