# !/usr/bin/ python # -*- coding: utf-8 -*- ''' @Project : lq-agent-api @File :model_generate.py @IDE :PyCharm @Author : @Date :2025/7/14 14:22 ''' from langchain_core.prompts import ChatPromptTemplate from langchain_core.messages import BaseMessage, SystemMessage, HumanMessage from foundation.ai.models.model_handler import model_handler from foundation.observability.logger.loggering import review_logger as logger import asyncio import time from typing import Optional, Callable, Any, List, Union class GenerateModelClient: """ 主要是生成式模型 """ def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0): # 获取默认模型 self.llm = model_handler.get_models() self.chat = self.llm # 当前chat和llm使用相同模型 # 配置参数 self.default_timeout = default_timeout self.max_retries = max_retries self.backoff_factor = backoff_factor # 保存model_handler引用,用于动态获取模型 self.model_handler = model_handler async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, trace_id: Optional[str] = None, model_name: Optional[str] = None, **kwargs): """ 带指数退避的重试机制,每次重试都有独立的超时控制 注意:对于 502/503/504 等服务不可用错误,立即失败不重试, 避免在服务端过载时继续加重负载。 """ current_timeout = timeout or self.default_timeout model_info = model_name or "default" def _is_server_unavailable_error(error: Exception) -> bool: """判断是否为服务端不可用错误(应立即失败)""" error_str = str(error).lower() # 502: Bad Gateway, 503: Service Unavailable, 504: Gateway Timeout unavailable_codes = ['502', '503', '504', 'internal server error'] return any(code in error_str for code in unavailable_codes) for attempt in range(self.max_retries + 1): try: # 每次重试都有独立的超时时间 return await asyncio.wait_for( func(*args, **kwargs), timeout=current_timeout ) except asyncio.TimeoutError as e: if attempt == self.max_retries: logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时 | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}") raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时") wait_time = self.backoff_factor * (2 ** attempt) logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}, timeout: {current_timeout}s, error_type: {type(e).__name__}, error_msg: {str(e)}") await asyncio.sleep(wait_time) except Exception as e: error_str = str(e) # 服务端不可用错误(502/503/504)立即失败,不重试 if _is_server_unavailable_error(e): logger.error(f"[模型调用] 服务端不可用,立即失败: {error_str} | trace_id: {trace_id}, model: {model_info}") raise if attempt == self.max_retries: logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {error_str} | trace_id: {trace_id}, model: {model_info}") raise wait_time = self.backoff_factor * (2 ** attempt) logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {error_str}, {wait_time}秒后重试... | trace_id: {trace_id}, model: {model_info}") await asyncio.sleep(wait_time) async def get_model_generate_invoke( self, trace_id: str, task_prompt_info: Optional[dict] = None, messages: Optional[List[BaseMessage]] = None, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None, prompt: Optional[str] = None, timeout: Optional[int] = None, model_name: Optional[str] = None, enable_thinking: Optional[bool] = False, function_name: Optional[str] = None ) -> str: """模型非流式生成(异步) 支持多种调用方式(优先级从高到低): 1. messages: 直接传入 LangChain Message 对象列表 2. system_prompt + user_prompt: 分别传入系统和用户提示词 3. prompt: 传入单条用户提示词字符串 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口) Args: trace_id: 追踪ID task_prompt_info: 任务提示词信息(兼容旧接口),需包含 format_messages() 方法 messages: LangChain Message 对象列表(如 [SystemMessage, HumanMessage]) system_prompt: 系统提示词字符串 user_prompt: 用户提示词字符串 prompt: 单条用户提示词字符串(无系统提示时使用) timeout: 超时时间(秒),默认使用构造时的 default_timeout model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等 enable_thinking: 是否启用思考模式,默认 False(仅对 Qwen3.5 系列模型有效) function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型和 thinking 配置 Returns: str: 模型生成的文本内容 Raises: ValueError: 参数组合错误 TimeoutError: 调用超时 Exception: 模型调用异常 Examples: # 方式1: 使用 Message 列表(推荐) messages = [SystemMessage(content="你是专家"), HumanMessage(content="请分析...")] result = await client.get_model_generate_invoke("trace-001", messages=messages) # 方式2: 分别传入系统和用户提示词 result = await client.get_model_generate_invoke( "trace-001", system_prompt="你是专家", user_prompt="请分析..." ) # 方式3: 传入单条提示词 result = await client.get_model_generate_invoke("trace-001", prompt="请分析...") # 方式4: 兼容旧接口(使用 PromptLoader) task_prompt_info = {"task_prompt": chat_template} result = await client.get_model_generate_invoke("trace-001", task_prompt_info=task_prompt_info) # 方式5: 使用功能名称从配置加载模型 result = await client.get_model_generate_invoke("trace-001", function_name="doc_classification_tertiary", system_prompt="...", user_prompt="...") """ start_time = time.time() current_timeout = timeout or self.default_timeout # 如果提供了功能名称,从配置加载模型和 thinking 模式 if function_name: try: from config.model_config_loader import get_model_for_function, get_thinking_mode_for_function config_model = get_model_for_function(function_name) config_thinking = get_thinking_mode_for_function(function_name) if config_model: model_name = config_model logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的模型: {model_name}") if config_thinking is not None and enable_thinking is False: # 只有默认 False 时才覆盖,显式传入的参数优先 enable_thinking = config_thinking logger.info(f"[模型调用] 从配置加载功能 '{function_name}' 的 thinking 模式: {enable_thinking}") except Exception as e: logger.warning(f"[模型调用] 加载功能配置失败 [{function_name}]: {e}") # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置 if not model_name: try: from config.model_config_loader import get_model_for_function model_name = get_model_for_function("default") logger.info(f"[模型调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}") except Exception as e: logger.warning(f"[模型调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型") try: # 选择模型 llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm logger.info(f"[模型调用] 使用{'指定' if model_name else '默认'}模型: {model_name or 'default'}, trace_id: {trace_id}") # 构建消息列表(按优先级) final_messages = self._build_messages( messages=messages, system_prompt=system_prompt, user_prompt=user_prompt, prompt=prompt, task_prompt_info=task_prompt_info ) # 针对 Qwen3.5 模型处理思考模式 model_to_invoke = llm_to_use is_qwen35 = model_name and ('qwen3.5' in model_name.lower() or 'qwen3_5' in model_name.lower()) if is_qwen35: if enable_thinking is False: # 显式禁用思考模式 model_to_invoke = llm_to_use.bind( extra_body={"chat_template_kwargs": {"enable_thinking": False}} ) logger.debug(f"[模型调用] 已禁用 Qwen3.5 思考模式: {model_name}") elif enable_thinking is True: # 显式启用思考模式 model_to_invoke = llm_to_use.bind( extra_body={"chat_template_kwargs": {"enable_thinking": True}} ) logger.debug(f"[模型调用] 已启用 Qwen3.5 思考模式: {model_name}") else: # enable_thinking is None,使用模型默认行为(通常是启用) logger.debug(f"[模型调用] 使用 Qwen3.5 默认思考模式: {model_name}") # 定义模型调用函数,使用原生 ainvoke async def _invoke(): return await model_to_invoke.ainvoke(final_messages) # 调用带重试机制 response = await self._retry_with_backoff(_invoke, timeout=current_timeout, trace_id=trace_id, model_name=model_name or "default") elapsed_time = time.time() - start_time logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s") return response.content except asyncio.TimeoutError: elapsed_time = time.time() - start_time logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s") raise TimeoutError(f"模型调用超时,trace_id: {trace_id}") except Exception as e: elapsed_time = time.time() - start_time logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}") raise def _build_messages( self, messages: Optional[List[BaseMessage]] = None, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None, prompt: Optional[str] = None, task_prompt_info: Optional[dict] = None ) -> List[BaseMessage]: """构建消息列表(内部方法) 优先级:messages > system_prompt+user_prompt > prompt > task_prompt_info """ # 方式1: 直接使用传入的 Message 列表 if messages is not None: if not isinstance(messages, list): raise ValueError("messages 必须是列表") if len(messages) == 0: raise ValueError("messages 不能为空列表") logger.debug(f"使用传入的 messages 列表,共 {len(messages)} 条消息") return messages # 方式2: system_prompt + user_prompt if system_prompt is not None and user_prompt is not None: logger.debug("使用 system_prompt + user_prompt 构建消息") return [SystemMessage(content=system_prompt), HumanMessage(content=user_prompt)] # 方式3: 单独 system_prompt(可能是特殊情况) if system_prompt is not None: logger.debug("使用单独的 system_prompt 构建消息") return [SystemMessage(content=system_prompt)] # 方式4: 单条 prompt 字符串 if prompt is not None: logger.debug("使用单条 prompt 字符串构建消息") return [HumanMessage(content=prompt)] # 方式5: 兼容旧接口 task_prompt_info if task_prompt_info is not None: if "task_prompt" not in task_prompt_info: raise ValueError("task_prompt_info 必须包含 'task_prompt' 键") task_prompt = task_prompt_info["task_prompt"] if hasattr(task_prompt, 'format_messages'): logger.debug("使用 task_prompt_info 中的 ChatPromptTemplate 构建消息") return task_prompt.format_messages() elif isinstance(task_prompt, str): logger.debug("使用 task_prompt_info 中的字符串构建消息") return [HumanMessage(content=task_prompt)] else: raise ValueError(f"task_prompt 类型不支持: {type(task_prompt)}") # 没有提供任何有效参数 raise ValueError( "必须提供以下参数之一: " "messages, system_prompt+user_prompt, prompt, 或 task_prompt_info" ) def get_model_generate_stream( self, trace_id: str, task_prompt_info: Optional[dict] = None, messages: Optional[List[BaseMessage]] = None, system_prompt: Optional[str] = None, user_prompt: Optional[str] = None, prompt: Optional[str] = None, timeout: Optional[int] = None, model_name: Optional[str] = None, function_name: Optional[str] = None ): """模型流式生成(同步生成器) 支持多种调用方式(优先级从高到低): 1. messages: 直接传入 LangChain Message 对象列表 2. system_prompt + user_prompt: 分别传入系统和用户提示词 3. prompt: 传入单条用户提示词字符串 4. task_prompt_info: 传入包含 ChatPromptTemplate 的字典(兼容旧接口) Args: trace_id: 追踪ID task_prompt_info: 任务提示词信息(兼容旧接口) messages: LangChain Message 对象列表 system_prompt: 系统提示词字符串 user_prompt: 用户提示词字符串 prompt: 单条用户提示词字符串 timeout: 超时时间(秒) model_name: 模型名称(可选),支持 doubao/qwen/deepseek/gemini 等 function_name: 功能名称(可选),如提供则从 model_setting.yaml 加载模型配置 Yields: str: 生成的文本块 Raises: ValueError: 参数组合错误 """ start_time = time.time() current_timeout = timeout or self.default_timeout # 如果提供了功能名称,从配置加载模型 if function_name: try: from config.model_config_loader import get_model_for_function config_model = get_model_for_function(function_name) if config_model: model_name = config_model logger.info(f"[模型流式调用] 从配置加载功能 '{function_name}' 的模型: {model_name}") except Exception as e: logger.warning(f"[模型流式调用] 加载功能配置失败 [{function_name}]: {e}") # 如果没有指定模型名称,从 model_setting.yaml 读取默认配置 if not model_name: try: from config.model_config_loader import get_model_for_function model_name = get_model_for_function("default") logger.info(f"[模型流式调用] 从 model_setting.yaml 读取默认模型: {model_name}, trace_id: {trace_id}") except Exception as e: logger.warning(f"[模型流式调用] 从 model_setting.yaml 读取默认模型失败: {e},使用初始化模型") try: # 选择模型 llm_to_use = self.model_handler.get_model_by_name(model_name) if model_name else self.llm logger.info(f"[模型流式调用] 使用{'指定' if model_name else '默认'}模型:{model_name or 'default'}, trace_id: {trace_id}") logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s") # 构建消息列表 final_messages = self._build_messages( messages=messages, system_prompt=system_prompt, user_prompt=user_prompt, prompt=prompt, task_prompt_info=task_prompt_info ) response = llm_to_use.stream(final_messages) chunk_count = 0 for chunk in response: chunk_count += 1 if hasattr(chunk, 'content') and chunk.content: yield chunk.content elif chunk: yield chunk elapsed_time = time.time() - start_time logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s") except Exception as e: elapsed_time = time.time() - start_time logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误: {type(e).__name__}: {str(e)}") raise generate_model_client = GenerateModelClient(default_timeout=60, max_retries=10, backoff_factor=0.5)