#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Project : lq-agent-api
@File    : model_generate.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/14 14:22
'''
- from langchain_core.prompts import ChatPromptTemplate
- from foundation.ai.models.model_handler import model_handler
- from foundation.observability.logger.loggering import server_logger as logger
- import asyncio
- import time
- from typing import Optional, Callable, Any
class GenerateModelClient:
    """
    Client for generative (text-generation) model calls.

    Wraps the default model obtained from ``model_handler`` and adds
    per-attempt timeout control with exponential-backoff retries, plus
    non-streaming (async) and streaming (sync generator) entry points.
    """

    def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
        """
        Args:
            default_timeout: Per-attempt timeout in seconds, used when the
                caller does not pass an explicit ``timeout``.
            max_retries: Number of retries after the first attempt.
            backoff_factor: Base of the exponential backoff wait
                (wait = backoff_factor * 2 ** attempt).
        """
        # Resolve the default model once at construction time.
        self.llm = model_handler.get_models()
        self.chat = self.llm  # chat and llm currently share the same model
        # Retry / timeout configuration.
        self.default_timeout = default_timeout
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        # Keep a handle to model_handler so models can be resolved by name later.
        self.model_handler = model_handler

    async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, **kwargs):
        """
        Run ``func(*args, **kwargs)`` with exponential-backoff retries; each
        attempt gets its own independent timeout window.

        Args:
            func: Coroutine function to invoke on every attempt.
            timeout: Per-attempt timeout in seconds; falls back to
                ``self.default_timeout`` when falsy.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            TimeoutError: If every attempt (1 + max_retries) times out.
            Exception: The last underlying error if every attempt fails.
        """
        current_timeout = timeout or self.default_timeout
        for attempt in range(self.max_retries + 1):
            try:
                # Each attempt is wrapped in its own timeout window.
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=current_timeout
                )
            except asyncio.TimeoutError:
                if attempt == self.max_retries:
                    logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时")
                    raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
                wait_time = self.backoff_factor * (2 ** attempt)
                logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试...")
                await asyncio.sleep(wait_time)
            except Exception as e:
                if attempt == self.max_retries:
                    logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {str(e)}")
                    raise
                wait_time = self.backoff_factor * (2 ** attempt)
                logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {str(e)}, {wait_time}秒后重试...")
                await asyncio.sleep(wait_time)

    async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None, model_name: Optional[str] = None):
        """
        Non-streaming model generation (async).

        Args:
            trace_id: Trace ID for log correlation.
            task_prompt_info: Dict carrying a ``"task_prompt"`` template that
                supports ``format_messages()``.
            timeout: Per-attempt timeout in seconds (optional).
            model_name: Optional model name for dynamic model switching.
                Supported names: doubao, qwen, deepseek, gemini,
                lq_qwen3_8b, lq_qwen3_8b_lq_lora, lq_qwen3_4b, qwen_local_14b.
                When None, the default model is used.

        Returns:
            The generated text (``response.content``).

        Raises:
            TimeoutError: If the call times out after all retries.
            Exception: Any other error from the underlying model call.
        """
        start_time = time.time()
        # BUG FIX: the original `int(timeout) or int(self.default_timeout)`
        # raised TypeError when timeout was None (int(None) is illegal) —
        # i.e. on every call that relied on the default. Guard the falsy
        # case first, preserving the original `or` fallback semantics.
        current_timeout = int(timeout) if timeout else int(self.default_timeout)
        try:
            # Resolve the model: an explicit name wins over the default.
            if model_name:
                llm_to_use = self.model_handler.get_model_by_name(model_name)
                logger.info(f"[模型调用] 使用指定模型: {model_name}, trace_id: {trace_id}")
            else:
                llm_to_use = self.llm
                logger.info(f"[模型调用] 使用默认模型, trace_id: {trace_id}")
            logger.info(f"[模型调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()

            async def _invoke_model():
                # llm.invoke is blocking; run it in the default executor so the
                # event loop stays responsive. get_running_loop() replaces the
                # deprecated get_event_loop() inside a coroutine.
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, llm_to_use.invoke, messages)

            # Timeout handling lives inside the retry helper (per attempt).
            response = await self._retry_with_backoff(_invoke_model, timeout=current_timeout)
            elapsed_time = time.time() - start_time
            logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
            return response.content
        except asyncio.TimeoutError:
            elapsed_time = time.time() - start_time
            logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
            raise TimeoutError(f"模型调用超时,trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
            raise

    def get_model_generate_stream(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
        """
        Streaming model generation (sync generator) with error handling.

        Args:
            trace_id: Trace ID for log correlation.
            task_prompt_info: Dict carrying a ``"task_prompt"`` template that
                supports ``format_messages()``.
            timeout: NOTE(review): currently only logged, never enforced —
                the synchronous stream has no timeout mechanism. Confirm
                whether enforcement is required before relying on it.

        Yields:
            Content chunks as they arrive from the model stream.
        """
        start_time = time.time()
        current_timeout = timeout or self.default_timeout
        try:
            logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
            prompt_template = task_prompt_info["task_prompt"]
            messages = prompt_template.format_messages()
            response = self.llm.stream(messages)
            chunk_count = 0
            for chunk in response:
                chunk_count += 1
                if hasattr(chunk, 'content') and chunk.content:
                    yield chunk.content
                elif chunk:  # handle the case where the stream yields raw strings
                    yield chunk
            elapsed_time = time.time() - start_time
            logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
        except Exception as e:
            elapsed_time = time.time() - start_time
            logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
            raise
- generate_model_client = GenerateModelClient(default_timeout=15, max_retries=2, backoff_factor=0.5)
|