model_generate.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :model_generate.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/14 14:22
  9. '''
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
  12. from foundation.ai.models.model_handler import model_handler
  13. from foundation.observability.logger.loggering import review_logger as logger
  14. import asyncio
  15. import time
  16. from typing import Optional, Callable, Any, List, Union
  17. def _sync_retry_with_backoff(
  18. func: Callable,
  19. *args,
  20. max_retries: int = 3,
  21. backoff_factor: float = 1.0,
  22. trace_id: Optional[str] = None,
  23. model_name: Optional[str] = None,
  24. **kwargs
  25. ) -> Any:
  26. """
  27. 同步版本的带指数退避重试机制
  28. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试
  29. """
  30. model_info = model_name or "default"
  31. def _is_server_unavailable_error(error: Exception) -> bool:
  32. """判断是否为服务端不可用错误(应立即失败)"""
  33. error_str = str(error).lower()
  34. unavailable_codes = ['502', '503', '504', 'internal server error']
  35. return any(code in error_str for code in unavailable_codes)
  36. for attempt in range(max_retries + 1):
  37. try:
  38. return func(*args, **kwargs)
  39. except Exception as e:
  40. error_str = str(e)
  41. # 服务端不可用错误(502/503/504)立即失败,不重试
  42. if _is_server_unavailable_error(e):
  43. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  44. raise
  45. if attempt == max_retries:
  46. logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  47. raise
  48. wait_time = backoff_factor * (2 ** attempt)
  49. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  50. time.sleep(wait_time)
  51. class GenerateModelClient:
  52. """
  53. 主要是生成式模型
  54. """
  55. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  56. # 获取默认模型
  57. self.llm = model_handler.get_models()
  58. self.chat = self.llm # 当前chat和llm使用相同模型
  59. # 配置参数
  60. self.default_timeout = default_timeout
  61. self.max_retries = max_retries
  62. self.backoff_factor = backoff_factor
  63. # 保存model_handler引用,用于动态获取模型
  64. self.model_handler = model_handler
  65. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
  66. """
  67. 带指数退避的重试机制,每次重试都有独立的超时控制
  68. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
  69. 避免在服务端过载时继续加重负载。
  70. """
  71. current_timeout = timeout or self.default_timeout
  72. model_info = model_name or "default"
  73. def _is_server_unavailable_error(error: Exception) -> bool:
  74. """判断是否为服务端不可用错误(应立即失败)"""
  75. error_str = str(error).lower()
  76. # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
  77. unavailable_codes = ['502', '503', '504', 'internal server error']
  78. return any(code in error_str for code in unavailable_codes)
  79. for attempt in range(self.max_retries + 1):
  80. try:
  81. # 每次重试都有独立的超时时间
  82. return await asyncio.wait_for(
  83. func(*args, **kwargs),
  84. timeout=current_timeout
  85. )
  86. except asyncio.TimeoutError as e:
  87. if attempt == self.max_retries:
  88. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  89. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  90. wait_time = self.backoff_factor * (2 ** attempt)
  91. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  92. await asyncio.sleep(wait_time)
  93. except Exception as e:
  94. error_str = str(e)
  95. # 服务端不可用错误(502/503/504)立即失败,不重试
  96. if _is_server_unavailable_error(e):
  97. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  98. raise
  99. if attempt == self.max_retries:
  100. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  101. raise
  102. wait_time = self.backoff_factor * (2 ** attempt)
  103. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  104. await asyncio.sleep(wait_time)
  105. async def get_model_generate_invoke(
  106. self,
  107. trace_id: str,
  108. task_prompt_info: Optional[dict] = None,
  109. messages: Optional[List[BaseMessage]] = None,
  110. system_prompt: Optional[str] = None,
  111. user_prompt: Optional[str] = None,
  112. prompt: Optional[str] = None,
  113. timeout: Optional[int] = None,
  114. model_name: Optional[str] = None,
  115. enable_thinking: Optional[bool] = False,
  116. function_name: Optional[str] = None
  117. ) -> str:
  118. """模型非流式生成(异步)
  119. 支持多种调用方式(优先级从高到低):
  120. 1. messages: 直接传入 LangChain Message 对象列表
  121. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  122. 3. prompt: 传入单条用户提示词字符串
  123. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  124. Args:
  125. trace_id: 追踪ID
  126. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  127. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  128. system_prompt: 系统提示词字符串
  129. user_prompt: 用户提示词字符串
  130. prompt: 单条用户提示词字符串(无系统提示时使用)
  131. timeout: 超时时间(秒),默认使用构造时的 default_timeout
  132. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  133. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  134. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  135. Returns:
  136. str: 模型生成的文本内容
  137. Raises:
  138. ValueError: 参数组合错误
  139. TimeoutError: 调用超时
  140. Exception: 模型调用异常
  141. Examples:
  142. # 方式1: 使用 Message 列表(推荐)
  143. messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
  144. result = await client.get_model_generate_invoke("trace-001", messages=messages)
  145. # 方式2: 分别传入系统和用户提示词
  146. result = await client.get_model_generate_invoke(
  147. "trace-001",
  148. system_prompt="你是专家",
  149. user_prompt="请分析..."
  150. )
  151. # 方式3: 传入单条提示词
  152. result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
  153. # 方式4: 兼容旧接口(使用 PromptLoader)
  154. task_prompt_info = {"task_prompt": chat_template}
  155. result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
  156. # 方式5: 使用功能名称从配置加载模型
  157. result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
  158. """
  159. start_time = time.time()
  160. current_timeout = timeout or self.default_timeout
  161. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  162. if function_name:
  163. try:
  164. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  165. config_model = get_model_for_function(function_name)
  166. config_thinking = get_thinking_mode_for_function(function_name)
  167. if config_model:
  168. model_name = config_model
  169. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  170. if config_thinking is not None and enable_thinking is False:
  171. # 只有默认 False 时才覆盖,显式传入的参数优先
  172. enable_thinking = config_thinking
  173. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  174. except Exception as e:
  175. logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
  176. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  177. if not model_name:
  178. try:
  179. from foundation.ai.models.model_config_loader import get_model_for_function
  180. model_name = get_model_for_function("default")
  181. logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  182. except Exception as e:
  183. logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  184. try:
  185. # 选择模型
  186. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  187. logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  188. # 构建消息列表(按优先级)
  189. final_messages = self._build_messages(
  190. messages=messages,
  191. system_prompt=system_prompt,
  192. user_prompt=user_prompt,
  193. prompt=prompt,
  194. task_prompt_info=task_prompt_info
  195. )
  196. # 针对 Qwen3.5 模型处理思考模式
  197. model_to_invoke = llm_to_use
  198. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  199. if is_qwen35:
  200. if enable_thinking is False:
  201. # 显式禁用思考模式
  202. model_to_invoke = llm_to_use.bind(
  203. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  204. )
  205. logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
  206. elif enable_thinking is True:
  207. # 显式启用思考模式
  208. model_to_invoke = llm_to_use.bind(
  209. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  210. )
  211. logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
  212. else:
  213. # enable_thinking is None,使用模型默认行为(通常是启用)
  214. logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
  215. # 定义模型调用函数,使用原生 ainvoke
  216. async def _invoke():
  217. return await model_to_invoke.ainvoke(final_messages)
  218. # 调用带重试机制
  219. response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
  220. elapsed_time = time.time() - start_time
  221. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  222. return response.content
  223. except asyncio.TimeoutError:
  224. elapsed_time = time.time() - start_time
  225. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  226. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
  227. except Exception as e:
  228. elapsed_time = time.time() - start_time
  229. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  230. raise
  231. def _build_messages(
  232. self,
  233. messages: Optional[List[BaseMessage]] = None,
  234. system_prompt: Optional[str] = None,
  235. user_prompt: Optional[str] = None,
  236. prompt: Optional[str] = None,
  237. task_prompt_info: Optional[dict] = None
  238. ) -> List[BaseMessage]:
  239. """构建消息列表(内部方法)
  240. 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
  241. """
  242. # 方式1: 直接使用传入的 Message 列表
  243. if messages is not None:
  244. if not isinstance(messages, list):
  245. raise ValueError("messages 必须是列表")
  246. if len(messages) == 0:
  247. raise ValueError("messages 不能为空列表")
  248. logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
  249. return messages
  250. # 方式2: system_prompt + user_prompt
  251. if system_prompt is not None and user_prompt is not None:
  252. logger.debug("使用 system_prompt + user_prompt 构建消息")
  253. return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
  254. # 方式3: 单独 system_prompt(可能是特殊情况)
  255. if system_prompt is not None:
  256. logger.debug("使用单独的 system_prompt 构建消息")
  257. return [SystemMessage(content=system_prompt)]
  258. # 方式4: 单条 prompt 字符串
  259. if prompt is not None:
  260. logger.debug("使用单条 prompt 字符串构建消息")
  261. return [HumanMessage(content=prompt)]
  262. # 方式5: 兼容旧接口 task_prompt_info
  263. if task_prompt_info is not None:
  264. if "task_prompt" not in task_prompt_info:
  265. raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
  266. task_prompt = task_prompt_info["task_prompt"]
  267. if hasattr(task_prompt, 'format_messages'):
  268. logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
  269. return task_prompt.format_messages()
  270. elif isinstance(task_prompt, str):
  271. logger.debug("使用 task_prompt_info 中的字符串构建消息")
  272. return [HumanMessage(content=task_prompt)]
  273. else:
  274. raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
  275. # 没有提供任何有效参数
  276. raise ValueError(
  277. "必须提供以下参数之一: "
  278. "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
  279. )
  280. def get_model_generate_invoke_sync(
  281. self,
  282. trace_id: str,
  283. task_prompt_info: Optional[dict] = None,
  284. messages: Optional[List[BaseMessage]] = None,
  285. system_prompt: Optional[str] = None,
  286. user_prompt: Optional[str] = None,
  287. prompt: Optional[str] = None,
  288. timeout: Optional[int] = None,
  289. model_name: Optional[str] = None,
  290. enable_thinking: Optional[bool] = False,
  291. function_name: Optional[str] = None
  292. ) -> str:
  293. """模型非流式生成(同步版本)
  294. 适用于同步上下文调用,功能与异步版本完全一致。
  295. 支持多种调用方式(优先级从高到低):
  296. 1. messages: 直接传入 LangChain Message 对象列表
  297. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  298. 3. prompt: 传入单条用户提示词字符串
  299. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  300. Args:
  301. trace_id: 追踪ID
  302. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  303. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  304. system_prompt: 系统提示词字符串
  305. user_prompt: 用户提示词字符串
  306. prompt: 单条用户提示词字符串(无系统提示时使用)
  307. timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
  308. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  309. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  310. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  311. Returns:
  312. str: 模型生成的文本内容
  313. Raises:
  314. ValueError: 参数组合错误
  315. Exception: 模型调用异常
  316. Examples:
  317. # 同步调用(用于同步上下文)
  318. result = generate_model_client.get_model_generate_invoke_sync(
  319. "trace-001",
  320. system_prompt="你是专家",
  321. user_prompt="请分析...",
  322. function_name="doc_classification_secondary"
  323. )
  324. """
  325. start_time = time.time()
  326. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  327. if function_name:
  328. try:
  329. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  330. config_model = get_model_for_function(function_name)
  331. config_thinking = get_thinking_mode_for_function(function_name)
  332. if config_model:
  333. model_name = config_model
  334. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  335. if config_thinking is not None and enable_thinking is False:
  336. enable_thinking = config_thinking
  337. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  338. except Exception as e:
  339. logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
  340. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  341. if not model_name:
  342. try:
  343. from foundation.ai.models.model_config_loader import get_model_for_function
  344. model_name = get_model_for_function("default")
  345. logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  346. except Exception as e:
  347. logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  348. try:
  349. # 选择模型
  350. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  351. logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  352. # 构建消息列表(按优先级)
  353. final_messages = self._build_messages(
  354. messages=messages,
  355. system_prompt=system_prompt,
  356. user_prompt=user_prompt,
  357. prompt=prompt,
  358. task_prompt_info=task_prompt_info
  359. )
  360. # 针对 Qwen3.5 模型处理思考模式
  361. model_to_invoke = llm_to_use
  362. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  363. if is_qwen35:
  364. if enable_thinking is False:
  365. model_to_invoke = llm_to_use.bind(
  366. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  367. )
  368. logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
  369. elif enable_thinking is True:
  370. model_to_invoke = llm_to_use.bind(
  371. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  372. )
  373. logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
  374. else:
  375. logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
  376. # 定义模型调用函数,使用同步 invoke
  377. def _invoke():
  378. return model_to_invoke.invoke(final_messages)
  379. # 调用带重试机制(同步版本)
  380. response = _sync_retry_with_backoff(
  381. _invoke,
  382. max_retries=self.max_retries,
  383. backoff_factor=self.backoff_factor,
  384. trace_id=trace_id,
  385. model_name=model_name or "default"
  386. )
  387. elapsed_time = time.time() - start_time
  388. logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  389. return response.content
  390. except Exception as e:
  391. elapsed_time = time.time() - start_time
  392. logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  393. raise
  394. def get_model_generate_stream(
  395. self,
  396. trace_id: str,
  397. task_prompt_info: Optional[dict] = None,
  398. messages: Optional[List[BaseMessage]] = None,
  399. system_prompt: Optional[str] = None,
  400. user_prompt: Optional[str] = None,
  401. prompt: Optional[str] = None,
  402. timeout: Optional[int] = None,
  403. model_name: Optional[str] = None,
  404. function_name: Optional[str] = None
  405. ):
  406. """模型流式生成(同步生成器)
  407. 支持多种调用方式(优先级从高到低):
  408. 1. messages: 直接传入 LangChain Message 对象列表
  409. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  410. 3. prompt: 传入单条用户提示词字符串
  411. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  412. Args:
  413. trace_id: 追踪ID
  414. task_prompt_info: 任务提示词信息(兼容旧接口)
  415. messages: LangChain Message 对象列表
  416. system_prompt: 系统提示词字符串
  417. user_prompt: 用户提示词字符串
  418. prompt: 单条用户提示词字符串
  419. timeout: 超时时间(秒)
  420. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  421. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
  422. Yields:
  423. str: 生成的文本块
  424. Raises:
  425. ValueError: 参数组合错误
  426. """
  427. start_time = time.time()
  428. current_timeout = timeout or self.default_timeout
  429. # 如果提供了功能名称,从配置加载模型
  430. if function_name:
  431. try:
  432. from foundation.ai.models.model_config_loader import get_model_for_function
  433. config_model = get_model_for_function(function_name)
  434. if config_model:
  435. model_name = config_model
  436. logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  437. except Exception as e:
  438. logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
  439. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  440. if not model_name:
  441. try:
  442. from foundation.ai.models.model_config_loader import get_model_for_function
  443. model_name = get_model_for_function("default")
  444. logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  445. except Exception as e:
  446. logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  447. try:
  448. # 选择模型
  449. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  450. logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
  451. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  452. # 构建消息列表
  453. final_messages = self._build_messages(
  454. messages=messages,
  455. system_prompt=system_prompt,
  456. user_prompt=user_prompt,
  457. prompt=prompt,
  458. task_prompt_info=task_prompt_info
  459. )
  460. response = llm_to_use.stream(final_messages)
  461. chunk_count = 0
  462. for chunk in response:
  463. chunk_count += 1
  464. if hasattr(chunk, 'content') and chunk.content:
  465. yield chunk.content
  466. elif chunk:
  467. yield chunk
  468. elapsed_time = time.time() - start_time
  469. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  470. except Exception as e:
  471. elapsed_time = time.time() - start_time
  472. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  473. raise
  474. generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)