# model_generate.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Project : lq-agent-api
@File    : model_generate.py
@IDE     : PyCharm
@Author  :
@Date    : 2025/7/14 14:22
"""
  10. from langchain_core.prompts import ChatPromptTemplate
  11. from foundation.ai.models.model_handler import model_handler
  12. from foundation.observability.logger.loggering import server_logger as logger
  13. import asyncio
  14. import time
  15. from typing import Optional, Callable, Any
  16. class GenerateModelClient:
  17. """
  18. 主要是生成式模型
  19. """
  20. def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
  21. # 获取默认模型
  22. self.llm = model_handler.get_models()
  23. self.chat = self.llm # 当前chat和llm使用相同模型
  24. # 配置参数
  25. self.default_timeout = default_timeout
  26. self.max_retries = max_retries
  27. self.backoff_factor = backoff_factor
  28. # 保存model_handler引用,用于动态获取模型
  29. self.model_handler = model_handler
  30. async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, **kwargs):
  31. """
  32. 带指数退避的重试机制,每次重试都有独立的超时控制
  33. """
  34. current_timeout = timeout or self.default_timeout
  35. for attempt in range(self.max_retries + 1):
  36. try:
  37. # 每次重试都有独立的超时时间
  38. return await asyncio.wait_for(
  39. func(*args, **kwargs),
  40. timeout=current_timeout
  41. )
  42. except asyncio.TimeoutError:
  43. if attempt == self.max_retries:
  44. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终超时")
  45. raise TimeoutError(f"模型调用在 {self.max_retries} 次重试后均超时")
  46. wait_time = self.backoff_factor * (2 ** attempt)
  47. logger.warning(f"[模型调用] 第 {attempt + 1} 次超时, {wait_time}秒后重试...")
  48. await asyncio.sleep(wait_time)
  49. except Exception as e:
  50. if attempt == self.max_retries:
  51. logger.error(f"[模型调用] 达到最大重试次数 {self.max_retries},最终失败: {str(e)}")
  52. raise
  53. wait_time = self.backoff_factor * (2 ** attempt)
  54. logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {str(e)}, {wait_time}秒后重试...")
  55. await asyncio.sleep(wait_time)
  56. async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None, model_name: Optional[str] = None):
  57. """
  58. 模型非流式生成(异步)
  59. Args:
  60. trace_id: 追踪ID
  61. task_prompt_info: 任务提示词信息
  62. timeout: 超时时间(可选)
  63. model_name: 模型名称(可选),支持动态切换模型
  64. 支持的模型:doubao, qwen, deepseek, gemini,
  65. lq_qwen3_8b, lq_qwen3_8b_lq_lora,
  66. lq_qwen3_4b, qwen_local_14b
  67. 如果为None,则使用默认模型
  68. """
  69. start_time = time.time()
  70. current_timeout = int(timeout) or int(self.default_timeout)
  71. try:
  72. # 根据model_name选择对应的模型
  73. if model_name:
  74. llm_to_use = self.model_handler.get_model_by_name(model_name)
  75. logger.info(f"[模型调用] 使用指定模型: {model_name}, trace_id: {trace_id}")
  76. else:
  77. llm_to_use = self.llm
  78. logger.info(f"[模型调用] 使用默认模型, trace_id: {trace_id}")
  79. logger.info(f"[模型调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  80. prompt_template = task_prompt_info["task_prompt"]
  81. messages = prompt_template.format_messages()
  82. async def _invoke_model():
  83. loop = asyncio.get_event_loop()
  84. return await loop.run_in_executor(None, llm_to_use.invoke, messages)
  85. # 调用带重试机制的方法,超时控制在重试机制内部处理
  86. response = await self._retry_with_backoff(_invoke_model, timeout=current_timeout)
  87. elapsed_time = time.time() - start_time
  88. logger.info(f"[模型调用] 成功 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  89. return response.content
  90. except asyncio.TimeoutError:
  91. elapsed_time = time.time() - start_time
  92. logger.error(f"[模型调用] 超时 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 超时阈值: {current_timeout}s")
  93. raise TimeoutError(f"模型调用超时,trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s")
  94. except Exception as e:
  95. elapsed_time = time.time() - start_time
  96. logger.error(f"[模型调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
  97. raise
  98. def get_model_generate_stream(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
  99. """
  100. 模型流式生成(同步生成器)- 带异常处理
  101. """
  102. start_time = time.time()
  103. current_timeout = timeout or self.default_timeout
  104. try:
  105. logger.info(f"[模型流式调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
  106. prompt_template = task_prompt_info["task_prompt"]
  107. messages = prompt_template.format_messages()
  108. response = self.llm.stream(messages)
  109. chunk_count = 0
  110. for chunk in response:
  111. chunk_count += 1
  112. if hasattr(chunk, 'content') and chunk.content:
  113. yield chunk.content
  114. elif chunk: # 处理直接返回字符串的情况
  115. yield chunk
  116. elapsed_time = time.time() - start_time
  117. logger.info(f"[模型流式调用] 成功 trace_id: {trace_id}, 生成块数: {chunk_count}, 耗时: {elapsed_time:.2f}s")
  118. except Exception as e:
  119. elapsed_time = time.time() - start_time
  120. logger.error(f"[模型流式调用] 异常 trace_id: {trace_id}, 耗时: {elapsed_time:.2f}s, 错误类型: {type(e).__name__}, 错误信息: {str(e)}")
  121. raise
  122. generate_model_client = GenerateModelClient(default_timeout=15, max_retries=2, backoff_factor=0.5)