# !/usr/bin/ python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File :cus_streamer.py
@IDE :PyCharm
@Author :
@Date :2025/7/14 12:04
'''
import asyncio
from typing import AsyncGenerator, Union

from langchain_core.messages import HumanMessage


class AdaptiveStreamer:
    """Re-chunk a model's streamed text into adaptively sized pieces.

    Text taken from the model stream is accumulated in ``self.buffer`` and
    re-emitted in slices of ``self.chunk_size`` characters.  After every
    emitted slice the time the consumer took to request the next item is
    measured and fed to :meth:`adjust_chunk_size`.

    The buffer and the chunk size live on the instance, so one instance
    must not drive several streams at the same time.

    NOTE(review): with the defaults, ``initial_chunk`` (4) is below
    ``min_chunk`` (256), and ``chunk_size`` is never clamped directly (the
    bounds only limit the smoothing target).  Integer truncation also keeps
    very small sizes from growing (``int(4 * 1.2) == 4``).  This is left
    untouched because it determines the streaming granularity callers
    currently observe -- confirm whether it is intended.
    """

    def __init__(self, min_chunk: int = 256, max_chunk: int = 4096, initial_chunk: int = 4):
        """Store the chunk-size bounds and the starting chunk size.

        Args:
            min_chunk: Lower bound for the target size when shrinking.
            max_chunk: Upper bound for the target size when growing.
            initial_chunk: Slice size used until the first adjustment.
        """
        self.min_chunk = min_chunk
        self.max_chunk = max_chunk
        self.chunk_size = initial_chunk
        self.buffer = ""
        # Duration of the most recent send, in seconds (0.0 before any send).
        self.last_latency = 0.0

    async def astream(self, model, prompt: str, config,
                      stream_mode="values") -> AsyncGenerator[Union[str, bytes], None]:
        """Stream the model's answer to ``prompt`` in adaptive slices.

        ``model.astream`` is called with a single ``HumanMessage`` built from
        ``prompt``; every item it produces is indexed as
        ``item['messages'][-1]``.  Items whose last message has no (or empty)
        ``content``, or whose last message is a ``HumanMessage``, are
        skipped.

        Args:
            model: Object exposing ``astream(input, config=..., stream_mode=...)``
                as an async iterator.
            prompt: User text sent as the only message.
            config: Passed through unchanged to ``model.astream``.
            stream_mode: Passed through unchanged to ``model.astream``.

        Yields:
            ``str`` slices of the buffered content.  If buffering or sending
            a slice raises, a UTF-8 encoded ``bytes`` error message is
            yielded instead and streaming continues with the next item.
        """
        # Drop anything left over from a previous stream that was aborted
        # (consumer closed the generator, or the model stream raised);
        # otherwise that stale text would be prepended to this response.
        self.buffer = ""
        loop = asyncio.get_running_loop()

        async for langchain_chunk in model.astream(
                {"messages": [HumanMessage(content=prompt)]},
                config=config,
                stream_mode=stream_mode):
            last_message = langchain_chunk['messages'][-1]

            # Skip items that carry no content.
            if not getattr(last_message, 'content', None):
                continue
            # Skip the echoed user prompt.
            if isinstance(last_message, HumanMessage):
                continue

            try:
                # Append to the buffer.
                self.buffer += last_message.content

                # Drain the buffer one slice at a time.  The slice size is
                # floored at 1: a size <= 0 would make this loop spin
                # forever without consuming the buffer.
                size = max(1, self.chunk_size)
                while len(self.buffer) >= size:
                    output_chunk = self.buffer[:size]
                    self.buffer = self.buffer[size:]

                    # The generator is suspended at ``yield`` until the
                    # consumer asks for the next item, so the elapsed time
                    # approximates how long the consumer needed to send it.
                    start_time = loop.time()
                    yield output_chunk
                    send_duration = loop.time() - start_time

                    self.last_latency = send_duration
                    # Adapt the slice size to the observed send time.
                    self.adjust_chunk_size(send_duration)
                    size = max(1, self.chunk_size)

            except Exception as e:
                # Report the failure in-band and keep processing later items.
                error_msg = f"[错误] {str(e)}".encode('utf-8')
                yield error_msg
                continue

        # Flush whatever is left; clear the buffer first so it is empty even
        # if the consumer stops iterating at this final yield.
        if self.buffer:
            remaining, self.buffer = self.buffer, ""
            yield remaining

    def adjust_chunk_size(self, send_duration: float) -> None:
        """Adapt ``self.chunk_size`` to how long the last slice took to send.

        The send rate is ``chunk_size / send_duration`` (characters per
        second, since the buffer is a ``str``); a non-positive duration
        counts as an infinite rate.  Below 10000/s the target shrinks by 20%
        (not below ``min_chunk``); above 100000/s it grows by 20% (not above
        ``max_chunk``); otherwise it is unchanged.  The new size is a 70/30
        blend of the current size and the target, truncated to an int and
        never less than 1.

        Args:
            send_duration: Seconds the last slice took to be consumed.
        """
        if send_duration > 0:
            send_rate = self.chunk_size / send_duration
        else:
            send_rate = float('inf')

        if send_rate < 10000:  # slow consumer
            new_size = max(self.min_chunk, int(self.chunk_size * 0.8))
        elif send_rate > 100000:  # fast consumer
            new_size = min(self.max_chunk, int(self.chunk_size * 1.2))
        else:
            new_size = self.chunk_size

        # Smooth the transition; the floor of 1 keeps the drain loop in
        # ``astream`` from ever slicing by zero.
        self.chunk_size = max(1, int(0.7 * self.chunk_size + 0.3 * new_size))