qwen_service.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. """
  2. Qwen AI 服务
  3. """
  4. import httpx
  5. import json
  6. from typing import AsyncGenerator
  7. from utils.config import settings
  8. from utils.logger import logger
  9. from utils.prompt_loader import load_prompt
  10. class QwenService:
  11. def __init__(self):
  12. # 确保 API URL 包含完整路径
  13. base_url = settings.qwen3.api_url.rstrip('/')
  14. self.api_url = f"{base_url}/v1/chat/completions"
  15. self.model = settings.qwen3.model
  16. # 意图识别使用专门的配置
  17. intent_base_url = settings.intent.api_url.rstrip('/')
  18. self.intent_api_url = f"{intent_base_url}/v1/chat/completions"
  19. self.intent_model = settings.intent.model
  20. async def extract_keywords(self, question: str) -> str:
  21. """从问题中提炼搜索关键词"""
  22. # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
  23. # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
  24. keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
  25. 要求:
  26. 1. 提取2-5个最关键的词语
  27. 2. 去除语气词、助词等无意义词汇
  28. 3. 保留专业术语和核心概念
  29. 4. 以空格分隔多个关键词
  30. 直接返回关键词,不要其他说明。
  31. 用户问题:"""
  32. messages = [
  33. {"role": "system", "content": keyword_prompt},
  34. {"role": "user", "content": question}
  35. ]
  36. try:
  37. keywords = await self.chat(messages)
  38. return keywords.strip()
  39. except Exception as e:
  40. logger.error(f"关键词提取失败: {e}")
  41. # 失败时返回原问题
  42. return question
  43. async def intent_recognition(self, message: str) -> dict:
  44. """意图识别"""
  45. # 使用prompt加载器加载意图识别prompt
  46. intent_prompt = load_prompt("intent_recognition", userMessage=message)
  47. messages = [
  48. {"role": "user", "content": intent_prompt}
  49. ]
  50. try:
  51. # 使用专门的意图识别API和模型
  52. response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
  53. logger.info(f"意图识别原始响应: {response[:500]}")
  54. # 尝试解析JSON - 使用支持多行和嵌套的正则
  55. import re
  56. # 先去除 markdown 代码块标记
  57. cleaned = re.sub(r'```(?:json)?\s*', '', response)
  58. cleaned = cleaned.strip()
  59. # 匹配最外层的 { ... }(支持多行和嵌套)
  60. json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
  61. if json_match:
  62. result = json.loads(json_match.group())
  63. # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名
  64. intent_type = (result.get("intent_type") or result.get("intent") or "").lower()
  65. # 统一设置 intent_type 字段,确保下游一致
  66. result["intent_type"] = intent_type
  67. # 优先使用模型返回的 direct_answer,否则使用预设回复
  68. direct_answer = result.get("direct_answer", "")
  69. if intent_type in ("greeting", "问候"):
  70. result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。"
  71. elif intent_type in ("faq", "常见问题"):
  72. result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。"
  73. else:
  74. result["response"] = direct_answer or ""
  75. return result
  76. logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}")
  77. return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""}
  78. except Exception as e:
  79. logger.error(f"意图识别失败: {e}")
  80. return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
  81. async def chat(self, messages: list, model: str = None, api_url: str = None) -> str:
  82. """同步聊天"""
  83. data = {
  84. "model": model or self.model,
  85. "messages": messages,
  86. "stream": False # 明确指定非流式
  87. }
  88. # 使用指定的API URL,默认使用qwen3的URL
  89. target_url = api_url or self.api_url
  90. # 详细请求日志
  91. logger.info(f"[Qwen API] 请求 URL: {target_url}")
  92. logger.info(f"[Qwen API] 使用模型: {data['model']}")
  93. logger.info(f"[Qwen API] 消息数量: {len(messages)}")
  94. try:
  95. # 准备请求头
  96. headers = {
  97. "Content-Type": "application/json"
  98. }
  99. # 如果配置中有 token,添加到请求头(兼容需要认证的场景)
  100. if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and api_url == self.intent_api_url:
  101. if settings.intent.token:
  102. headers["Authorization"] = f"Bearer {settings.intent.token}"
  103. logger.info("[Qwen API] 已添加 Intent API Authorization header")
  104. elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and api_url == self.api_url:
  105. if settings.qwen3.token:
  106. headers["Authorization"] = f"Bearer {settings.qwen3.token}"
  107. logger.info("[Qwen API] 已添加 Qwen3 API Authorization header")
  108. async with httpx.AsyncClient(timeout=60.0) as client:
  109. response = await client.post(
  110. target_url,
  111. json=data,
  112. headers=headers
  113. )
  114. logger.info(f"[Qwen API] 响应状态码: {response.status_code}")
  115. logger.info(f"[Qwen API] 响应头: {dict(response.headers)}")
  116. # 记录响应内容(前500字符用于调试)
  117. response_preview = response.text[:500] if response.text else "(空响应)"
  118. logger.info(f"[Qwen API] 响应预览: {response_preview}")
  119. response.raise_for_status()
  120. # 检查响应是否为空
  121. if not response.text:
  122. logger.error("[Qwen API] 返回空响应")
  123. return ""
  124. # 检查是否是流式响应(以 data: 开头)
  125. if response.text.startswith("data:"):
  126. logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式")
  127. # 解析 SSE 格式
  128. content_parts = []
  129. for line in response.text.split('\n'):
  130. if line.startswith("data:"):
  131. data_str = line[5:].strip()
  132. if data_str and data_str != "[DONE]":
  133. try:
  134. data_json = json.loads(data_str)
  135. delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '')
  136. if delta_content:
  137. content_parts.append(delta_content)
  138. except json.JSONDecodeError:
  139. continue
  140. final_content = ''.join(content_parts)
  141. logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}")
  142. return final_content
  143. # 普通 JSON 响应
  144. try:
  145. result = response.json()
  146. content = result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', ''))
  147. logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}")
  148. return content
  149. except json.JSONDecodeError as je:
  150. logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}")
  151. raise ValueError(f"无效的 JSON 响应: {str(je)}")
  152. except httpx.HTTPStatusError as e:
  153. logger.error(f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}")
  154. logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}")
  155. raise
  156. except httpx.RequestError as e:
  157. logger.error(f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}")
  158. raise
  159. except Exception as e:
  160. logger.error(f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}")
  161. raise
  162. async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
  163. """流式聊天"""
  164. data = {
  165. "model": self.model,
  166. "messages": messages,
  167. "stream": True
  168. }
  169. try:
  170. async with httpx.AsyncClient(timeout=120.0) as client:
  171. async with client.stream(
  172. "POST",
  173. self.api_url,
  174. json=data
  175. ) as response:
  176. response.raise_for_status()
  177. async for line in response.aiter_lines():
  178. if line.startswith("data: "):
  179. data_str = line[6:]
  180. if data_str == "[DONE]":
  181. break
  182. try:
  183. data_json = json.loads(data_str)
  184. # 兼容 OpenAI 格式 choices[0].delta.content
  185. choices = data_json.get('choices', [])
  186. if choices:
  187. content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '')
  188. else:
  189. content = data_json.get('content', '')
  190. if content:
  191. yield content
  192. except json.JSONDecodeError:
  193. continue
  194. except Exception as e:
  195. logger.error(f"Qwen 流式 API 调用失败: {e}")
  196. raise
# Module-level shared instance. Constructed at import time, so the
# settings.qwen3 / settings.intent values read in __init__ must already be available.
qwen_service = QwenService()