| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- # !/usr/bin/ python
- # -*- coding: utf-8 -*-
- '''
- @Project : lq-agent-api
- @File :cus_streamer.py
- @IDE :PyCharm
- @Author :
- @Date :2025/7/14 12:04
- '''
- from langchain_core.messages import HumanMessage
- from typing import AsyncGenerator
- import asyncio
class AdaptiveStreamer:
    """Re-chunk a LangChain-style message stream into text pieces whose size
    adapts to how quickly the consumer accepts them.

    ``astream`` keeps its pending text in ``self.buffer`` and tunes
    ``self.chunk_size`` in place. One instance should therefore not be shared
    by concurrent streams.
    """

    def __init__(self, min_chunk: int = 256, max_chunk: int = 4096, initial_chunk: int = 4):
        """
        Args:
            min_chunk: Smallest size (in characters) a slow consumer can shrink
                the chunk size down to.
            max_chunk: Largest size (in characters) a fast consumer can grow
                the chunk size up to.
            initial_chunk: Chunk size for the first pieces. It may lie outside
                [min_chunk, max_chunk]. The default 4 is below min_chunk,
                presumably so the first text reaches the client quickly.
                Values below 1 are raised to 1, because a zero-size chunk
                would never drain the buffer.
        """
        self.min_chunk = min_chunk
        self.max_chunk = max_chunk
        self.chunk_size = max(1, initial_chunk)
        self.buffer = ""
        # Duration (event-loop seconds) of the most recent chunk hand-off.
        self.last_latency = 0.0

    async def astream(self, model, prompt: str, config, stream_mode="values") -> AsyncGenerator[str, None]:
        """Adaptive streaming output: run ``model`` on ``prompt`` and re-yield the reply text.

        Args:
            model: Object exposing ``astream(inputs, config=..., stream_mode=...)``.
                Each streamed item must be a mapping with a ``"messages"`` list;
                presumably a LangGraph graph (confirm against callers).
            prompt: User text, sent as a single ``HumanMessage``.
            config: Passed through unchanged to ``model.astream``.
            stream_mode: Passed through unchanged to ``model.astream``.

        Yields:
            ``str`` pieces of ``self.chunk_size`` characters, with the size
            re-tuned after every piece, followed by any remaining text at the
            end. If a message's content cannot be appended as text, an
            ``"[错误] ..."`` string is yielded in its place and streaming
            continues.

        NOTE(review): the whole content of each item's last message is
        appended. With stream modes that emit full state snapshots, a message
        repeated across snapshots would be emitted more than once. Confirm
        this against the caller's graph.
        """
        loop = asyncio.get_running_loop()
        # Drop text left behind by an earlier call that failed or was abandoned
        # before its final flush, so it cannot leak into this response.
        self.buffer = ""
        inputs = {"messages": [HumanMessage(content=prompt)]}
        async for langchain_chunk in model.astream(inputs, config=config, stream_mode=stream_mode):
            last_message = langchain_chunk['messages'][-1]
            content = getattr(last_message, 'content', None)
            # Skip messages with no/empty content, and echoes of the user's own message.
            if not content or isinstance(last_message, HumanMessage):
                continue
            try:
                self.buffer += content
            except TypeError as e:
                # Content that is not a str cannot be appended (LangChain also
                # allows list-valued content). Report it inline as text, like
                # every other yielded item, and keep streaming.
                yield f"[错误] {e}"
                continue
            while len(self.buffer) >= self.chunk_size:
                output_chunk = self.buffer[:self.chunk_size]
                self.buffer = self.buffer[self.chunk_size:]
                # The generator stays suspended at ``yield`` until the consumer
                # asks for the next item. This therefore measures how long the
                # consumer took with the chunk: the send time, if it awaits
                # the write before pulling again.
                start_time = loop.time()
                yield output_chunk
                send_duration = loop.time() - start_time
                self.last_latency = send_duration
                self.adjust_chunk_size(send_duration)
        # Flush the remainder. The buffer is cleared before yielding so it is
        # empty even if the consumer stops after this last piece.
        if self.buffer:
            tail, self.buffer = self.buffer, ""
            yield tail

    def adjust_chunk_size(self, send_duration: float):
        """Re-tune ``self.chunk_size`` from how long the last chunk took to hand off.

        The rate is ``chunk_size / send_duration`` in characters per second:

        - below 10000, the target is 80% of the current size;
        - above 100000, it is 120%;
        - otherwise the size is unchanged.

        The new size moves 30% of the way toward the target. When growth is
        due it always moves by at least one character, so small sizes are not
        stuck by integer truncation.

        ``min_chunk`` limits only shrinking and ``max_chunk`` limits only
        growing. A size already outside that range (such as the default
        ``initial_chunk`` of 4) is never pushed the wrong way by the limit.

        Args:
            send_duration: Seconds the last chunk took. A value of 0 (or less)
                is treated as an infinitely fast consumer.
        """
        current = self.chunk_size
        send_rate = current / send_duration if send_duration > 0 else float('inf')
        lower = min(self.min_chunk, current)
        upper = max(self.max_chunk, current)
        if send_rate < 10000:  # slow consumer: shrink, but not below min_chunk
            target = max(lower, current * 0.8)
        elif send_rate > 100000:  # fast consumer: grow, but not above max_chunk
            target = min(upper, current * 1.2)
        else:
            target = current
        if target == current:
            return
        # Smooth the transition: move 30% of the way toward the target.
        smoothed = 0.7 * current + 0.3 * target
        if target > current:
            new_size = max(current + 1, int(smoothed))
        else:
            new_size = int(smoothed)
        self.chunk_size = max(lower, min(upper, new_size))
|