model_generate.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :model_generate.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/14 14:22
  9. '''
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
  12. from foundation.ai.models.model_handler import model_handler
  13. from foundation.observability.logger.loggering import review_logger as logger
  14. import asyncio
  15. import re
  16. import time
  17. from typing import Optional, Callable, Any, List, Union
  18. # ============================================================
  19. # 思考内容过滤(统一收敛在调用层)
  20. #
  21. # Qwen3.5 等模型在 enable_thinking=True 时会输出思考过程,
  22. # 标准格式为 <think>...</think>,但部分 SGLang 部署会输出
  23. # "Thinking Process: ... </think>"(缺 <think> 开标签)的畸形格式。
  24. # 统一在此处去除,避免思考内容污染业务输出。
  25. # ============================================================
  26. _THINK_BLOCK_PATTERN = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
  27. _DANGLING_THINK_PATTERN = re.compile(r"<think>[\s\S]*$")
  28. # SGLang 畸形格式:Thinking Process: ... </think>(有闭标签无开标签)
  29. _SGLANG_THINK_PATTERN = re.compile(r"Thinking\s+Process:\s*[\s\S]*?</think>\s*", re.DOTALL)
  30. def _strip_thinking_content(content: str) -> str:
  31. """去除完整响应中的思考内容。
  32. - 标准 <think>...</think> 块:整段去除
  33. - SGLang Thinking Process: ... </think> 畸形格式:整段去除
  34. - 仅 <think> 无 </think>(被截断):从 <think> 起全部丢弃
  35. - 不含思考标签:原文返回
  36. """
  37. if not content:
  38. return content
  39. cleaned = _THINK_BLOCK_PATTERN.sub("", content)
  40. cleaned = _SGLANG_THINK_PATTERN.sub("", cleaned)
  41. if "<think>" in cleaned:
  42. cleaned = _DANGLING_THINK_PATTERN.sub("", cleaned)
  43. logger.warning("[模型调用] 响应包含未闭合的 <think> 块,已截断丢弃")
  44. return cleaned.strip()
  45. class _ThinkingBlockStreamFilter:
  46. """流式响应中过滤思考内容的状态机。
  47. 处理 chunk 边界穿过标签的情况,保证调用方拿到的流不会泄漏
  48. 任何思考片段。支持标准 <think>...</think> 和 SGLang 畸形
  49. "Thinking Process: ... </think>" 两种格式。
  50. """
  51. _OPEN = "<think>"
  52. _CLOSE = "</think>"
  53. _SGLANG_OPEN = "Thinking Process:"
  54. def __init__(self):
  55. self._buf = ""
  56. self._inside = False
  57. self._open_tag = ""
  58. def feed(self, chunk: str) -> str:
  59. """喂入一个 chunk,返回此刻应输出的内容(可能为空字符串)。"""
  60. if not chunk:
  61. return ""
  62. self._buf += chunk
  63. out = []
  64. while True:
  65. if self._inside:
  66. idx = self._buf.find(self._CLOSE)
  67. if idx == -1:
  68. keep_len = self._partial_match_len(self._buf, self._CLOSE)
  69. self._buf = self._buf[-keep_len:] if keep_len else ""
  70. break
  71. self._buf = self._buf[idx + len(self._CLOSE):].lstrip()
  72. self._inside = False
  73. else:
  74. idx_xml = self._buf.find(self._OPEN)
  75. idx_sg = self._buf.find(self._SGLANG_OPEN)
  76. if idx_xml == -1 and idx_sg == -1:
  77. keep_xml = self._partial_match_len(self._buf, self._OPEN)
  78. keep_sg = self._partial_match_len(self._buf, self._SGLANG_OPEN)
  79. keep_len = max(keep_xml, keep_sg)
  80. if keep_len:
  81. out.append(self._buf[:-keep_len])
  82. self._buf = self._buf[-keep_len:]
  83. else:
  84. out.append(self._buf)
  85. self._buf = ""
  86. break
  87. if idx_xml >= 0 and (idx_sg == -1 or idx_xml <= idx_sg):
  88. if idx_xml > 0:
  89. out.append(self._buf[:idx_xml])
  90. self._buf = self._buf[idx_xml + len(self._OPEN):]
  91. self._open_tag = self._OPEN
  92. else:
  93. if idx_sg > 0:
  94. out.append(self._buf[:idx_sg])
  95. self._buf = self._buf[idx_sg + len(self._SGLANG_OPEN):]
  96. self._open_tag = self._SGLANG_OPEN
  97. self._inside = True
  98. return "".join(out)
  99. def flush(self) -> str:
  100. """流结束时调用,返回缓冲区剩余可输出内容。"""
  101. if self._inside:
  102. logger.warning("[模型流式调用] 流结束时仍在 <think> 块内,已丢弃尾部")
  103. self._buf = ""
  104. return ""
  105. result = self._buf
  106. self._buf = ""
  107. return result
  108. @staticmethod
  109. def _partial_match_len(buf: str, tag: str) -> int:
  110. """返回 buf 末尾匹配 tag 前缀的最大长度(避免标签被切断后误输出)。"""
  111. max_n = min(len(tag) - 1, len(buf))
  112. for n in range(max_n, 0, -1):
  113. if buf[-n:] == tag[:n]:
  114. return n
  115. return 0
  116. def _sync_retry_with_backoff(
  117. func: Callable,
  118. *args,
  119. max_retries: int = 3,
  120. backoff_factor: float = 1.0,
  121. trace_id: Optional[str] = None,
  122. model_name: Optional[str] = None,
  123. **kwargs
  124. ) -> Any:
  125. """
  126. 同步版本的带指数退避重试机制
  127. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试
  128. """
  129. model_info = model_name or "default"
  130. def _is_server_unavailable_error(error: Exception) -> bool:
  131. """判断是否为服务端不可用错误(应立即失败)"""
  132. error_str = str(error).lower()
  133. unavailable_codes = ['502', '503', '504', 'internal server error']
  134. return any(code in error_str for code in unavailable_codes)
  135. for attempt in range(max_retries + 1):
  136. try:
  137. return func(*args, **kwargs)
  138. except Exception as e:
  139. error_str = str(e)
  140. # 服务端不可用错误(502/503/504)立即失败,不重试
  141. if _is_server_unavailable_error(e):
  142. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  143. raise
  144. if attempt == max_retries:
  145. logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  146. raise
  147. wait_time = backoff_factor * (2 ** attempt)
  148. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  149. time.sleep(wait_time)
  150. class GenerateModelClient:
  151. """
  152. 主要是生成式模型
  153. """
  154. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  155. # 获取默认模型
  156. self.llm = model_handler.get_models()
  157. self.chat = self.llm # 当前chat和llm使用相同模型
  158. # 配置参数
  159. self.default_timeout = default_timeout
  160. self.max_retries = max_retries
  161. self.backoff_factor = backoff_factor
  162. # 保存model_handler引用,用于动态获取模型
  163. self.model_handler = model_handler
  164. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
  165. """
  166. 带指数退避的重试机制,每次重试都有独立的超时控制
  167. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
  168. 避免在服务端过载时继续加重负载。
  169. """
  170. current_timeout = timeout or self.default_timeout
  171. model_info = model_name or "default"
  172. def _is_server_unavailable_error(error: Exception) -> bool:
  173. """判断是否为服务端不可用错误(应立即失败)"""
  174. error_str = str(error).lower()
  175. # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
  176. unavailable_codes = ['502', '503', '504', 'internal server error']
  177. return any(code in error_str for code in unavailable_codes)
  178. for attempt in range(self.max_retries + 1):
  179. try:
  180. # 每次重试都有独立的超时时间
  181. return await asyncio.wait_for(
  182. func(*args, **kwargs),
  183. timeout=current_timeout
  184. )
  185. except asyncio.TimeoutError as e:
  186. if attempt == self.max_retries:
  187. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  188. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  189. wait_time = self.backoff_factor * (2 ** attempt)
  190. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  191. await asyncio.sleep(wait_time)
  192. except Exception as e:
  193. error_str = str(e)
  194. # 服务端不可用错误(502/503/504)立即失败,不重试
  195. if _is_server_unavailable_error(e):
  196. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  197. raise
  198. if attempt == self.max_retries:
  199. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  200. raise
  201. wait_time = self.backoff_factor * (2 ** attempt)
  202. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  203. await asyncio.sleep(wait_time)
  204. async def get_model_generate_invoke(
  205. self,
  206. trace_id: str,
  207. task_prompt_info: Optional[dict] = None,
  208. messages: Optional[List[BaseMessage]] = None,
  209. system_prompt: Optional[str] = None,
  210. user_prompt: Optional[str] = None,
  211. prompt: Optional[str] = None,
  212. timeout: Optional[int] = None,
  213. model_name: Optional[str] = None,
  214. enable_thinking: Optional[bool] = False,
  215. function_name: Optional[str] = None
  216. ) -> str:
  217. """模型非流式生成(异步)
  218. 支持多种调用方式(优先级从高到低):
  219. 1. messages: 直接传入 LangChain Message 对象列表
  220. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  221. 3. prompt: 传入单条用户提示词字符串
  222. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  223. Args:
  224. trace_id: 追踪ID
  225. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  226. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  227. system_prompt: 系统提示词字符串
  228. user_prompt: 用户提示词字符串
  229. prompt: 单条用户提示词字符串(无系统提示时使用)
  230. timeout: 超时时间(秒),默认使用构造时的 default_timeout
  231. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  232. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  233. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  234. Returns:
  235. str: 模型生成的文本内容
  236. Raises:
  237. ValueError: 参数组合错误
  238. TimeoutError: 调用超时
  239. Exception: 模型调用异常
  240. Examples:
  241. # 方式1: 使用 Message 列表(推荐)
  242. messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
  243. result = await client.get_model_generate_invoke("trace-001", messages=messages)
  244. # 方式2: 分别传入系统和用户提示词
  245. result = await client.get_model_generate_invoke(
  246. "trace-001",
  247. system_prompt="你是专家",
  248. user_prompt="请分析..."
  249. )
  250. # 方式3: 传入单条提示词
  251. result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
  252. # 方式4: 兼容旧接口(使用 PromptLoader)
  253. task_prompt_info = {"task_prompt": chat_template}
  254. result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
  255. # 方式5: 使用功能名称从配置加载模型
  256. result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
  257. """
  258. start_time = time.time()
  259. current_timeout = timeout or self.default_timeout
  260. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  261. if function_name:
  262. try:
  263. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  264. config_model = get_model_for_function(function_name)
  265. config_thinking = get_thinking_mode_for_function(function_name)
  266. if config_model:
  267. model_name = config_model
  268. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  269. if config_thinking is not None and enable_thinking is False:
  270. # 只有默认 False 时才覆盖,显式传入的参数优先
  271. enable_thinking = config_thinking
  272. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  273. except Exception as e:
  274. logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
  275. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  276. if not model_name:
  277. try:
  278. from foundation.ai.models.model_config_loader import get_model_for_function
  279. model_name = get_model_for_function("default")
  280. logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  281. except Exception as e:
  282. logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  283. try:
  284. # 选择模型
  285. llm_to_use = self.model_handler.get_model_by_name(
  286. model_name, enable_thinking=(enable_thinking or False)
  287. ) if model_name else self.llm
  288. logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  289. # 构建消息列表(按优先级)
  290. final_messages = self._build_messages(
  291. messages=messages,
  292. system_prompt=system_prompt,
  293. user_prompt=user_prompt,
  294. prompt=prompt,
  295. task_prompt_info=task_prompt_info
  296. )
  297. # 针对 Qwen3.5 模型处理思考模式
  298. model_to_invoke = llm_to_use
  299. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  300. if is_qwen35:
  301. if enable_thinking is False:
  302. # 显式禁用思考模式
  303. model_to_invoke = llm_to_use.bind(
  304. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  305. )
  306. logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
  307. elif enable_thinking is True:
  308. # 显式启用思考模式
  309. model_to_invoke = llm_to_use.bind(
  310. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  311. )
  312. logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
  313. else:
  314. # enable_thinking is None,使用模型默认行为(通常是启用)
  315. logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
  316. # 定义模型调用函数,使用原生 ainvoke
  317. async def _invoke():
  318. return await model_to_invoke.ainvoke(final_messages)
  319. # 调用带重试机制
  320. response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
  321. elapsed_time = time.time() - start_time
  322. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  323. return _strip_thinking_content(response.content)
  324. except asyncio.TimeoutError:
  325. elapsed_time = time.time() - start_time
  326. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  327. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
  328. except Exception as e:
  329. elapsed_time = time.time() - start_time
  330. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  331. raise
  332. def _build_messages(
  333. self,
  334. messages: Optional[List[BaseMessage]] = None,
  335. system_prompt: Optional[str] = None,
  336. user_prompt: Optional[str] = None,
  337. prompt: Optional[str] = None,
  338. task_prompt_info: Optional[dict] = None
  339. ) -> List[BaseMessage]:
  340. """构建消息列表(内部方法)
  341. 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
  342. """
  343. # 方式1: 直接使用传入的 Message 列表
  344. if messages is not None:
  345. if not isinstance(messages, list):
  346. raise ValueError("messages 必须是列表")
  347. if len(messages) == 0:
  348. raise ValueError("messages 不能为空列表")
  349. logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
  350. return messages
  351. # 方式2: system_prompt + user_prompt
  352. if system_prompt is not None and user_prompt is not None:
  353. logger.debug("使用 system_prompt + user_prompt 构建消息")
  354. return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
  355. # 方式3: 单独 system_prompt(可能是特殊情况)
  356. if system_prompt is not None:
  357. logger.debug("使用单独的 system_prompt 构建消息")
  358. return [SystemMessage(content=system_prompt)]
  359. # 方式4: 单条 prompt 字符串
  360. if prompt is not None:
  361. logger.debug("使用单条 prompt 字符串构建消息")
  362. return [HumanMessage(content=prompt)]
  363. # 方式5: 兼容旧接口 task_prompt_info
  364. if task_prompt_info is not None:
  365. if "task_prompt" not in task_prompt_info:
  366. raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
  367. task_prompt = task_prompt_info["task_prompt"]
  368. if hasattr(task_prompt, 'format_messages'):
  369. logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
  370. return task_prompt.format_messages()
  371. elif isinstance(task_prompt, str):
  372. logger.debug("使用 task_prompt_info 中的字符串构建消息")
  373. return [HumanMessage(content=task_prompt)]
  374. else:
  375. raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
  376. # 没有提供任何有效参数
  377. raise ValueError(
  378. "必须提供以下参数之一: "
  379. "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
  380. )
  381. def get_model_generate_invoke_sync(
  382. self,
  383. trace_id: str,
  384. task_prompt_info: Optional[dict] = None,
  385. messages: Optional[List[BaseMessage]] = None,
  386. system_prompt: Optional[str] = None,
  387. user_prompt: Optional[str] = None,
  388. prompt: Optional[str] = None,
  389. timeout: Optional[int] = None,
  390. model_name: Optional[str] = None,
  391. enable_thinking: Optional[bool] = False,
  392. function_name: Optional[str] = None
  393. ) -> str:
  394. """模型非流式生成(同步版本)
  395. 适用于同步上下文调用,功能与异步版本完全一致。
  396. 支持多种调用方式(优先级从高到低):
  397. 1. messages: 直接传入 LangChain Message 对象列表
  398. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  399. 3. prompt: 传入单条用户提示词字符串
  400. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  401. Args:
  402. trace_id: 追踪ID
  403. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  404. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  405. system_prompt: 系统提示词字符串
  406. user_prompt: 用户提示词字符串
  407. prompt: 单条用户提示词字符串(无系统提示时使用)
  408. timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
  409. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  410. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  411. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  412. Returns:
  413. str: 模型生成的文本内容
  414. Raises:
  415. ValueError: 参数组合错误
  416. Exception: 模型调用异常
  417. Examples:
  418. # 同步调用(用于同步上下文)
  419. result = generate_model_client.get_model_generate_invoke_sync(
  420. "trace-001",
  421. system_prompt="你是专家",
  422. user_prompt="请分析...",
  423. function_name="doc_classification_secondary"
  424. )
  425. """
  426. start_time = time.time()
  427. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  428. if function_name:
  429. try:
  430. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  431. config_model = get_model_for_function(function_name)
  432. config_thinking = get_thinking_mode_for_function(function_name)
  433. if config_model:
  434. model_name = config_model
  435. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  436. if config_thinking is not None and enable_thinking is False:
  437. enable_thinking = config_thinking
  438. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  439. except Exception as e:
  440. logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
  441. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  442. if not model_name:
  443. try:
  444. from foundation.ai.models.model_config_loader import get_model_for_function
  445. model_name = get_model_for_function("default")
  446. logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  447. except Exception as e:
  448. logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  449. try:
  450. # 选择模型
  451. llm_to_use = self.model_handler.get_model_by_name(
  452. model_name, enable_thinking=(enable_thinking or False)
  453. ) if model_name else self.llm
  454. logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  455. # 构建消息列表(按优先级)
  456. final_messages = self._build_messages(
  457. messages=messages,
  458. system_prompt=system_prompt,
  459. user_prompt=user_prompt,
  460. prompt=prompt,
  461. task_prompt_info=task_prompt_info
  462. )
  463. # 针对 Qwen3.5 模型处理思考模式
  464. model_to_invoke = llm_to_use
  465. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  466. if is_qwen35:
  467. if enable_thinking is False:
  468. model_to_invoke = llm_to_use.bind(
  469. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  470. )
  471. logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
  472. elif enable_thinking is True:
  473. model_to_invoke = llm_to_use.bind(
  474. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  475. )
  476. logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
  477. else:
  478. logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
  479. # 定义模型调用函数,使用同步 invoke
  480. def _invoke():
  481. return model_to_invoke.invoke(final_messages)
  482. # 调用带重试机制(同步版本)
  483. response = _sync_retry_with_backoff(
  484. _invoke,
  485. max_retries=self.max_retries,
  486. backoff_factor=self.backoff_factor,
  487. trace_id=trace_id,
  488. model_name=model_name or "default"
  489. )
  490. elapsed_time = time.time() - start_time
  491. logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  492. return _strip_thinking_content(response.content)
  493. except Exception as e:
  494. elapsed_time = time.time() - start_time
  495. logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  496. raise
  497. def get_model_generate_stream(
  498. self,
  499. trace_id: str,
  500. task_prompt_info: Optional[dict] = None,
  501. messages: Optional[List[BaseMessage]] = None,
  502. system_prompt: Optional[str] = None,
  503. user_prompt: Optional[str] = None,
  504. prompt: Optional[str] = None,
  505. timeout: Optional[int] = None,
  506. model_name: Optional[str] = None,
  507. enable_thinking: Optional[bool] = False,
  508. function_name: Optional[str] = None
  509. ):
  510. """模型流式生成(同步生成器)
  511. 支持多种调用方式(优先级从高到低):
  512. 1. messages: 直接传入 LangChain Message 对象列表
  513. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  514. 3. prompt: 传入单条用户提示词字符串
  515. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  516. Args:
  517. trace_id: 追踪ID
  518. task_prompt_info: 任务提示词信息(兼容旧接口)
  519. messages: LangChain Message 对象列表
  520. system_prompt: 系统提示词字符串
  521. user_prompt: 用户提示词字符串
  522. prompt: 单条用户提示词字符串
  523. timeout: 超时时间(秒)
  524. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  525. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
  526. Yields:
  527. str: 生成的文本块
  528. Raises:
  529. ValueError: 参数组合错误
  530. """
  531. start_time = time.time()
  532. current_timeout = timeout or self.default_timeout
  533. # 如果提供了功能名称,从配置加载模型
  534. if function_name:
  535. try:
  536. from foundation.ai.models.model_config_loader import get_model_for_function
  537. config_model = get_model_for_function(function_name)
  538. if config_model:
  539. model_name = config_model
  540. logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  541. except Exception as e:
  542. logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
  543. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  544. if not model_name:
  545. try:
  546. from foundation.ai.models.model_config_loader import get_model_for_function
  547. model_name = get_model_for_function("default")
  548. logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  549. except Exception as e:
  550. logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  551. try:
  552. # 选择模型
  553. llm_to_use = self.model_handler.get_model_by_name(
  554. model_name, enable_thinking=(enable_thinking or False)
  555. ) if model_name else self.llm
  556. logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
  557. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  558. # 构建消息列表
  559. final_messages = self._build_messages(
  560. messages=messages,
  561. system_prompt=system_prompt,
  562. user_prompt=user_prompt,
  563. prompt=prompt,
  564. task_prompt_info=task_prompt_info
  565. )
  566. response = llm_to_use.stream(final_messages)
  567. chunk_count = 0
  568. think_filter = _ThinkingBlockStreamFilter()
  569. for chunk in response:
  570. chunk_count += 1
  571. if hasattr(chunk, 'content') and chunk.content:
  572. cleaned = think_filter.feed(chunk.content)
  573. if cleaned:
  574. yield cleaned
  575. elif chunk:
  576. yield chunk
  577. tail = think_filter.flush()
  578. if tail:
  579. yield tail
  580. elapsed_time = time.time() - start_time
  581. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  582. except Exception as e:
  583. elapsed_time = time.time() - start_time
  584. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  585. raise
  586. generate_model_client = GenerateModelClient(default_timeout=120, max_retries=10, backoff_factor=0.5)