# !/usr/bin/ python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File :model_generate.py
@IDE :PyCharm
@Author :
@Date :2025/7/14 14:22
'''
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
from foundation.ai.models.model_handler import model_handler
from foundation.observability.logger.loggering import review_logger as logger
import asyncio
import re
import time
from typing import Optional, Callable, Any, List, Union
# ============================================================
# 思考内容过滤(统一收敛在调用层)
#
# Qwen3.5 等模型在 enable_thinking=True 时会输出思考过程,
# 标准格式为 ...,但部分 SGLang 部署会输出
# "Thinking Process: ... "(缺 开标签)的畸形格式。
# 统一在此处去除,避免思考内容污染业务输出。
# ============================================================
_THINK_BLOCK_PATTERN = re.compile(r".*?\s*", re.DOTALL)
_DANGLING_THINK_PATTERN = re.compile(r"[\s\S]*$")
# SGLang 畸形格式:Thinking Process: ... (有闭标签无开标签)
_SGLANG_THINK_PATTERN = re.compile(r"Thinking\s+Process:\s*[\s\S]*?\s*", re.DOTALL)
def _strip_thinking_content(content: str) -> str:
"""去除完整响应中的思考内容。
- 标准 ... 块:整段去除
- SGLang Thinking Process: ... 畸形格式:整段去除
- 仅 无 (被截断):从 起全部丢弃
- 不含思考标签:原文返回
"""
if not content:
return content
cleaned = _THINK_BLOCK_PATTERN.sub("", content)
cleaned = _SGLANG_THINK_PATTERN.sub("", cleaned)
if "" in cleaned:
cleaned = _DANGLING_THINK_PATTERN.sub("", cleaned)
logger.warning("[模型调用] 响应包含未闭合的 块,已截断丢弃")
return cleaned.strip()
class _ThinkingBlockStreamFilter:
"""流式响应中过滤思考内容的状态机。
处理 chunk 边界穿过标签的情况,保证调用方拿到的流不会泄漏
任何思考片段。支持标准 ... 和 SGLang 畸形
"Thinking Process: ... " 两种格式。
"""
_OPEN = ""
_CLOSE = ""
_SGLANG_OPEN = "Thinking Process:"
def __init__(self):
self._buf = ""
self._inside = False
self._open_tag = ""
def feed(self, chunk: str) -> str:
"""喂入一个 chunk,返回此刻应输出的内容(可能为空字符串)。"""
if not chunk:
return ""
self._buf += chunk
out = []
while True:
if self._inside:
idx = self._buf.find(self._CLOSE)
if idx == -1:
keep_len = self._partial_match_len(self._buf, self._CLOSE)
self._buf = self._buf[-keep_len:] if keep_len else ""
break
self._buf = self._buf[idx + len(self._CLOSE):].lstrip()
self._inside = False
else:
idx_xml = self._buf.find(self._OPEN)
idx_sg = self._buf.find(self._SGLANG_OPEN)
if idx_xml == -1 and idx_sg == -1:
keep_xml = self._partial_match_len(self._buf, self._OPEN)
keep_sg = self._partial_match_len(self._buf, self._SGLANG_OPEN)
keep_len = max(keep_xml, keep_sg)
if keep_len:
out.append(self._buf[:-keep_len])
self._buf = self._buf[-keep_len:]
else:
out.append(self._buf)
self._buf = ""
break
if idx_xml >= 0 and (idx_sg == -1 or idx_xml <= idx_sg):
if idx_xml > 0:
out.append(self._buf[:idx_xml])
self._buf = self._buf[idx_xml + len(self._OPEN):]
self._open_tag = self._OPEN
else:
if idx_sg > 0:
out.append(self._buf[:idx_sg])
self._buf = self._buf[idx_sg + len(self._SGLANG_OPEN):]
self._open_tag = self._SGLANG_OPEN
self._inside = True
return "".join(out)
def flush(self) -> str:
"""流结束时调用,返回缓冲区剩余可输出内容。"""
if self._inside:
logger.warning("[模型流式调用] 流结束时仍在 块内,已丢弃尾部")
self._buf = ""
return ""
result = self._buf
self._buf = ""
return result
@staticmethod
def _partial_match_len(buf: str, tag: str) -> int:
"""返回 buf 末尾匹配 tag 前缀的最大长度(避免标签被切断后误输出)。"""
max_n = min(len(tag) - 1, len(buf))
for n in range(max_n, 0, -1):
if buf[-n:] == tag[:n]:
return n
return 0
def _sync_retry_with_backoff(
func: Callable,
*args,
max_retries: int = 3,
backoff_factor: float = 1.0,
trace_id: Optional[str] = None,
model_name: Optional[str] = None,
**kwargs
) -> Any:
"""
同步版本的带指数退避重试机制
注意:对于 502/503/504 等服务不可用错误,立即失败不重试
"""
model_info = model_name or "default"
def _is_server_unavailable_error(error: Exception) -> bool:
"""判断是否为服务端不可用错误(应立即失败)"""
error_str = str(error).lower()
unavailable_codes = ['502', '503', '504', 'internal server error']
return any(code in error_str for code in unavailable_codes)
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
error_str = str(e)
# 服务端不可用错误(502/503/504)立即失败,不重试
if _is_server_unavailable_error(e):
logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
raise
if attempt == max_retries:
logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
raise
wait_time = backoff_factor * (2 ** attempt)
logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
time.sleep(wait_time)
class GenerateModelClient:
"""
主要是生成式模型
"""
def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
# 获取默认模型
self.llm = model_handler.get_models()
self.chat = self.llm # 当前chat和llm使用相同模型
# 配置参数
self.default_timeout = default_timeout
self.max_retries = max_retries
self.backoff_factor = backoff_factor
# 保存model_handler引用,用于动态获取模型
self.model_handler = model_handler
async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
"""
带指数退避的重试机制,每次重试都有独立的超时控制
注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
避免在服务端过载时继续加重负载。
"""
current_timeout = timeout or self.default_timeout
model_info = model_name or "default"
def _is_server_unavailable_error(error: Exception) -> bool:
"""判断是否为服务端不可用错误(应立即失败)"""
error_str = str(error).lower()
# 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
unavailable_codes = ['502', '503', '504', 'internal server error']
return any(code in error_str for code in unavailable_codes)
for attempt in range(self.max_retries + 1):
try:
# 每次重试都有独立的超时时间
return await asyncio.wait_for(
func(*args, **kwargs),
timeout=current_timeout
)
except asyncio.TimeoutError as e:
if attempt == self.max_retries:
logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
wait_time = self.backoff_factor * (2 ** attempt)
logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
await asyncio.sleep(wait_time)
except Exception as e:
error_str = str(e)
# 服务端不可用错误(502/503/504)立即失败,不重试
if _is_server_unavailable_error(e):
logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
raise
if attempt == self.max_retries:
logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
raise
wait_time = self.backoff_factor * (2 ** attempt)
logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
await asyncio.sleep(wait_time)
async def get_model_generate_invoke(
self,
trace_id: str,
task_prompt_info: Optional[dict] = None,
messages: Optional[List[BaseMessage]] = None,
system_prompt: Optional[str] = None,
user_prompt: Optional[str] = None,
prompt: Optional[str] = None,
timeout: Optional[int] = None,
model_name: Optional[str] = None,
enable_thinking: Optional[bool] = False,
function_name: Optional[str] = None
) -> str:
"""模型非流式生成(异步)
支持多种调用方式(优先级从高到低):
1. messages: 直接传入 LangChain Message 对象列表
2. system_prompt + user_prompt: 分别传入系统和用户提示词
3. prompt: 传入单条用户提示词字符串
4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
Args:
trace_id: 追踪ID
task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
system_prompt: 系统提示词字符串
user_prompt: 用户提示词字符串
prompt: 单条用户提示词字符串(无系统提示时使用)
timeout: 超时时间(秒),默认使用构造时的 default_timeout
model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
Returns:
str: 模型生成的文本内容
Raises:
ValueError: 参数组合错误
TimeoutError: 调用超时
Exception: 模型调用异常
Examples:
# 方式1: 使用 Message 列表(推荐)
messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
result = await client.get_model_generate_invoke("trace-001", messages=messages)
# 方式2: 分别传入系统和用户提示词
result = await client.get_model_generate_invoke(
"trace-001",
system_prompt="你是专家",
user_prompt="请分析..."
)
# 方式3: 传入单条提示词
result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
# 方式4: 兼容旧接口(使用 PromptLoader)
task_prompt_info = {"task_prompt": chat_template}
result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
# 方式5: 使用功能名称从配置加载模型
result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
"""
start_time = time.time()
current_timeout = timeout or self.default_timeout
# 如果提供了功能名称,从配置加载模型和 thinking 模式
if function_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
config_model = get_model_for_function(function_name)
config_thinking = get_thinking_mode_for_function(function_name)
if config_model:
model_name = config_model
logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
if config_thinking is not None and enable_thinking is False:
# 只有默认 False 时才覆盖,显式传入的参数优先
enable_thinking = config_thinking
logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
except Exception as e:
logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
# 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
if not model_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function
model_name = get_model_for_function("default")
logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
except Exception as e:
logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
try:
# 选择模型
llm_to_use = self.model_handler.get_model_by_name(
model_name, enable_thinking=(enable_thinking or False)
) if model_name else self.llm
logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
# 构建消息列表(按优先级)
final_messages = self._build_messages(
messages=messages,
system_prompt=system_prompt,
user_prompt=user_prompt,
prompt=prompt,
task_prompt_info=task_prompt_info
)
# 针对 Qwen3.5 模型处理思考模式
model_to_invoke = llm_to_use
is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
if is_qwen35:
if enable_thinking is False:
# 显式禁用思考模式
model_to_invoke = llm_to_use.bind(
extra_body={"chat_template_kwargs": {"enable_thinking": False}}
)
logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
elif enable_thinking is True:
# 显式启用思考模式
model_to_invoke = llm_to_use.bind(
extra_body={"chat_template_kwargs": {"enable_thinking": True}}
)
logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
else:
# enable_thinking is None,使用模型默认行为(通常是启用)
logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
# 定义模型调用函数,使用原生 ainvoke
async def _invoke():
return await model_to_invoke.ainvoke(final_messages)
# 调用带重试机制
response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
elapsed_time = time.time() - start_time
logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
return _strip_thinking_content(response.content)
except asyncio.TimeoutError:
elapsed_time = time.time() - start_time
logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
raise
def _build_messages(
self,
messages: Optional[List[BaseMessage]] = None,
system_prompt: Optional[str] = None,
user_prompt: Optional[str] = None,
prompt: Optional[str] = None,
task_prompt_info: Optional[dict] = None
) -> List[BaseMessage]:
"""构建消息列表(内部方法)
优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
"""
# 方式1: 直接使用传入的 Message 列表
if messages is not None:
if not isinstance(messages, list):
raise ValueError("messages 必须是列表")
if len(messages) == 0:
raise ValueError("messages 不能为空列表")
logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
return messages
# 方式2: system_prompt + user_prompt
if system_prompt is not None and user_prompt is not None:
logger.debug("使用 system_prompt + user_prompt 构建消息")
return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
# 方式3: 单独 system_prompt(可能是特殊情况)
if system_prompt is not None:
logger.debug("使用单独的 system_prompt 构建消息")
return [SystemMessage(content=system_prompt)]
# 方式4: 单条 prompt 字符串
if prompt is not None:
logger.debug("使用单条 prompt 字符串构建消息")
return [HumanMessage(content=prompt)]
# 方式5: 兼容旧接口 task_prompt_info
if task_prompt_info is not None:
if "task_prompt" not in task_prompt_info:
raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
task_prompt = task_prompt_info["task_prompt"]
if hasattr(task_prompt, 'format_messages'):
logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
return task_prompt.format_messages()
elif isinstance(task_prompt, str):
logger.debug("使用 task_prompt_info 中的字符串构建消息")
return [HumanMessage(content=task_prompt)]
else:
raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
# 没有提供任何有效参数
raise ValueError(
"必须提供以下参数之一: "
"messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
)
def get_model_generate_invoke_sync(
self,
trace_id: str,
task_prompt_info: Optional[dict] = None,
messages: Optional[List[BaseMessage]] = None,
system_prompt: Optional[str] = None,
user_prompt: Optional[str] = None,
prompt: Optional[str] = None,
timeout: Optional[int] = None,
model_name: Optional[str] = None,
enable_thinking: Optional[bool] = False,
function_name: Optional[str] = None
) -> str:
"""模型非流式生成(同步版本)
适用于同步上下文调用,功能与异步版本完全一致。
支持多种调用方式(优先级从高到低):
1. messages: 直接传入 LangChain Message 对象列表
2. system_prompt + user_prompt: 分别传入系统和用户提示词
3. prompt: 传入单条用户提示词字符串
4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
Args:
trace_id: 追踪ID
task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
system_prompt: 系统提示词字符串
user_prompt: 用户提示词字符串
prompt: 单条用户提示词字符串(无系统提示时使用)
timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
Returns:
str: 模型生成的文本内容
Raises:
ValueError: 参数组合错误
Exception: 模型调用异常
Examples:
# 同步调用(用于同步上下文)
result = generate_model_client.get_model_generate_invoke_sync(
"trace-001",
system_prompt="你是专家",
user_prompt="请分析...",
function_name="doc_classification_secondary"
)
"""
start_time = time.time()
# 如果提供了功能名称,从配置加载模型和 thinking 模式
if function_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
config_model = get_model_for_function(function_name)
config_thinking = get_thinking_mode_for_function(function_name)
if config_model:
model_name = config_model
logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
if config_thinking is not None and enable_thinking is False:
enable_thinking = config_thinking
logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
except Exception as e:
logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
# 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
if not model_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function
model_name = get_model_for_function("default")
logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
except Exception as e:
logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
try:
# 选择模型
llm_to_use = self.model_handler.get_model_by_name(
model_name, enable_thinking=(enable_thinking or False)
) if model_name else self.llm
logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
# 构建消息列表(按优先级)
final_messages = self._build_messages(
messages=messages,
system_prompt=system_prompt,
user_prompt=user_prompt,
prompt=prompt,
task_prompt_info=task_prompt_info
)
# 针对 Qwen3.5 模型处理思考模式
model_to_invoke = llm_to_use
is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
if is_qwen35:
if enable_thinking is False:
model_to_invoke = llm_to_use.bind(
extra_body={"chat_template_kwargs": {"enable_thinking": False}}
)
logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
elif enable_thinking is True:
model_to_invoke = llm_to_use.bind(
extra_body={"chat_template_kwargs": {"enable_thinking": True}}
)
logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
else:
logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
# 定义模型调用函数,使用同步 invoke
def _invoke():
return model_to_invoke.invoke(final_messages)
# 调用带重试机制(同步版本)
response = _sync_retry_with_backoff(
_invoke,
max_retries=self.max_retries,
backoff_factor=self.backoff_factor,
trace_id=trace_id,
model_name=model_name or "default"
)
elapsed_time = time.time() - start_time
logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
return _strip_thinking_content(response.content)
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
raise
def get_model_generate_stream(
self,
trace_id: str,
task_prompt_info: Optional[dict] = None,
messages: Optional[List[BaseMessage]] = None,
system_prompt: Optional[str] = None,
user_prompt: Optional[str] = None,
prompt: Optional[str] = None,
timeout: Optional[int] = None,
model_name: Optional[str] = None,
enable_thinking: Optional[bool] = False,
function_name: Optional[str] = None
):
"""模型流式生成(同步生成器)
支持多种调用方式(优先级从高到低):
1. messages: 直接传入 LangChain Message 对象列表
2. system_prompt + user_prompt: 分别传入系统和用户提示词
3. prompt: 传入单条用户提示词字符串
4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
Args:
trace_id: 追踪ID
task_prompt_info: 任务提示词信息(兼容旧接口)
messages: LangChain Message 对象列表
system_prompt: 系统提示词字符串
user_prompt: 用户提示词字符串
prompt: 单条用户提示词字符串
timeout: 超时时间(秒)
model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
Yields:
str: 生成的文本块
Raises:
ValueError: 参数组合错误
"""
start_time = time.time()
current_timeout = timeout or self.default_timeout
# 如果提供了功能名称,从配置加载模型
if function_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function
config_model = get_model_for_function(function_name)
if config_model:
model_name = config_model
logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
except Exception as e:
logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
# 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
if not model_name:
try:
from foundation.ai.models.model_config_loader import get_model_for_function
model_name = get_model_for_function("default")
logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
except Exception as e:
logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
try:
# 选择模型
llm_to_use = self.model_handler.get_model_by_name(
model_name, enable_thinking=(enable_thinking or False)
) if model_name else self.llm
logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
# 构建消息列表
final_messages = self._build_messages(
messages=messages,
system_prompt=system_prompt,
user_prompt=user_prompt,
prompt=prompt,
task_prompt_info=task_prompt_info
)
response = llm_to_use.stream(final_messages)
chunk_count = 0
think_filter = _ThinkingBlockStreamFilter()
for chunk in response:
chunk_count += 1
if hasattr(chunk, 'content') and chunk.content:
cleaned = think_filter.feed(chunk.content)
if cleaned:
yield cleaned
elif chunk:
yield chunk
tail = think_filter.flush()
if tail:
yield tail
elapsed_time = time.time() - start_time
logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
except Exception as e:
elapsed_time = time.time() - start_time
logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
raise
generate_model_client = GenerateModelClient(default_timeout=120, max_retries=10, backoff_factor=0.5)