model_generate.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :model_generate.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/14 14:22
  9. '''
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
  12. from foundation.ai.models.model_handler import model_handler
  13. from foundation.observability.logger.loggering import review_logger as logger
  14. import asyncio
  15. import re
  16. import time
  17. from typing import Optional, Callable, Any, List, Union
# ============================================================
# Thinking-content filtering (centralised at the call layer)
#
# Models such as Qwen3.5 emit their reasoning when enable_thinking=True.
# The standard format is <think>...</think>, but some SGLang deployments
# emit the malformed "Thinking Process: ... </think>" form (the opening
# <think> tag is missing). Both forms are removed here so that reasoning
# text never pollutes business output.
# ============================================================
# A complete <think>...</think> block, plus any whitespace that follows it.
_THINK_BLOCK_PATTERN = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
# An opening <think> with no closing tag: matches from the tag to the end of the text.
_DANGLING_THINK_PATTERN = re.compile(r"<think>[\s\S]*$")
# Malformed SGLang format: "Thinking Process: ... </think>" (closing tag, no opening tag).
_SGLANG_THINK_PATTERN = re.compile(r"Thinking\s+Process:\s*[\s\S]*?</think>\s*", re.DOTALL)
  30. def _strip_thinking_content(content: str) -> str:
  31. """去除完整响应中的思考内容。
  32. - 标准 <think>...</think> 块:整段去除
  33. - SGLang Thinking Process: ... </think> 畸形格式:整段去除
  34. - 仅 <think> 无 </think>(被截断):从 <think> 起全部丢弃
  35. - 不含思考标签:原文返回
  36. """
  37. if not content:
  38. return content
  39. cleaned = _THINK_BLOCK_PATTERN.sub("", content)
  40. cleaned = _SGLANG_THINK_PATTERN.sub("", cleaned)
  41. if "<think>" in cleaned:
  42. cleaned = _DANGLING_THINK_PATTERN.sub("", cleaned)
  43. logger.warning("[模型调用] 响应包含未闭合的 <think> 块,已截断丢弃")
  44. return cleaned.strip()
  45. class _ThinkingBlockStreamFilter:
  46. """流式响应中过滤思考内容的状态机。
  47. 处理 chunk 边界穿过标签的情况,保证调用方拿到的流不会泄漏
  48. 任何思考片段。支持标准 <think>...</think> 和 SGLang 畸形
  49. "Thinking Process: ... </think>" 两种格式。
  50. """
  51. _OPEN = "<think>"
  52. _CLOSE = "</think>"
  53. _SGLANG_OPEN = "Thinking Process:"
  54. def __init__(self):
  55. self._buf = ""
  56. self._inside = False
  57. self._open_tag = ""
  58. def feed(self, chunk: str) -> str:
  59. """喂入一个 chunk,返回此刻应输出的内容(可能为空字符串)。"""
  60. if not chunk:
  61. return ""
  62. self._buf += chunk
  63. out = []
  64. while True:
  65. if self._inside:
  66. idx = self._buf.find(self._CLOSE)
  67. if idx == -1:
  68. keep_len = self._partial_match_len(self._buf, self._CLOSE)
  69. self._buf = self._buf[-keep_len:] if keep_len else ""
  70. break
  71. self._buf = self._buf[idx + len(self._CLOSE):].lstrip()
  72. self._inside = False
  73. else:
  74. idx_xml = self._buf.find(self._OPEN)
  75. idx_sg = self._buf.find(self._SGLANG_OPEN)
  76. if idx_xml == -1 and idx_sg == -1:
  77. keep_xml = self._partial_match_len(self._buf, self._OPEN)
  78. keep_sg = self._partial_match_len(self._buf, self._SGLANG_OPEN)
  79. keep_len = max(keep_xml, keep_sg)
  80. if keep_len:
  81. out.append(self._buf[:-keep_len])
  82. self._buf = self._buf[-keep_len:]
  83. else:
  84. out.append(self._buf)
  85. self._buf = ""
  86. break
  87. if idx_xml >= 0 and (idx_sg == -1 or idx_xml <= idx_sg):
  88. if idx_xml > 0:
  89. out.append(self._buf[:idx_xml])
  90. self._buf = self._buf[idx_xml + len(self._OPEN):]
  91. self._open_tag = self._OPEN
  92. else:
  93. if idx_sg > 0:
  94. out.append(self._buf[:idx_sg])
  95. self._buf = self._buf[idx_sg + len(self._SGLANG_OPEN):]
  96. self._open_tag = self._SGLANG_OPEN
  97. self._inside = True
  98. return "".join(out)
  99. def flush(self) -> str:
  100. """流结束时调用,返回缓冲区剩余可输出内容。"""
  101. if self._inside:
  102. logger.warning("[模型流式调用] 流结束时仍在 <think> 块内,已丢弃尾部")
  103. self._buf = ""
  104. return ""
  105. result = self._buf
  106. self._buf = ""
  107. return result
  108. @staticmethod
  109. def _partial_match_len(buf: str, tag: str) -> int:
  110. """返回 buf 末尾匹配 tag 前缀的最大长度(避免标签被切断后误输出)。"""
  111. max_n = min(len(tag) - 1, len(buf))
  112. for n in range(max_n, 0, -1):
  113. if buf[-n:] == tag[:n]:
  114. return n
  115. return 0
  116. def _sync_retry_with_backoff(
  117. func: Callable,
  118. *args,
  119. max_retries: int = 3,
  120. backoff_factor: float = 1.0,
  121. trace_id: Optional[str] = None,
  122. model_name: Optional[str] = None,
  123. **kwargs
  124. ) -> Any:
  125. """
  126. 同步版本的带指数退避重试机制
  127. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试
  128. """
  129. model_info = model_name or "default"
  130. def _is_server_unavailable_error(error: Exception) -> bool:
  131. """判断是否为服务端不可用错误(应立即失败)"""
  132. error_str = str(error).lower()
  133. unavailable_codes = ['502', '503', '504', 'internal server error']
  134. return any(code in error_str for code in unavailable_codes)
  135. for attempt in range(max_retries + 1):
  136. try:
  137. return func(*args, **kwargs)
  138. except Exception as e:
  139. error_str = str(e)
  140. # 服务端不可用错误(502/503/504)立即失败,不重试
  141. if _is_server_unavailable_error(e):
  142. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  143. raise
  144. if attempt == max_retries:
  145. logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  146. raise
  147. wait_time = backoff_factor * (2 ** attempt)
  148. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  149. time.sleep(wait_time)
  150. class GenerateModelClient:
  151. """
  152. 主要是生成式模型
  153. """
  154. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  155. # 获取默认模型
  156. self.llm = model_handler.get_models()
  157. self.chat = self.llm # 当前chat和llm使用相同模型
  158. # 配置参数
  159. self.default_timeout = default_timeout
  160. self.max_retries = max_retries
  161. self.backoff_factor = backoff_factor
  162. # 保存model_handler引用,用于动态获取模型
  163. self.model_handler = model_handler
  164. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
  165. """
  166. 带指数退避的重试机制,每次重试都有独立的超时控制
  167. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
  168. 避免在服务端过载时继续加重负载。
  169. """
  170. current_timeout = timeout or self.default_timeout
  171. model_info = model_name or "default"
  172. def _is_server_unavailable_error(error: Exception) -> bool:
  173. """判断是否为服务端不可用错误(应立即失败)"""
  174. error_str = str(error).lower()
  175. # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
  176. unavailable_codes = ['502', '503', '504', 'internal server error']
  177. return any(code in error_str for code in unavailable_codes)
  178. for attempt in range(self.max_retries + 1):
  179. try:
  180. # 每次重试都有独立的超时时间
  181. return await asyncio.wait_for(
  182. func(*args, **kwargs),
  183. timeout=current_timeout
  184. )
  185. except asyncio.TimeoutError as e:
  186. if attempt == self.max_retries:
  187. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  188. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  189. wait_time = self.backoff_factor * (2 ** attempt)
  190. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  191. await asyncio.sleep(wait_time)
  192. except Exception as e:
  193. error_str = str(e)
  194. # 服务端不可用错误(502/503/504)立即失败,不重试
  195. if _is_server_unavailable_error(e):
  196. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  197. raise
  198. if attempt == self.max_retries:
  199. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  200. raise
  201. wait_time = self.backoff_factor * (2 ** attempt)
  202. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  203. await asyncio.sleep(wait_time)
  204. async def get_model_generate_invoke(
  205. self,
  206. trace_id: str,
  207. task_prompt_info: Optional[dict] = None,
  208. messages: Optional[List[BaseMessage]] = None,
  209. system_prompt: Optional[str] = None,
  210. user_prompt: Optional[str] = None,
  211. prompt: Optional[str] = None,
  212. timeout: Optional[int] = None,
  213. model_name: Optional[str] = None,
  214. enable_thinking: Optional[bool] = False,
  215. function_name: Optional[str] = None
  216. ) -> str:
  217. """模型非流式生成(异步)
  218. 支持多种调用方式(优先级从高到低):
  219. 1. messages: 直接传入 LangChain Message 对象列表
  220. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  221. 3. prompt: 传入单条用户提示词字符串
  222. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  223. Args:
  224. trace_id: 追踪ID
  225. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  226. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  227. system_prompt: 系统提示词字符串
  228. user_prompt: 用户提示词字符串
  229. prompt: 单条用户提示词字符串(无系统提示时使用)
  230. timeout: 超时时间(秒),默认使用构造时的 default_timeout
  231. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  232. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  233. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  234. Returns:
  235. str: 模型生成的文本内容
  236. Raises:
  237. ValueError: 参数组合错误
  238. TimeoutError: 调用超时
  239. Exception: 模型调用异常
  240. Examples:
  241. # 方式1: 使用 Message 列表(推荐)
  242. messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
  243. result = await client.get_model_generate_invoke("trace-001", messages=messages)
  244. # 方式2: 分别传入系统和用户提示词
  245. result = await client.get_model_generate_invoke(
  246. "trace-001",
  247. system_prompt="你是专家",
  248. user_prompt="请分析..."
  249. )
  250. # 方式3: 传入单条提示词
  251. result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
  252. # 方式4: 兼容旧接口(使用 PromptLoader)
  253. task_prompt_info = {"task_prompt": chat_template}
  254. result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
  255. # 方式5: 使用功能名称从配置加载模型
  256. result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
  257. """
  258. start_time = time.time()
  259. current_timeout = timeout or self.default_timeout
  260. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  261. if function_name:
  262. try:
  263. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  264. config_model = get_model_for_function(function_name)
  265. config_thinking = get_thinking_mode_for_function(function_name)
  266. if config_model:
  267. model_name = config_model
  268. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  269. if config_thinking is not None and enable_thinking is False:
  270. # 只有默认 False 时才覆盖,显式传入的参数优先
  271. enable_thinking = config_thinking
  272. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  273. except Exception as e:
  274. logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
  275. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  276. if not model_name:
  277. try:
  278. from foundation.ai.models.model_config_loader import get_model_for_function
  279. model_name = get_model_for_function("default")
  280. logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  281. except Exception as e:
  282. logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  283. try:
  284. # 选择模型
  285. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  286. logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  287. # 构建消息列表(按优先级)
  288. final_messages = self._build_messages(
  289. messages=messages,
  290. system_prompt=system_prompt,
  291. user_prompt=user_prompt,
  292. prompt=prompt,
  293. task_prompt_info=task_prompt_info
  294. )
  295. # 针对 Qwen3.5 模型处理思考模式
  296. model_to_invoke = llm_to_use
  297. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  298. if is_qwen35:
  299. if enable_thinking is False:
  300. # 显式禁用思考模式
  301. model_to_invoke = llm_to_use.bind(
  302. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  303. )
  304. logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
  305. elif enable_thinking is True:
  306. # 显式启用思考模式
  307. model_to_invoke = llm_to_use.bind(
  308. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  309. )
  310. logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
  311. else:
  312. # enable_thinking is None,使用模型默认行为(通常是启用)
  313. logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
  314. # 定义模型调用函数,使用原生 ainvoke
  315. async def _invoke():
  316. return await model_to_invoke.ainvoke(final_messages)
  317. # 调用带重试机制
  318. response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
  319. elapsed_time = time.time() - start_time
  320. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  321. return _strip_thinking_content(response.content)
  322. except asyncio.TimeoutError:
  323. elapsed_time = time.time() - start_time
  324. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  325. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
  326. except Exception as e:
  327. elapsed_time = time.time() - start_time
  328. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  329. raise
  330. def _build_messages(
  331. self,
  332. messages: Optional[List[BaseMessage]] = None,
  333. system_prompt: Optional[str] = None,
  334. user_prompt: Optional[str] = None,
  335. prompt: Optional[str] = None,
  336. task_prompt_info: Optional[dict] = None
  337. ) -> List[BaseMessage]:
  338. """构建消息列表(内部方法)
  339. 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
  340. """
  341. # 方式1: 直接使用传入的 Message 列表
  342. if messages is not None:
  343. if not isinstance(messages, list):
  344. raise ValueError("messages 必须是列表")
  345. if len(messages) == 0:
  346. raise ValueError("messages 不能为空列表")
  347. logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
  348. return messages
  349. # 方式2: system_prompt + user_prompt
  350. if system_prompt is not None and user_prompt is not None:
  351. logger.debug("使用 system_prompt + user_prompt 构建消息")
  352. return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
  353. # 方式3: 单独 system_prompt(可能是特殊情况)
  354. if system_prompt is not None:
  355. logger.debug("使用单独的 system_prompt 构建消息")
  356. return [SystemMessage(content=system_prompt)]
  357. # 方式4: 单条 prompt 字符串
  358. if prompt is not None:
  359. logger.debug("使用单条 prompt 字符串构建消息")
  360. return [HumanMessage(content=prompt)]
  361. # 方式5: 兼容旧接口 task_prompt_info
  362. if task_prompt_info is not None:
  363. if "task_prompt" not in task_prompt_info:
  364. raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
  365. task_prompt = task_prompt_info["task_prompt"]
  366. if hasattr(task_prompt, 'format_messages'):
  367. logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
  368. return task_prompt.format_messages()
  369. elif isinstance(task_prompt, str):
  370. logger.debug("使用 task_prompt_info 中的字符串构建消息")
  371. return [HumanMessage(content=task_prompt)]
  372. else:
  373. raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
  374. # 没有提供任何有效参数
  375. raise ValueError(
  376. "必须提供以下参数之一: "
  377. "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
  378. )
  379. def get_model_generate_invoke_sync(
  380. self,
  381. trace_id: str,
  382. task_prompt_info: Optional[dict] = None,
  383. messages: Optional[List[BaseMessage]] = None,
  384. system_prompt: Optional[str] = None,
  385. user_prompt: Optional[str] = None,
  386. prompt: Optional[str] = None,
  387. timeout: Optional[int] = None,
  388. model_name: Optional[str] = None,
  389. enable_thinking: Optional[bool] = False,
  390. function_name: Optional[str] = None
  391. ) -> str:
  392. """模型非流式生成(同步版本)
  393. 适用于同步上下文调用,功能与异步版本完全一致。
  394. 支持多种调用方式(优先级从高到低):
  395. 1. messages: 直接传入 LangChain Message 对象列表
  396. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  397. 3. prompt: 传入单条用户提示词字符串
  398. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  399. Args:
  400. trace_id: 追踪ID
  401. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  402. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  403. system_prompt: 系统提示词字符串
  404. user_prompt: 用户提示词字符串
  405. prompt: 单条用户提示词字符串(无系统提示时使用)
  406. timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
  407. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  408. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  409. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  410. Returns:
  411. str: 模型生成的文本内容
  412. Raises:
  413. ValueError: 参数组合错误
  414. Exception: 模型调用异常
  415. Examples:
  416. # 同步调用(用于同步上下文)
  417. result = generate_model_client.get_model_generate_invoke_sync(
  418. "trace-001",
  419. system_prompt="你是专家",
  420. user_prompt="请分析...",
  421. function_name="doc_classification_secondary"
  422. )
  423. """
  424. start_time = time.time()
  425. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  426. if function_name:
  427. try:
  428. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  429. config_model = get_model_for_function(function_name)
  430. config_thinking = get_thinking_mode_for_function(function_name)
  431. if config_model:
  432. model_name = config_model
  433. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  434. if config_thinking is not None and enable_thinking is False:
  435. enable_thinking = config_thinking
  436. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  437. except Exception as e:
  438. logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
  439. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  440. if not model_name:
  441. try:
  442. from foundation.ai.models.model_config_loader import get_model_for_function
  443. model_name = get_model_for_function("default")
  444. logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  445. except Exception as e:
  446. logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  447. try:
  448. # 选择模型
  449. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  450. logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  451. # 构建消息列表(按优先级)
  452. final_messages = self._build_messages(
  453. messages=messages,
  454. system_prompt=system_prompt,
  455. user_prompt=user_prompt,
  456. prompt=prompt,
  457. task_prompt_info=task_prompt_info
  458. )
  459. # 针对 Qwen3.5 模型处理思考模式
  460. model_to_invoke = llm_to_use
  461. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  462. if is_qwen35:
  463. if enable_thinking is False:
  464. model_to_invoke = llm_to_use.bind(
  465. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  466. )
  467. logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
  468. elif enable_thinking is True:
  469. model_to_invoke = llm_to_use.bind(
  470. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  471. )
  472. logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
  473. else:
  474. logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
  475. # 定义模型调用函数,使用同步 invoke
  476. def _invoke():
  477. return model_to_invoke.invoke(final_messages)
  478. # 调用带重试机制(同步版本)
  479. response = _sync_retry_with_backoff(
  480. _invoke,
  481. max_retries=self.max_retries,
  482. backoff_factor=self.backoff_factor,
  483. trace_id=trace_id,
  484. model_name=model_name or "default"
  485. )
  486. elapsed_time = time.time() - start_time
  487. logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  488. return _strip_thinking_content(response.content)
  489. except Exception as e:
  490. elapsed_time = time.time() - start_time
  491. logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  492. raise
  493. def get_model_generate_stream(
  494. self,
  495. trace_id: str,
  496. task_prompt_info: Optional[dict] = None,
  497. messages: Optional[List[BaseMessage]] = None,
  498. system_prompt: Optional[str] = None,
  499. user_prompt: Optional[str] = None,
  500. prompt: Optional[str] = None,
  501. timeout: Optional[int] = None,
  502. model_name: Optional[str] = None,
  503. function_name: Optional[str] = None
  504. ):
  505. """模型流式生成(同步生成器)
  506. 支持多种调用方式(优先级从高到低):
  507. 1. messages: 直接传入 LangChain Message 对象列表
  508. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  509. 3. prompt: 传入单条用户提示词字符串
  510. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  511. Args:
  512. trace_id: 追踪ID
  513. task_prompt_info: 任务提示词信息(兼容旧接口)
  514. messages: LangChain Message 对象列表
  515. system_prompt: 系统提示词字符串
  516. user_prompt: 用户提示词字符串
  517. prompt: 单条用户提示词字符串
  518. timeout: 超时时间(秒)
  519. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  520. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
  521. Yields:
  522. str: 生成的文本块
  523. Raises:
  524. ValueError: 参数组合错误
  525. """
  526. start_time = time.time()
  527. current_timeout = timeout or self.default_timeout
  528. # 如果提供了功能名称,从配置加载模型
  529. if function_name:
  530. try:
  531. from foundation.ai.models.model_config_loader import get_model_for_function
  532. config_model = get_model_for_function(function_name)
  533. if config_model:
  534. model_name = config_model
  535. logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  536. except Exception as e:
  537. logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
  538. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  539. if not model_name:
  540. try:
  541. from foundation.ai.models.model_config_loader import get_model_for_function
  542. model_name = get_model_for_function("default")
  543. logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  544. except Exception as e:
  545. logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  546. try:
  547. # 选择模型
  548. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  549. logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
  550. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  551. # 构建消息列表
  552. final_messages = self._build_messages(
  553. messages=messages,
  554. system_prompt=system_prompt,
  555. user_prompt=user_prompt,
  556. prompt=prompt,
  557. task_prompt_info=task_prompt_info
  558. )
  559. response = llm_to_use.stream(final_messages)
  560. chunk_count = 0
  561. think_filter = _ThinkingBlockStreamFilter()
  562. for chunk in response:
  563. chunk_count += 1
  564. if hasattr(chunk, 'content') and chunk.content:
  565. cleaned = think_filter.feed(chunk.content)
  566. if cleaned:
  567. yield cleaned
  568. elif chunk:
  569. yield chunk
  570. tail = think_filter.flush()
  571. if tail:
  572. yield tail
  573. elapsed_time = time.time() - start_time
  574. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  575. except Exception as e:
  576. elapsed_time = time.time() - start_time
  577. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  578. raise
  579. generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)