#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : model_generate.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/14 14:22
'''
from langchain_core.prompts import ChatPromptTemplate
from foundation.ai.models.model_handler import model_handler
from foundation.observability.logger.loggering import server_logger as logger
import asyncio
import time
from typing import Optional, Callable, Any


class GenerateModelClient:
    """
    Client for generative LLM calls.

    Wraps the model(s) resolved by ``model_handler`` with per-attempt timeout
    control and an exponential-backoff retry policy (non-streaming path), plus
    a streaming generator with a wall-clock timeout budget.
    """

    def __init__(self, default_timeout: int = 60, max_retries: int = 3,
                 backoff_factor: float = 1.0):
        """
        Args:
            default_timeout: per-attempt timeout in seconds, used when a call
                does not pass an explicit timeout.
            max_retries: number of retries after the first attempt.
            backoff_factor: base for the exponential backoff wait
                (wait = backoff_factor * 2 ** attempt).
        """
        # Default model, resolved once at construction time.
        self.llm = model_handler.get_models()
        self.chat = self.llm  # chat and llm currently share the same model
        # Timeout / retry configuration.
        self.default_timeout = default_timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        # Keep a reference so individual calls can switch models dynamically.
        self.model_handler = model_handler

    async def _retry_with_backoff(self, func: Callable, *args,
                                  timeout: Optional[int] = None, **kwargs):
        """
        Run ``func`` with exponential-backoff retries; every attempt gets its
        own independent timeout window.

        Args:
            func: zero-side-channel coroutine factory to await.
            timeout: per-attempt timeout in seconds; falls back to
                ``self.default_timeout`` when None.

        Raises:
            TimeoutError: when every attempt times out.
            Exception: the last underlying error when every attempt fails.
        """
        current_timeout = timeout or self.default_timeout
        for attempt in range(self.max_retries + 1):
            try:
                # Each attempt gets its own independent timeout window.
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=current_timeout
                )
            except asyncio.TimeoutError:
                if attempt == self.max_retries:
                    logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时")
                    raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
                wait_time = self.backoff_factor * (2 ** attempt)
                logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试...")
                await asyncio.sleep(wait_time)
            except Exception as e:
                if attempt == self.max_retries:
                    logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {str(e)}")
                    raise
                wait_time = self.backoff_factor * (2 ** attempt)
                logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {str(e)}, {wait_time}秒后重试...")
                await asyncio.sleep(wait_time)

    async def get_model_generate_invoke(self, trace_id: str,
                                        task_prompt_info: dict,
                                        timeout: Optional[int] = None,
                                        model_name: Optional[str] = None):
        """
        Non-streaming model generation (async).

        Args:
            trace_id: request trace id used for log correlation.
            task_prompt_info: dict holding a "task_prompt" prompt template
                whose ``format_messages()`` yields the chat messages.
            timeout: per-attempt timeout in seconds (optional).
            model_name: optional model name for dynamic switching. Supported:
                doubao, qwen, deepseek, gemini, lq_qwen3_8b,
                lq_qwen3_8b_lq_lora, lq_qwen3_4b, qwen_local_14b.
                When None the default model is used.

        Returns:
            The generated text (``response.content``).

        Raises:
            TimeoutError: when every retry attempt times out.
        """
        start_time = time.time()
        # BUGFIX: `int(timeout) or int(self.default_timeout)` raised
        # TypeError when timeout was None (the default).
        current_timeout = int(timeout) if timeout is not None else int(self.default_timeout)
        try:
            # Resolve which model handles this request.
            if model_name:
                llm_to_use = self.model_handler.get_model_by_name(model_name)
                logger.info(f"[模型调用] 使用指定模型: {model_name}, trace_id: {trace_id}")
            else:
                llm_to_use = self.llm
                logger.info(f"[模型调用] 使用默认模型, trace_id: {trace_id}")

            logger.info(f"[模型调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()

            async def _invoke_model():
                # The underlying client is synchronous; run it in the default
                # executor so the event loop is not blocked.
                # (get_running_loop replaces the deprecated get_event_loop.)
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, llm_to_use.invoke, messages)

            # Timeout control is handled per attempt inside the retry helper.
            response = await self._retry_with_backoff(_invoke_model, timeout=current_timeout)

            elapsed_time = time.time() - start_time
            logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
            return response.content
        except (asyncio.TimeoutError, TimeoutError):
            # _retry_with_backoff re-raises the builtin TimeoutError; on
            # Python < 3.11 that is NOT asyncio.TimeoutError, so catch both.
            elapsed_time = time.time() - start_time
            logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
            raise TimeoutError(f"模型调用超时,trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
            raise

    def get_model_generate_stream(self, trace_id: str, task_prompt_info: dict,
                                  timeout: Optional[int] = None):
        """
        Streaming model generation (sync generator) with error handling.

        Yields chunk contents as they arrive from the default model.

        Args:
            trace_id: request trace id used for log correlation.
            task_prompt_info: dict holding a "task_prompt" prompt template.
            timeout: overall wall-clock budget in seconds for the stream.

        Raises:
            TimeoutError: when the wall-clock budget is exceeded mid-stream.
        """
        start_time = time.time()
        current_timeout = timeout or self.default_timeout
        try:
            logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()
            response = self.llm.stream(messages)

            chunk_count = 0
            for chunk in response:
                # BUGFIX: current_timeout was previously logged but never
                # enforced; abort once the wall-clock budget is exceeded.
                if time.time() - start_time > current_timeout:
                    raise TimeoutError(
                        f"模型流式调用超时,trace_id: {trace_id}, 超时阈值: {current_timeout}s")
                chunk_count += 1
                if hasattr(chunk, 'content') and chunk.content:
                    yield chunk.content
                elif chunk:  # handle streams that yield plain strings
                    yield chunk

            elapsed_time = time.time() - start_time
            logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
            raise


generate_model_client = GenerateModelClient(default_timeout=15, max_retries=2, backoff_factor=0.5)