#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : model_generate.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/14 14:22
'''
import asyncio
import time
from typing import Callable, Optional

from foundation.ai.models.model_handler import get_models
from foundation.observability.logger.loggering import server_logger as logger


class GenerateModelClient:
    """
    Client for generative (text-generation) models.
    """

    def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
        # Fetch the deployed model handles (the embedding model is unused here)
        llm, chat, _embed = get_models()
        self.llm = llm
        self.chat = chat
        # Configuration parameters
        self.default_timeout = default_timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor

    async def _retry_with_backoff(self, func: Callable, *args, **kwargs):
        """
        Retry helper with exponential backoff.
        """
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries:
                    logger.error(f"[model call] reached max retries {self.max_retries}, giving up: {str(e)}")
                    raise
                # Wait backoff_factor * 2^attempt seconds before the next try
                wait_time = self.backoff_factor * (2 ** attempt)
                logger.warning(f"[model call] attempt {attempt + 1} failed: {str(e)}, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

    async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
        """
        Non-streaming model generation (async).
        """
        start_time = time.time()
        current_timeout = timeout or self.default_timeout
        try:
            logger.info(f"[model call] start, trace_id: {trace_id}, timeout: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()

            async def _invoke_model():
                # Run the blocking invoke() in the default thread pool so it
                # does not block the event loop
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, self.llm.invoke, messages)

            # Wrap the whole retry sequence in a single overall timeout
            response = await asyncio.wait_for(
                self._retry_with_backoff(_invoke_model),
                timeout=current_timeout,
            )

            elapsed_time = time.time() - start_time
            logger.info(f"[model call] success, trace_id: {trace_id}, elapsed: {elapsed_time:.2f}s")
            return response.content
        except asyncio.TimeoutError:
            elapsed_time = time.time() - start_time
            logger.error(
                f"[model call] timed out, trace_id: {trace_id}, elapsed: {elapsed_time:.2f}s, "
                f"timeout threshold: {current_timeout}s"
            )
            raise TimeoutError(f"Model call timed out, trace_id: {trace_id}, elapsed: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(
                f"[model call] failed, trace_id: {trace_id}, elapsed: {elapsed_time:.2f}s, "
                f"error type: {type(e).__name__}, error: {str(e)}"
            )
            raise

    def get_model_generate_stream(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
        """
        Streaming model generation (sync generator) with exception handling.

        Note: `timeout` is only logged here; no per-stream deadline is
        enforced for this synchronous generator.
        """
        start_time = time.time()
        current_timeout = timeout or self.default_timeout
        try:
            logger.info(f"[model stream] start, trace_id: {trace_id}, timeout: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()

            response = self.llm.stream(messages)
            chunk_count = 0
            for chunk in response:
                chunk_count += 1
                if hasattr(chunk, 'content') and chunk.content:
                    yield chunk.content
                elif chunk:
                    # Handle backends that yield plain strings instead of
                    # message chunks
                    yield chunk

            elapsed_time = time.time() - start_time
            logger.info(f"[model stream] success, trace_id: {trace_id}, chunks: {chunk_count}, elapsed: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(
                f"[model stream] failed, trace_id: {trace_id}, elapsed: {elapsed_time:.2f}s, "
                f"error type: {type(e).__name__}, error: {str(e)}"
            )
            raise


generate_model_client = GenerateModelClient(default_timeout=120, max_retries=3, backoff_factor=1.0)
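

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): this assumes get_models() returns
# LangChain-compatible model objects and that task_prompt_info["task_prompt"]
# is a ChatPromptTemplate whose variables are already bound, since the client
# calls format_messages() with no arguments. The prompt text and trace ids
# below are made-up placeholders, not values from this project.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from langchain_core.prompts import ChatPromptTemplate

    _prompt = ChatPromptTemplate.from_messages(
        [("system", "You are a helpful assistant."), ("human", "Say hello.")]
    )
    _task_prompt_info = {"task_prompt": _prompt}

    # Non-streaming path: run the coroutine to completion with a 30s overall
    # budget covering all retries and backoff waits.
    _text = asyncio.run(
        generate_model_client.get_model_generate_invoke(
            trace_id="demo-trace-001",
            task_prompt_info=_task_prompt_info,
            timeout=30,
        )
    )
    print(_text)

    # Streaming path: the generator is synchronous, so plain iteration works.
    for _piece in generate_model_client.get_model_generate_stream(
        trace_id="demo-trace-002",
        task_prompt_info=_task_prompt_info,
    ):
        print(_piece, end="", flush=True)
    print()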