model_generate.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :model_generate.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/14 14:22
  9. '''
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
  12. from foundation.ai.models.model_handler import model_handler
  13. from foundation.observability.logger.loggering import review_logger as logger
  14. import asyncio
  15. import re
  16. import time
  17. from typing import Optional, Callable, Any, List, Union
  18. # ============================================================
  19. # 思考内容过滤(统一收敛在调用层)
  20. #
  21. # Qwen3.5 等模型在 enable_thinking=True 时会先输出 <think>...</think>
  22. # 块再给出最终答案。所有业务方都不需要思考过程,统一在此处去除,
  23. # 避免每个调用点重复实现,也防止漏处理导致思考内容污染输出。
  24. # ============================================================
  25. _THINK_BLOCK_PATTERN = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
  26. _DANGLING_THINK_PATTERN = re.compile(r"<think>[\s\S]*$")
  27. def _strip_thinking_content(content: str) -> str:
  28. """去除完整响应中的 <think>...</think> 块。
  29. - 完整闭合块:整段去除(含尾随空白)
  30. - 仅 <think> 无 </think>(被截断):从 <think> 起全部丢弃,记录警告
  31. - 不含思考标签:原文返回
  32. """
  33. if not content:
  34. return content
  35. cleaned = _THINK_BLOCK_PATTERN.sub("", content)
  36. if "<think>" in cleaned:
  37. cleaned = _DANGLING_THINK_PATTERN.sub("", cleaned)
  38. logger.warning("[模型调用] 响应包含未闭合的 <think> 块,已截断丢弃")
  39. return cleaned.strip()
  40. class _ThinkingBlockStreamFilter:
  41. """流式响应中过滤 <think>...</think> 块的状态机。
  42. 处理 chunk 边界穿过标签的情况(如先收到 "<thi"、下次再到 "nk>正文"),
  43. 保证调用方拿到的流不会泄漏任何思考片段。
  44. 用法:
  45. flt = _ThinkingBlockStreamFilter()
  46. for chunk in stream:
  47. cleaned = flt.feed(chunk)
  48. if cleaned:
  49. yield cleaned
  50. tail = flt.flush()
  51. if tail:
  52. yield tail
  53. """
  54. _OPEN = "<think>"
  55. _CLOSE = "</think>"
  56. def __init__(self):
  57. self._buf = ""
  58. self._inside = False
  59. def feed(self, chunk: str) -> str:
  60. """喂入一个 chunk,返回此刻应输出的内容(可能为空字符串)。"""
  61. if not chunk:
  62. return ""
  63. self._buf += chunk
  64. out = []
  65. while True:
  66. if self._inside:
  67. idx = self._buf.find(self._CLOSE)
  68. if idx == -1:
  69. keep_len = self._partial_match_len(self._buf, self._CLOSE)
  70. self._buf = self._buf[-keep_len:] if keep_len else ""
  71. break
  72. self._buf = self._buf[idx + len(self._CLOSE):].lstrip()
  73. self._inside = False
  74. else:
  75. idx = self._buf.find(self._OPEN)
  76. if idx == -1:
  77. keep_len = self._partial_match_len(self._buf, self._OPEN)
  78. if keep_len:
  79. out.append(self._buf[:-keep_len])
  80. self._buf = self._buf[-keep_len:]
  81. else:
  82. out.append(self._buf)
  83. self._buf = ""
  84. break
  85. if idx > 0:
  86. out.append(self._buf[:idx])
  87. self._buf = self._buf[idx + len(self._OPEN):]
  88. self._inside = True
  89. return "".join(out)
  90. def flush(self) -> str:
  91. """流结束时调用,返回缓冲区剩余可输出内容。"""
  92. if self._inside:
  93. logger.warning("[模型流式调用] 流结束时仍在 <think> 块内,已丢弃尾部")
  94. self._buf = ""
  95. return ""
  96. result = self._buf
  97. self._buf = ""
  98. return result
  99. @staticmethod
  100. def _partial_match_len(buf: str, tag: str) -> int:
  101. """返回 buf 末尾匹配 tag 前缀的最大长度(避免标签被切断后误输出)。"""
  102. max_n = min(len(tag) - 1, len(buf))
  103. for n in range(max_n, 0, -1):
  104. if buf[-n:] == tag[:n]:
  105. return n
  106. return 0
  107. def _sync_retry_with_backoff(
  108. func: Callable,
  109. *args,
  110. max_retries: int = 3,
  111. backoff_factor: float = 1.0,
  112. trace_id: Optional[str] = None,
  113. model_name: Optional[str] = None,
  114. **kwargs
  115. ) -> Any:
  116. """
  117. 同步版本的带指数退避重试机制
  118. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试
  119. """
  120. model_info = model_name or "default"
  121. def _is_server_unavailable_error(error: Exception) -> bool:
  122. """判断是否为服务端不可用错误(应立即失败)"""
  123. error_str = str(error).lower()
  124. unavailable_codes = ['502', '503', '504', 'internal server error']
  125. return any(code in error_str for code in unavailable_codes)
  126. for attempt in range(max_retries + 1):
  127. try:
  128. return func(*args, **kwargs)
  129. except Exception as e:
  130. error_str = str(e)
  131. # 服务端不可用错误(502/503/504)立即失败,不重试
  132. if _is_server_unavailable_error(e):
  133. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  134. raise
  135. if attempt == max_retries:
  136. logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  137. raise
  138. wait_time = backoff_factor * (2 ** attempt)
  139. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  140. time.sleep(wait_time)
  141. class GenerateModelClient:
  142. """
  143. 主要是生成式模型
  144. """
  145. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  146. # 获取默认模型
  147. self.llm = model_handler.get_models()
  148. self.chat = self.llm # 当前chat和llm使用相同模型
  149. # 配置参数
  150. self.default_timeout = default_timeout
  151. self.max_retries = max_retries
  152. self.backoff_factor = backoff_factor
  153. # 保存model_handler引用,用于动态获取模型
  154. self.model_handler = model_handler
  155. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
  156. """
  157. 带指数退避的重试机制,每次重试都有独立的超时控制
  158. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
  159. 避免在服务端过载时继续加重负载。
  160. """
  161. current_timeout = timeout or self.default_timeout
  162. model_info = model_name or "default"
  163. def _is_server_unavailable_error(error: Exception) -> bool:
  164. """判断是否为服务端不可用错误(应立即失败)"""
  165. error_str = str(error).lower()
  166. # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
  167. unavailable_codes = ['502', '503', '504', 'internal server error']
  168. return any(code in error_str for code in unavailable_codes)
  169. for attempt in range(self.max_retries + 1):
  170. try:
  171. # 每次重试都有独立的超时时间
  172. return await asyncio.wait_for(
  173. func(*args, **kwargs),
  174. timeout=current_timeout
  175. )
  176. except asyncio.TimeoutError as e:
  177. if attempt == self.max_retries:
  178. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  179. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  180. wait_time = self.backoff_factor * (2 ** attempt)
  181. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  182. await asyncio.sleep(wait_time)
  183. except Exception as e:
  184. error_str = str(e)
  185. # 服务端不可用错误(502/503/504)立即失败,不重试
  186. if _is_server_unavailable_error(e):
  187. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  188. raise
  189. if attempt == self.max_retries:
  190. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  191. raise
  192. wait_time = self.backoff_factor * (2 ** attempt)
  193. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  194. await asyncio.sleep(wait_time)
  195. async def get_model_generate_invoke(
  196. self,
  197. trace_id: str,
  198. task_prompt_info: Optional[dict] = None,
  199. messages: Optional[List[BaseMessage]] = None,
  200. system_prompt: Optional[str] = None,
  201. user_prompt: Optional[str] = None,
  202. prompt: Optional[str] = None,
  203. timeout: Optional[int] = None,
  204. model_name: Optional[str] = None,
  205. enable_thinking: Optional[bool] = False,
  206. function_name: Optional[str] = None
  207. ) -> str:
  208. """模型非流式生成(异步)
  209. 支持多种调用方式(优先级从高到低):
  210. 1. messages: 直接传入 LangChain Message 对象列表
  211. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  212. 3. prompt: 传入单条用户提示词字符串
  213. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  214. Args:
  215. trace_id: 追踪ID
  216. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  217. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  218. system_prompt: 系统提示词字符串
  219. user_prompt: 用户提示词字符串
  220. prompt: 单条用户提示词字符串(无系统提示时使用)
  221. timeout: 超时时间(秒),默认使用构造时的 default_timeout
  222. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  223. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  224. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  225. Returns:
  226. str: 模型生成的文本内容
  227. Raises:
  228. ValueError: 参数组合错误
  229. TimeoutError: 调用超时
  230. Exception: 模型调用异常
  231. Examples:
  232. # 方式1: 使用 Message 列表(推荐)
  233. messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
  234. result = await client.get_model_generate_invoke("trace-001", messages=messages)
  235. # 方式2: 分别传入系统和用户提示词
  236. result = await client.get_model_generate_invoke(
  237. "trace-001",
  238. system_prompt="你是专家",
  239. user_prompt="请分析..."
  240. )
  241. # 方式3: 传入单条提示词
  242. result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
  243. # 方式4: 兼容旧接口(使用 PromptLoader)
  244. task_prompt_info = {"task_prompt": chat_template}
  245. result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
  246. # 方式5: 使用功能名称从配置加载模型
  247. result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
  248. """
  249. start_time = time.time()
  250. current_timeout = timeout or self.default_timeout
  251. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  252. if function_name:
  253. try:
  254. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  255. config_model = get_model_for_function(function_name)
  256. config_thinking = get_thinking_mode_for_function(function_name)
  257. if config_model:
  258. model_name = config_model
  259. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  260. if config_thinking is not None and enable_thinking is False:
  261. # 只有默认 False 时才覆盖,显式传入的参数优先
  262. enable_thinking = config_thinking
  263. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  264. except Exception as e:
  265. logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
  266. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  267. if not model_name:
  268. try:
  269. from foundation.ai.models.model_config_loader import get_model_for_function
  270. model_name = get_model_for_function("default")
  271. logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  272. except Exception as e:
  273. logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  274. try:
  275. # 选择模型
  276. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  277. logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  278. # 构建消息列表(按优先级)
  279. final_messages = self._build_messages(
  280. messages=messages,
  281. system_prompt=system_prompt,
  282. user_prompt=user_prompt,
  283. prompt=prompt,
  284. task_prompt_info=task_prompt_info
  285. )
  286. # 针对 Qwen3.5 模型处理思考模式
  287. model_to_invoke = llm_to_use
  288. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  289. if is_qwen35:
  290. if enable_thinking is False:
  291. # 显式禁用思考模式
  292. model_to_invoke = llm_to_use.bind(
  293. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  294. )
  295. logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
  296. elif enable_thinking is True:
  297. # 显式启用思考模式
  298. model_to_invoke = llm_to_use.bind(
  299. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  300. )
  301. logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
  302. else:
  303. # enable_thinking is None,使用模型默认行为(通常是启用)
  304. logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
  305. # 定义模型调用函数,使用原生 ainvoke
  306. async def _invoke():
  307. return await model_to_invoke.ainvoke(final_messages)
  308. # 调用带重试机制
  309. response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
  310. elapsed_time = time.time() - start_time
  311. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  312. return _strip_thinking_content(response.content)
  313. except asyncio.TimeoutError:
  314. elapsed_time = time.time() - start_time
  315. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  316. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
  317. except Exception as e:
  318. elapsed_time = time.time() - start_time
  319. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  320. raise
  321. def _build_messages(
  322. self,
  323. messages: Optional[List[BaseMessage]] = None,
  324. system_prompt: Optional[str] = None,
  325. user_prompt: Optional[str] = None,
  326. prompt: Optional[str] = None,
  327. task_prompt_info: Optional[dict] = None
  328. ) -> List[BaseMessage]:
  329. """构建消息列表(内部方法)
  330. 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
  331. """
  332. # 方式1: 直接使用传入的 Message 列表
  333. if messages is not None:
  334. if not isinstance(messages, list):
  335. raise ValueError("messages 必须是列表")
  336. if len(messages) == 0:
  337. raise ValueError("messages 不能为空列表")
  338. logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
  339. return messages
  340. # 方式2: system_prompt + user_prompt
  341. if system_prompt is not None and user_prompt is not None:
  342. logger.debug("使用 system_prompt + user_prompt 构建消息")
  343. return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
  344. # 方式3: 单独 system_prompt(可能是特殊情况)
  345. if system_prompt is not None:
  346. logger.debug("使用单独的 system_prompt 构建消息")
  347. return [SystemMessage(content=system_prompt)]
  348. # 方式4: 单条 prompt 字符串
  349. if prompt is not None:
  350. logger.debug("使用单条 prompt 字符串构建消息")
  351. return [HumanMessage(content=prompt)]
  352. # 方式5: 兼容旧接口 task_prompt_info
  353. if task_prompt_info is not None:
  354. if "task_prompt" not in task_prompt_info:
  355. raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
  356. task_prompt = task_prompt_info["task_prompt"]
  357. if hasattr(task_prompt, 'format_messages'):
  358. logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
  359. return task_prompt.format_messages()
  360. elif isinstance(task_prompt, str):
  361. logger.debug("使用 task_prompt_info 中的字符串构建消息")
  362. return [HumanMessage(content=task_prompt)]
  363. else:
  364. raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
  365. # 没有提供任何有效参数
  366. raise ValueError(
  367. "必须提供以下参数之一: "
  368. "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
  369. )
  370. def get_model_generate_invoke_sync(
  371. self,
  372. trace_id: str,
  373. task_prompt_info: Optional[dict] = None,
  374. messages: Optional[List[BaseMessage]] = None,
  375. system_prompt: Optional[str] = None,
  376. user_prompt: Optional[str] = None,
  377. prompt: Optional[str] = None,
  378. timeout: Optional[int] = None,
  379. model_name: Optional[str] = None,
  380. enable_thinking: Optional[bool] = False,
  381. function_name: Optional[str] = None
  382. ) -> str:
  383. """模型非流式生成(同步版本)
  384. 适用于同步上下文调用,功能与异步版本完全一致。
  385. 支持多种调用方式(优先级从高到低):
  386. 1. messages: 直接传入 LangChain Message 对象列表
  387. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  388. 3. prompt: 传入单条用户提示词字符串
  389. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  390. Args:
  391. trace_id: 追踪ID
  392. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  393. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  394. system_prompt: 系统提示词字符串
  395. user_prompt: 用户提示词字符串
  396. prompt: 单条用户提示词字符串(无系统提示时使用)
  397. timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
  398. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  399. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  400. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  401. Returns:
  402. str: 模型生成的文本内容
  403. Raises:
  404. ValueError: 参数组合错误
  405. Exception: 模型调用异常
  406. Examples:
  407. # 同步调用(用于同步上下文)
  408. result = generate_model_client.get_model_generate_invoke_sync(
  409. "trace-001",
  410. system_prompt="你是专家",
  411. user_prompt="请分析...",
  412. function_name="doc_classification_secondary"
  413. )
  414. """
  415. start_time = time.time()
  416. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  417. if function_name:
  418. try:
  419. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  420. config_model = get_model_for_function(function_name)
  421. config_thinking = get_thinking_mode_for_function(function_name)
  422. if config_model:
  423. model_name = config_model
  424. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  425. if config_thinking is not None and enable_thinking is False:
  426. enable_thinking = config_thinking
  427. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  428. except Exception as e:
  429. logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
  430. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  431. if not model_name:
  432. try:
  433. from foundation.ai.models.model_config_loader import get_model_for_function
  434. model_name = get_model_for_function("default")
  435. logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  436. except Exception as e:
  437. logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  438. try:
  439. # 选择模型
  440. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  441. logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  442. # 构建消息列表(按优先级)
  443. final_messages = self._build_messages(
  444. messages=messages,
  445. system_prompt=system_prompt,
  446. user_prompt=user_prompt,
  447. prompt=prompt,
  448. task_prompt_info=task_prompt_info
  449. )
  450. # 针对 Qwen3.5 模型处理思考模式
  451. model_to_invoke = llm_to_use
  452. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  453. if is_qwen35:
  454. if enable_thinking is False:
  455. model_to_invoke = llm_to_use.bind(
  456. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  457. )
  458. logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
  459. elif enable_thinking is True:
  460. model_to_invoke = llm_to_use.bind(
  461. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  462. )
  463. logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
  464. else:
  465. logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
  466. # 定义模型调用函数,使用同步 invoke
  467. def _invoke():
  468. return model_to_invoke.invoke(final_messages)
  469. # 调用带重试机制(同步版本)
  470. response = _sync_retry_with_backoff(
  471. _invoke,
  472. max_retries=self.max_retries,
  473. backoff_factor=self.backoff_factor,
  474. trace_id=trace_id,
  475. model_name=model_name or "default"
  476. )
  477. elapsed_time = time.time() - start_time
  478. logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  479. return _strip_thinking_content(response.content)
  480. except Exception as e:
  481. elapsed_time = time.time() - start_time
  482. logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  483. raise
  484. def get_model_generate_stream(
  485. self,
  486. trace_id: str,
  487. task_prompt_info: Optional[dict] = None,
  488. messages: Optional[List[BaseMessage]] = None,
  489. system_prompt: Optional[str] = None,
  490. user_prompt: Optional[str] = None,
  491. prompt: Optional[str] = None,
  492. timeout: Optional[int] = None,
  493. model_name: Optional[str] = None,
  494. function_name: Optional[str] = None
  495. ):
  496. """模型流式生成(同步生成器)
  497. 支持多种调用方式(优先级从高到低):
  498. 1. messages: 直接传入 LangChain Message 对象列表
  499. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  500. 3. prompt: 传入单条用户提示词字符串
  501. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  502. Args:
  503. trace_id: 追踪ID
  504. task_prompt_info: 任务提示词信息(兼容旧接口)
  505. messages: LangChain Message 对象列表
  506. system_prompt: 系统提示词字符串
  507. user_prompt: 用户提示词字符串
  508. prompt: 单条用户提示词字符串
  509. timeout: 超时时间(秒)
  510. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  511. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
  512. Yields:
  513. str: 生成的文本块
  514. Raises:
  515. ValueError: 参数组合错误
  516. """
  517. start_time = time.time()
  518. current_timeout = timeout or self.default_timeout
  519. # 如果提供了功能名称,从配置加载模型
  520. if function_name:
  521. try:
  522. from foundation.ai.models.model_config_loader import get_model_for_function
  523. config_model = get_model_for_function(function_name)
  524. if config_model:
  525. model_name = config_model
  526. logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  527. except Exception as e:
  528. logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
  529. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  530. if not model_name:
  531. try:
  532. from foundation.ai.models.model_config_loader import get_model_for_function
  533. model_name = get_model_for_function("default")
  534. logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  535. except Exception as e:
  536. logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  537. try:
  538. # 选择模型
  539. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  540. logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
  541. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  542. # 构建消息列表
  543. final_messages = self._build_messages(
  544. messages=messages,
  545. system_prompt=system_prompt,
  546. user_prompt=user_prompt,
  547. prompt=prompt,
  548. task_prompt_info=task_prompt_info
  549. )
  550. response = llm_to_use.stream(final_messages)
  551. chunk_count = 0
  552. think_filter = _ThinkingBlockStreamFilter()
  553. for chunk in response:
  554. chunk_count += 1
  555. if hasattr(chunk, 'content') and chunk.content:
  556. cleaned = think_filter.feed(chunk.content)
  557. if cleaned:
  558. yield cleaned
  559. elif chunk:
  560. yield chunk
  561. tail = think_filter.flush()
  562. if tail:
  563. yield tail
  564. elapsed_time = time.time() - start_time
  565. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  566. except Exception as e:
  567. elapsed_time = time.time() - start_time
  568. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  569. raise
# Module-level client instance created at import time:
# 60 s default timeout, up to 10 retries, 0.5 s base backoff.
generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)