model_generate.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. # !/usr/bin/ python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Project : lq-agent-api
  5. @File :model_generate.py
  6. @IDE :PyCharm
  7. @Author :
  8. @Date :2025/7/14 14:22
  9. '''
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage
  12. from foundation.ai.models.model_handler import model_handler
  13. from foundation.observability.logger.loggering import write_logger as logger
  14. import asyncio
  15. import queue
  16. import re
  17. import threading
  18. import time
  19. from typing import Optional, Callable, Any, AsyncGenerator, List, Union
  20. def _is_non_retryable_model_error(error: Exception) -> bool:
  21. error_str = str(error).lower()
  22. non_retryable_tokens = [
  23. "401",
  24. "403",
  25. "unauthorized",
  26. "forbidden",
  27. "invalid api key",
  28. "incorrect api key",
  29. "authentication",
  30. "permission denied",
  31. ]
  32. return any(token in error_str for token in non_retryable_tokens)
  33. # ============================================================
  34. # 思考内容过滤(统一收敛在调用层)
  35. #
  36. # Qwen3.5 等模型在 enable_thinking=True 时会输出思考过程,
  37. # 标准格式为 <think>...</think>,但部分 SGLang 部署会输出
  38. # "Thinking Process: ... </think>"(缺 <think> 开标签)的畸形格式。
  39. # 统一在此处去除,避免思考内容污染业务输出。
  40. # ============================================================
  41. _THINK_BLOCK_PATTERN = re.compile(r"<think>.*?</think>\s*", re.DOTALL)
  42. _DANGLING_THINK_PATTERN = re.compile(r"<think>[\s\S]*$")
  43. # SGLang 畸形格式:Thinking Process: ... </think>(有闭标签无开标签)
  44. _SGLANG_THINK_PATTERN = re.compile(r"Thinking\s+Process:\s*[\s\S]*?</think>\s*", re.DOTALL)
  45. def _strip_thinking_content(content: str) -> str:
  46. """去除完整响应中的思考内容。
  47. - 标准 <think>...</think> 块:整段去除
  48. - SGLang Thinking Process: ... </think> 畸形格式:整段去除
  49. - 仅 <think> 无 </think>(被截断):从 <think> 起全部丢弃
  50. - 不含思考标签:原文返回
  51. """
  52. if not content:
  53. return content
  54. cleaned = _THINK_BLOCK_PATTERN.sub("", content)
  55. cleaned = _SGLANG_THINK_PATTERN.sub("", cleaned)
  56. if "<think>" in cleaned:
  57. cleaned = _DANGLING_THINK_PATTERN.sub("", cleaned)
  58. logger.warning("[模型调用] 响应包含未闭合的 <think> 块,已截断丢弃")
  59. return cleaned.strip()
  60. class _ThinkingBlockStreamFilter:
  61. """流式响应中过滤思考内容的状态机。
  62. 处理 chunk 边界穿过标签的情况,保证调用方拿到的流不会泄漏
  63. 任何思考片段。支持标准 <think>...</think> 和 SGLang 畸形
  64. "Thinking Process: ... </think>" 两种格式。
  65. """
  66. _OPEN = "<think>"
  67. _CLOSE = "</think>"
  68. _SGLANG_OPEN = "Thinking Process:"
  69. def __init__(self):
  70. self._buf = ""
  71. self._inside = False
  72. self._open_tag = ""
  73. def feed(self, chunk: str) -> str:
  74. """喂入一个 chunk,返回此刻应输出的内容(可能为空字符串)。"""
  75. if not chunk:
  76. return ""
  77. self._buf += chunk
  78. out = []
  79. while True:
  80. if self._inside:
  81. idx = self._buf.find(self._CLOSE)
  82. if idx == -1:
  83. keep_len = self._partial_match_len(self._buf, self._CLOSE)
  84. self._buf = self._buf[-keep_len:] if keep_len else ""
  85. break
  86. self._buf = self._buf[idx + len(self._CLOSE):].lstrip()
  87. self._inside = False
  88. else:
  89. idx_xml = self._buf.find(self._OPEN)
  90. idx_sg = self._buf.find(self._SGLANG_OPEN)
  91. if idx_xml == -1 and idx_sg == -1:
  92. keep_xml = self._partial_match_len(self._buf, self._OPEN)
  93. keep_sg = self._partial_match_len(self._buf, self._SGLANG_OPEN)
  94. keep_len = max(keep_xml, keep_sg)
  95. if keep_len:
  96. out.append(self._buf[:-keep_len])
  97. self._buf = self._buf[-keep_len:]
  98. else:
  99. out.append(self._buf)
  100. self._buf = ""
  101. break
  102. if idx_xml >= 0 and (idx_sg == -1 or idx_xml <= idx_sg):
  103. if idx_xml > 0:
  104. out.append(self._buf[:idx_xml])
  105. self._buf = self._buf[idx_xml + len(self._OPEN):]
  106. self._open_tag = self._OPEN
  107. else:
  108. if idx_sg > 0:
  109. out.append(self._buf[:idx_sg])
  110. self._buf = self._buf[idx_sg + len(self._SGLANG_OPEN):]
  111. self._open_tag = self._SGLANG_OPEN
  112. self._inside = True
  113. return "".join(out)
  114. def flush(self) -> str:
  115. """流结束时调用,返回缓冲区剩余可输出内容。"""
  116. if self._inside:
  117. logger.warning("[模型流式调用] 流结束时仍在 <think> 块内,已丢弃尾部")
  118. self._buf = ""
  119. return ""
  120. result = self._buf
  121. self._buf = ""
  122. return result
  123. @staticmethod
  124. def _partial_match_len(buf: str, tag: str) -> int:
  125. """返回 buf 末尾匹配 tag 前缀的最大长度(避免标签被切断后误输出)。"""
  126. max_n = min(len(tag) - 1, len(buf))
  127. for n in range(max_n, 0, -1):
  128. if buf[-n:] == tag[:n]:
  129. return n
  130. return 0
  131. def _sync_retry_with_backoff(
  132. func: Callable,
  133. *args,
  134. max_retries: int = 3,
  135. backoff_factor: float = 1.0,
  136. trace_id: Optional[str] = None,
  137. model_name: Optional[str] = None,
  138. **kwargs
  139. ) -> Any:
  140. """
  141. 同步版本的带指数退避重试机制
  142. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试
  143. """
  144. model_info = model_name or "default"
  145. def _is_server_unavailable_error(error: Exception) -> bool:
  146. """判断是否为服务端不可用错误(应立即失败)"""
  147. error_str = str(error).lower()
  148. unavailable_codes = ['502', '503', '504', 'internal server error']
  149. return any(code in error_str for code in unavailable_codes)
  150. for attempt in range(max_retries + 1):
  151. try:
  152. return func(*args, **kwargs)
  153. except Exception as e:
  154. error_str = str(e)
  155. if _is_non_retryable_model_error(e):
  156. logger.error(f"[模型调用] 鉴权/权限错误,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  157. raise
  158. # 服务端不可用错误(502/503/504)立即失败,不重试
  159. if _is_server_unavailable_error(e):
  160. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  161. raise
  162. if attempt == max_retries:
  163. logger.error(f"[模型调用] 达到最大重试次数 {max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  164. raise
  165. wait_time = backoff_factor * (2 ** attempt)
  166. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  167. time.sleep(wait_time)
  168. class GenerateModelClient:
  169. """
  170. 主要是生成式模型
  171. """
  172. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  173. # 获取默认模型
  174. self.llm = model_handler.get_models()
  175. self.chat = self.llm # 当前chat和llm使用相同模型
  176. # 配置参数
  177. self.default_timeout = default_timeout
  178. self.max_retries = max_retries
  179. self.backoff_factor = backoff_factor
  180. # 保存model_handler引用,用于动态获取模型
  181. self.model_handler = model_handler
  182. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs):
  183. """
  184. 带指数退避的重试机制,每次重试都有独立的超时控制
  185. 注意:对于 502/503/504 等服务不可用错误,立即失败不重试,
  186. 避免在服务端过载时继续加重负载。
  187. """
  188. current_timeout = timeout or self.default_timeout
  189. model_info = model_name or "default"
  190. def _is_server_unavailable_error(error: Exception) -> bool:
  191. """判断是否为服务端不可用错误(应立即失败)"""
  192. error_str = str(error).lower()
  193. # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout
  194. unavailable_codes = ['502', '503', '504', 'internal server error']
  195. return any(code in error_str for code in unavailable_codes)
  196. for attempt in range(self.max_retries + 1):
  197. try:
  198. # 每次重试都有独立的超时时间
  199. return await asyncio.wait_for(
  200. func(*args, **kwargs),
  201. timeout=current_timeout
  202. )
  203. except asyncio.TimeoutError as e:
  204. if attempt == self.max_retries:
  205. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  206. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  207. wait_time = self.backoff_factor * (2 ** attempt)
  208. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}")
  209. await asyncio.sleep(wait_time)
  210. except Exception as e:
  211. error_str = str(e)
  212. if _is_non_retryable_model_error(e):
  213. logger.error(f"[模型调用] 鉴权/权限错误,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  214. raise
  215. # 服务端不可用错误(502/503/504)立即失败,不重试
  216. if _is_server_unavailable_error(e):
  217. logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  218. raise
  219. if attempt == self.max_retries:
  220. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}")
  221. raise
  222. wait_time = self.backoff_factor * (2 ** attempt)
  223. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}")
  224. await asyncio.sleep(wait_time)
  225. async def get_model_generate_invoke(
  226. self,
  227. trace_id: str,
  228. task_prompt_info: Optional[dict] = None,
  229. messages: Optional[List[BaseMessage]] = None,
  230. system_prompt: Optional[str] = None,
  231. user_prompt: Optional[str] = None,
  232. prompt: Optional[str] = None,
  233. timeout: Optional[int] = None,
  234. model_name: Optional[str] = None,
  235. enable_thinking: Optional[bool] = False,
  236. function_name: Optional[str] = None
  237. ) -> str:
  238. """模型非流式生成(异步)
  239. 支持多种调用方式(优先级从高到低):
  240. 1. messages: 直接传入 LangChain Message 对象列表
  241. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  242. 3. prompt: 传入单条用户提示词字符串
  243. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  244. Args:
  245. trace_id: 追踪ID
  246. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  247. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  248. system_prompt: 系统提示词字符串
  249. user_prompt: 用户提示词字符串
  250. prompt: 单条用户提示词字符串(无系统提示时使用)
  251. timeout: 超时时间(秒),默认使用构造时的 default_timeout
  252. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  253. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  254. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  255. Returns:
  256. str: 模型生成的文本内容
  257. Raises:
  258. ValueError: 参数组合错误
  259. TimeoutError: 调用超时
  260. Exception: 模型调用异常
  261. Examples:
  262. # 方式1: 使用 Message 列表(推荐)
  263. messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")]
  264. result = await client.get_model_generate_invoke("trace-001", messages=messages)
  265. # 方式2: 分别传入系统和用户提示词
  266. result = await client.get_model_generate_invoke(
  267. "trace-001",
  268. system_prompt="你是专家",
  269. user_prompt="请分析..."
  270. )
  271. # 方式3: 传入单条提示词
  272. result = await client.get_model_generate_invoke("trace-001", prompt="请分析...")
  273. # 方式4: 兼容旧接口(使用 PromptLoader)
  274. task_prompt_info = {"task_prompt": chat_template}
  275. result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info)
  276. # 方式5: 使用功能名称从配置加载模型
  277. result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...")
  278. """
  279. start_time = time.time()
  280. current_timeout = timeout or self.default_timeout
  281. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  282. if function_name:
  283. try:
  284. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  285. config_model = get_model_for_function(function_name)
  286. config_thinking = get_thinking_mode_for_function(function_name)
  287. if config_model:
  288. model_name = config_model
  289. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  290. if config_thinking is not None and enable_thinking is False:
  291. # 只有默认 False 时才覆盖,显式传入的参数优先
  292. enable_thinking = config_thinking
  293. logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  294. except Exception as e:
  295. logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}")
  296. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  297. if not model_name:
  298. try:
  299. from foundation.ai.models.model_config_loader import get_model_for_function
  300. model_name = get_model_for_function("default")
  301. logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  302. except Exception as e:
  303. logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  304. try:
  305. # 选择模型
  306. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  307. logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  308. # 构建消息列表(按优先级)
  309. final_messages = self._build_messages(
  310. messages=messages,
  311. system_prompt=system_prompt,
  312. user_prompt=user_prompt,
  313. prompt=prompt,
  314. task_prompt_info=task_prompt_info
  315. )
  316. # 针对 Qwen3.5 模型处理思考模式
  317. model_to_invoke = llm_to_use
  318. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  319. if is_qwen35:
  320. if enable_thinking is False:
  321. # 显式禁用思考模式
  322. model_to_invoke = llm_to_use.bind(
  323. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  324. )
  325. logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}")
  326. elif enable_thinking is True:
  327. # 显式启用思考模式
  328. model_to_invoke = llm_to_use.bind(
  329. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  330. )
  331. logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}")
  332. else:
  333. # enable_thinking is None,使用模型默认行为(通常是启用)
  334. logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}")
  335. # 定义模型调用函数,使用原生 ainvoke
  336. async def _invoke():
  337. return await model_to_invoke.ainvoke(final_messages)
  338. # 调用带重试机制
  339. response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default")
  340. elapsed_time = time.time() - start_time
  341. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  342. return _strip_thinking_content(response.content)
  343. except asyncio.TimeoutError:
  344. elapsed_time = time.time() - start_time
  345. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  346. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}")
  347. except Exception as e:
  348. elapsed_time = time.time() - start_time
  349. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  350. raise
  351. def _build_messages(
  352. self,
  353. messages: Optional[List[BaseMessage]] = None,
  354. system_prompt: Optional[str] = None,
  355. user_prompt: Optional[str] = None,
  356. prompt: Optional[str] = None,
  357. task_prompt_info: Optional[dict] = None
  358. ) -> List[BaseMessage]:
  359. """构建消息列表(内部方法)
  360. 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info
  361. """
  362. # 方式1: 直接使用传入的 Message 列表
  363. if messages is not None:
  364. if not isinstance(messages, list):
  365. raise ValueError("messages 必须是列表")
  366. if len(messages) == 0:
  367. raise ValueError("messages 不能为空列表")
  368. logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息")
  369. return messages
  370. # 方式2: system_prompt + user_prompt
  371. if system_prompt is not None and user_prompt is not None:
  372. logger.debug("使用 system_prompt + user_prompt 构建消息")
  373. return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)]
  374. # 方式3: 单独 system_prompt(可能是特殊情况)
  375. if system_prompt is not None:
  376. logger.debug("使用单独的 system_prompt 构建消息")
  377. return [SystemMessage(content=system_prompt)]
  378. # 方式4: 单条 prompt 字符串
  379. if prompt is not None:
  380. logger.debug("使用单条 prompt 字符串构建消息")
  381. return [HumanMessage(content=prompt)]
  382. # 方式5: 兼容旧接口 task_prompt_info
  383. if task_prompt_info is not None:
  384. if "task_prompt" not in task_prompt_info:
  385. raise ValueError("task_prompt_info 必须包含 'task_prompt' 键")
  386. task_prompt = task_prompt_info["task_prompt"]
  387. if hasattr(task_prompt, 'format_messages'):
  388. logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息")
  389. return task_prompt.format_messages()
  390. elif isinstance(task_prompt, str):
  391. logger.debug("使用 task_prompt_info 中的字符串构建消息")
  392. return [HumanMessage(content=task_prompt)]
  393. else:
  394. raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}")
  395. # 没有提供任何有效参数
  396. raise ValueError(
  397. "必须提供以下参数之一: "
  398. "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info"
  399. )
  400. def get_model_generate_invoke_sync(
  401. self,
  402. trace_id: str,
  403. task_prompt_info: Optional[dict] = None,
  404. messages: Optional[List[BaseMessage]] = None,
  405. system_prompt: Optional[str] = None,
  406. user_prompt: Optional[str] = None,
  407. prompt: Optional[str] = None,
  408. timeout: Optional[int] = None,
  409. model_name: Optional[str] = None,
  410. enable_thinking: Optional[bool] = False,
  411. function_name: Optional[str] = None
  412. ) -> str:
  413. """模型非流式生成(同步版本)
  414. 适用于同步上下文调用,功能与异步版本完全一致。
  415. 支持多种调用方式(优先级从高到低):
  416. 1. messages: 直接传入 LangChain Message 对象列表
  417. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  418. 3. prompt: 传入单条用户提示词字符串
  419. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  420. Args:
  421. trace_id: 追踪ID
  422. task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法
  423. messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage])
  424. system_prompt: 系统提示词字符串
  425. user_prompt: 用户提示词字符串
  426. prompt: 单条用户提示词字符串(无系统提示时使用)
  427. timeout: 超时时间(秒),默认使用构造时的 default_timeout(同步版本忽略)
  428. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  429. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  430. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置
  431. Returns:
  432. str: 模型生成的文本内容
  433. Raises:
  434. ValueError: 参数组合错误
  435. Exception: 模型调用异常
  436. Examples:
  437. # 同步调用(用于同步上下文)
  438. result = generate_model_client.get_model_generate_invoke_sync(
  439. "trace-001",
  440. system_prompt="你是专家",
  441. user_prompt="请分析...",
  442. function_name="doc_classification_secondary"
  443. )
  444. """
  445. start_time = time.time()
  446. # 如果提供了功能名称,从配置加载模型和 thinking 模式
  447. if function_name:
  448. try:
  449. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  450. config_model = get_model_for_function(function_name)
  451. config_thinking = get_thinking_mode_for_function(function_name)
  452. if config_model:
  453. model_name = config_model
  454. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  455. if config_thinking is not None and enable_thinking is False:
  456. enable_thinking = config_thinking
  457. logger.info(f"[模型调用-同步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  458. except Exception as e:
  459. logger.warning(f"[模型调用-同步] 加载功能配置失败 [{function_name}]: {e}")
  460. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  461. if not model_name:
  462. try:
  463. from foundation.ai.models.model_config_loader import get_model_for_function
  464. model_name = get_model_for_function("default")
  465. logger.info(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  466. except Exception as e:
  467. logger.warning(f"[模型调用-同步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  468. try:
  469. # 选择模型
  470. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  471. logger.info(f"[模型调用-同步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  472. # 构建消息列表(按优先级)
  473. final_messages = self._build_messages(
  474. messages=messages,
  475. system_prompt=system_prompt,
  476. user_prompt=user_prompt,
  477. prompt=prompt,
  478. task_prompt_info=task_prompt_info
  479. )
  480. # 针对 Qwen3.5 模型处理思考模式
  481. model_to_invoke = llm_to_use
  482. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  483. if is_qwen35:
  484. if enable_thinking is False:
  485. model_to_invoke = llm_to_use.bind(
  486. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  487. )
  488. logger.debug(f"[模型调用-同步] 已禁用 Qwen3.5 思考模式: {model_name}")
  489. elif enable_thinking is True:
  490. model_to_invoke = llm_to_use.bind(
  491. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  492. )
  493. logger.debug(f"[模型调用-同步] 已启用 Qwen3.5 思考模式: {model_name}")
  494. else:
  495. logger.debug(f"[模型调用-同步] 使用 Qwen3.5 默认思考模式: {model_name}")
  496. # 定义模型调用函数,使用同步 invoke
  497. def _invoke():
  498. return model_to_invoke.invoke(final_messages)
  499. # 调用带重试机制(同步版本)
  500. response = _sync_retry_with_backoff(
  501. _invoke,
  502. max_retries=self.max_retries,
  503. backoff_factor=self.backoff_factor,
  504. trace_id=trace_id,
  505. model_name=model_name or "default"
  506. )
  507. elapsed_time = time.time() - start_time
  508. logger.info(f"[模型调用-同步] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  509. return _strip_thinking_content(response.content)
  510. except Exception as e:
  511. elapsed_time = time.time() - start_time
  512. logger.error(f"[模型调用-同步] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  513. raise
  514. def get_model_generate_stream(
  515. self,
  516. trace_id: str,
  517. task_prompt_info: Optional[dict] = None,
  518. messages: Optional[List[BaseMessage]] = None,
  519. system_prompt: Optional[str] = None,
  520. user_prompt: Optional[str] = None,
  521. prompt: Optional[str] = None,
  522. timeout: Optional[int] = None,
  523. model_name: Optional[str] = None,
  524. function_name: Optional[str] = None
  525. ):
  526. """模型流式生成(同步生成器)
  527. 支持多种调用方式(优先级从高到低):
  528. 1. messages: 直接传入 LangChain Message 对象列表
  529. 2. system_prompt + user_prompt: 分别传入系统和用户提示词
  530. 3. prompt: 传入单条用户提示词字符串
  531. 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口)
  532. Args:
  533. trace_id: 追踪ID
  534. task_prompt_info: 任务提示词信息(兼容旧接口)
  535. messages: LangChain Message 对象列表
  536. system_prompt: 系统提示词字符串
  537. user_prompt: 用户提示词字符串
  538. prompt: 单条用户提示词字符串
  539. timeout: 超时时间(秒)
  540. model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等
  541. function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置
  542. Yields:
  543. str: 生成的文本块
  544. Raises:
  545. ValueError: 参数组合错误
  546. """
  547. start_time = time.time()
  548. current_timeout = timeout or self.default_timeout
  549. # 如果提供了功能名称,从配置加载模型
  550. if function_name:
  551. try:
  552. from foundation.ai.models.model_config_loader import get_model_for_function
  553. config_model = get_model_for_function(function_name)
  554. if config_model:
  555. model_name = config_model
  556. logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  557. except Exception as e:
  558. logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}")
  559. # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置
  560. if not model_name:
  561. try:
  562. from foundation.ai.models.model_config_loader import get_model_for_function
  563. model_name = get_model_for_function("default")
  564. logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  565. except Exception as e:
  566. logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  567. try:
  568. # 选择模型
  569. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  570. logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}")
  571. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  572. # 构建消息列表
  573. final_messages = self._build_messages(
  574. messages=messages,
  575. system_prompt=system_prompt,
  576. user_prompt=user_prompt,
  577. prompt=prompt,
  578. task_prompt_info=task_prompt_info
  579. )
  580. response = llm_to_use.stream(final_messages)
  581. chunk_count = 0
  582. think_filter = _ThinkingBlockStreamFilter()
  583. for chunk in response:
  584. chunk_count += 1
  585. if hasattr(chunk, 'content') and chunk.content:
  586. cleaned = think_filter.feed(chunk.content)
  587. if cleaned:
  588. yield cleaned
  589. elif chunk:
  590. yield chunk
  591. tail = think_filter.flush()
  592. if tail:
  593. yield tail
  594. elapsed_time = time.time() - start_time
  595. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  596. except Exception as e:
  597. elapsed_time = time.time() - start_time
  598. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}")
  599. raise
  600. async def get_model_generate_invoke_stream(
  601. self,
  602. trace_id: str,
  603. system_prompt: Optional[str] = None,
  604. user_prompt: Optional[str] = None,
  605. prompt: Optional[str] = None,
  606. timeout: Optional[int] = None,
  607. model_name: Optional[str] = None,
  608. enable_thinking: Optional[bool] = False,
  609. function_name: Optional[str] = None,
  610. ) -> AsyncGenerator[str, None]:
  611. """模型流式生成(异步生成器)
  612. 内部用 worker 线程启动同步流式调用,通过 asyncio.Queue 投递 chunk,
  613. 实现真正的异步流式输出,不阻塞事件循环。
  614. Args:
  615. trace_id: 追踪ID
  616. system_prompt: 系统提示词
  617. user_prompt: 用户提示词
  618. prompt: 单条用户提示词字符串
  619. timeout: 超时时间(秒)
  620. model_name: 模型名称
  621. enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效)
  622. function_name: 功能名称(可选),如提供则从配置加载模型
  623. Yields:
  624. str: 生成的文本块
  625. """
  626. current_timeout = timeout or self.default_timeout
  627. # 加载模型配置(与异步版本一致的逻辑)
  628. if function_name:
  629. try:
  630. from foundation.ai.models.model_config_loader import get_model_for_function, get_thinking_mode_for_function
  631. config_model = get_model_for_function(function_name)
  632. config_thinking = get_thinking_mode_for_function(function_name)
  633. if config_model:
  634. model_name = config_model
  635. logger.info(f"[模型流式-异步] 从配置加载功能 '{function_name}' 的模型: {model_name}")
  636. if config_thinking is not None and enable_thinking is False:
  637. enable_thinking = config_thinking
  638. logger.info(f"[模型流式-异步] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}")
  639. except Exception as e:
  640. logger.warning(f"[模型流式-异步] 加载功能配置失败 [{function_name}]: {e}")
  641. if not model_name:
  642. try:
  643. from foundation.ai.models.model_config_loader import get_model_for_function
  644. model_name = get_model_for_function("default")
  645. logger.info(f"[模型流式-异步] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}")
  646. except Exception as e:
  647. logger.warning(f"[模型流式-异步] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型")
  648. # 选择模型并处理 Qwen3.5 thinking
  649. llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm
  650. logger.info(f"[模型流式-异步] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}")
  651. final_messages = self._build_messages(
  652. system_prompt=system_prompt,
  653. user_prompt=user_prompt,
  654. prompt=prompt,
  655. )
  656. model_to_invoke = llm_to_use
  657. is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower())
  658. if is_qwen35:
  659. if enable_thinking is False:
  660. model_to_invoke = llm_to_use.bind(
  661. extra_body={"chat_template_kwargs": {"enable_thinking": False}}
  662. )
  663. logger.debug(f"[模型流式-异步] 已禁用 Qwen3.5 思考模式: {model_name}")
  664. elif enable_thinking is True:
  665. model_to_invoke = llm_to_use.bind(
  666. extra_body={"chat_template_kwargs": {"enable_thinking": True}}
  667. )
  668. logger.debug(f"[模型流式-异步] 已启用 Qwen3.5 思考模式: {model_name}")
  669. # 用 Queue 桥接同步流式生成 → 异步消费
  670. q: asyncio.Queue = asyncio.Queue()
  671. sentinel = object() # 结束标记
  672. def _worker():
  673. """Worker 线程:运行同步流式生成器,把 chunk 放入 Queue。"""
  674. try:
  675. response = model_to_invoke.stream(final_messages)
  676. think_filter = _ThinkingBlockStreamFilter()
  677. for chunk in response:
  678. if hasattr(chunk, 'content') and chunk.content:
  679. cleaned = think_filter.feed(chunk.content)
  680. if cleaned:
  681. q.put_nowait(cleaned)
  682. elif chunk:
  683. text = chunk.content if hasattr(chunk, 'content') else str(chunk)
  684. if text:
  685. q.put_nowait(text)
  686. tail = think_filter.flush()
  687. if tail:
  688. q.put_nowait(tail)
  689. except Exception as e:
  690. logger.error(f"[模型流式-异步] worker 线程异常 trace_id: {trace_id}: {e}", exc_info=True)
  691. q.put_nowait(None) # None 表示异常
  692. finally:
  693. q.put_nowait(sentinel)
  694. thread = threading.Thread(target=_worker, daemon=True)
  695. thread.start()
  696. start_time = time.time()
  697. try:
  698. while True:
  699. item = await asyncio.wait_for(q.get(), timeout=current_timeout)
  700. if item is sentinel:
  701. break
  702. if item is None:
  703. logger.warning(f"[模型流式-异步] worker 线程发生异常,提前结束 trace_id: {trace_id}")
  704. break
  705. yield item
  706. elapsed_time = time.time() - start_time
  707. logger.info(f"[模型流式-异步] 完成 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  708. except asyncio.TimeoutError:
  709. elapsed_time = time.time() - start_time
  710. logger.error(f"[模型流式-异步] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  711. raise TimeoutError(f"模型流式调用超时,trace_id: {trace_id}")
  712. generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)