"""
Qwen AI service (qwen_service.py).
"""
import json
import re
from typing import AsyncGenerator, Optional

import httpx

from services.deepseek_service import deepseek_service
from utils.config import settings
from utils.logger import logger
from utils.prompt_loader import load_prompt
  11. class QwenService:
  12. def __init__(self):
  13. # 确保 API URL 包含完整路径
  14. base_url = settings.qwen3.api_url.rstrip('/')
  15. self.api_url = f"{base_url}/v1/chat/completions"
  16. self.model = settings.qwen3.model
  17. # 意图识别使用专门的配置
  18. intent_base_url = settings.intent.api_url.rstrip('/')
  19. self.intent_api_url = f"{intent_base_url}/v1/chat/completions"
  20. self.intent_model = settings.intent.model
  21. def _should_fallback(self, status_code: int) -> bool:
  22. return status_code in (429, 500, 502, 503, 504)
  23. async def _fallback_deepseek(self, messages: list) -> str:
  24. try:
  25. logger.warning("[Qwen API] Falling back to DeepSeek due to upstream error")
  26. return await deepseek_service.chat(messages)
  27. except Exception as e:
  28. error_msg = str(e).strip() if str(e).strip() else type(e).__name__
  29. logger.error(f"[Qwen API] DeepSeek fallback failed: {type(e).__name__}: {error_msg}")
  30. raise RuntimeError(f"AI服务暂时不可用,主模型和备用模型均无法响应({type(e).__name__}),请稍后重试") from e
  31. async def extract_keywords(self, question: str) -> str:
  32. """从问题中提炼搜索关键词"""
  33. # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
  34. # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
  35. keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
  36. 要求:
  37. 1. 提取2-5个最关键的词语
  38. 2. 去除语气词、助词等无意义词汇
  39. 3. 保留专业术语和核心概念
  40. 4. 以空格分隔多个关键词
  41. 直接返回关键词,不要其他说明。
  42. 用户问题:"""
  43. messages = [
  44. {"role": "system", "content": keyword_prompt},
  45. {"role": "user", "content": question}
  46. ]
  47. try:
  48. keywords = await self.chat(messages)
  49. return keywords.strip()
  50. except Exception as e:
  51. logger.error(f"关键词提取失败: {e}")
  52. # 失败时返回原问题
  53. return question
  54. async def intent_recognition(self, message: str) -> dict:
  55. """意图识别"""
  56. # 使用prompt加载器加载意图识别prompt
  57. intent_prompt = load_prompt("intent_recognition", userMessage=message)
  58. messages = [
  59. {"role": "user", "content": intent_prompt}
  60. ]
  61. try:
  62. # 使用专门的意图识别API和模型
  63. response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
  64. logger.info(f"意图识别原始响应: {response[:500]}")
  65. # 尝试解析JSON - 使用支持多行和嵌套的正则
  66. import re
  67. # 先去除 markdown 代码块标记
  68. cleaned = re.sub(r'```(?:json)?\s*', '', response)
  69. cleaned = cleaned.strip()
  70. # 匹配最外层的 { ... }(支持多行和嵌套)
  71. json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
  72. if json_match:
  73. result = json.loads(json_match.group())
  74. # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名
  75. intent_type = (result.get("intent_type") or result.get("intent") or "").lower()
  76. # 统一设置 intent_type 字段,确保下游一致
  77. result["intent_type"] = intent_type
  78. # 优先使用模型返回的 direct_answer,否则使用预设回复
  79. direct_answer = result.get("direct_answer", "")
  80. if intent_type in ("greeting", "问候"):
  81. result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。"
  82. elif intent_type in ("faq", "常见问题"):
  83. result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。"
  84. else:
  85. result["response"] = direct_answer or ""
  86. return result
  87. logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}")
  88. return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""}
  89. except Exception as e:
  90. logger.error(f"意图识别失败: {e}")
  91. return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
  92. async def chat(self, messages: list, model: str = None, api_url: str = None) -> str:
  93. """同步聊天"""
  94. data = {
  95. "model": model or self.model,
  96. "messages": messages,
  97. "stream": False # 明确指定非流式
  98. }
  99. # 使用指定的API URL,默认使用qwen3的URL
  100. target_url = api_url or self.api_url
  101. normalized_target = target_url.rstrip("/")
  102. is_qwen3_target = normalized_target == self.api_url.rstrip("/")
  103. # 详细请求日志
  104. logger.info(f"[Qwen API] 请求 URL: {target_url}")
  105. logger.info(f"[Qwen API] 使用模型: {data['model']}")
  106. logger.info(f"[Qwen API] 消息数量: {len(messages)}")
  107. try:
  108. # 准备请求头
  109. headers = {
  110. "Content-Type": "application/json"
  111. }
  112. # 如果配置中有 token,添加到请求头(兼容需要认证的场景)
  113. if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and normalized_target == self.intent_api_url.rstrip("/"):
  114. if settings.intent.token:
  115. headers["Authorization"] = f"Bearer {settings.intent.token}"
  116. logger.info("[Qwen API] 已添加 Intent API Authorization header")
  117. elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and normalized_target == self.api_url.rstrip("/"):
  118. if settings.qwen3.token:
  119. headers["Authorization"] = f"Bearer {settings.qwen3.token}"
  120. logger.info("[Qwen API] 已添加 Qwen3 API Authorization header")
  121. async with httpx.AsyncClient(timeout=120.0) as client:
  122. response = await client.post(
  123. target_url,
  124. json=data,
  125. headers=headers
  126. )
  127. logger.info(f"[Qwen API] 响应状态码: {response.status_code}")
  128. logger.info(f"[Qwen API] 响应头: {dict(response.headers)}")
  129. # 记录响应内容(前500字符用于调试)
  130. response_preview = response.text[:500] if response.text else "(空响应)"
  131. logger.info(f"[Qwen API] 响应预览: {response_preview}")
  132. response.raise_for_status()
  133. # 检查响应是否为空
  134. if not response.text:
  135. logger.error("[Qwen API] 返回空响应")
  136. return ""
  137. # 检查是否是流式响应(以 data: 开头)
  138. if response.text.startswith("data:"):
  139. logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式")
  140. # 解析 SSE 格式
  141. content_parts = []
  142. for line in response.text.split('\n'):
  143. if line.startswith("data:"):
  144. data_str = line[5:].strip()
  145. if data_str and data_str != "[DONE]":
  146. try:
  147. data_json = json.loads(data_str)
  148. delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '')
  149. if delta_content:
  150. content_parts.append(delta_content)
  151. except json.JSONDecodeError:
  152. continue
  153. final_content = ''.join(content_parts)
  154. logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}")
  155. return final_content
  156. # 普通 JSON 响应
  157. try:
  158. result = response.json()
  159. content = result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', ''))
  160. logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}")
  161. return content
  162. except json.JSONDecodeError as je:
  163. logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}")
  164. raise ValueError(f"无效的 JSON 响应: {str(je)}")
  165. except httpx.HTTPStatusError as e:
  166. logger.error(f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}")
  167. logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}")
  168. if is_qwen3_target and self._should_fallback(e.response.status_code):
  169. return await self._fallback_deepseek(messages)
  170. raise
  171. except httpx.RequestError as e:
  172. logger.error(f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}")
  173. if is_qwen3_target:
  174. return await self._fallback_deepseek(messages)
  175. raise
  176. except Exception as e:
  177. logger.error(f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}")
  178. raise
  179. async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
  180. """流式聊天"""
  181. data = {
  182. "model": self.model,
  183. "messages": messages,
  184. "stream": True
  185. }
  186. try:
  187. async with httpx.AsyncClient(timeout=120.0) as client:
  188. async with client.stream(
  189. "POST",
  190. self.api_url,
  191. json=data
  192. ) as response:
  193. response.raise_for_status()
  194. async for line in response.aiter_lines():
  195. if line.startswith("data: "):
  196. data_str = line[6:]
  197. if data_str == "[DONE]":
  198. break
  199. try:
  200. data_json = json.loads(data_str)
  201. # 兼容 OpenAI 格式 choices[0].delta.content
  202. choices = data_json.get('choices', [])
  203. if choices:
  204. content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '')
  205. else:
  206. content = data_json.get('content', '')
  207. if content:
  208. yield content
  209. except json.JSONDecodeError:
  210. continue
  211. except httpx.HTTPStatusError as e:
  212. status_code = e.response.status_code if e.response else 0
  213. logger.error(f"Qwen stream HTTP error: {status_code}")
  214. if self._should_fallback(status_code):
  215. logger.warning("[Qwen API] Stream fallback to DeepSeek")
  216. async for chunk in deepseek_service.stream_chat(messages):
  217. yield chunk
  218. return
  219. raise
  220. except httpx.RequestError as e:
  221. logger.error(f"Qwen stream request error: {type(e).__name__}: {e}")
  222. logger.warning("[Qwen API] Stream fallback to DeepSeek")
  223. async for chunk in deepseek_service.stream_chat(messages):
  224. yield chunk
  225. return
  226. except Exception as e:
  227. logger.error(f"Qwen 流式 API 调用失败: {e}")
  228. raise
# Module-level shared instance, created when this module is imported.
qwen_service = QwenService()