qwen_service.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. """
  2. Qwen AI 服务
  3. """
import json
from typing import AsyncGenerator, Optional

import httpx

from utils.config import settings
from utils.logger import logger
from utils.prompt_loader import load_prompt
  10. class QwenService:
  11. def __init__(self):
  12. # 确保 API URL 包含完整路径
  13. base_url = settings.qwen3.api_url.rstrip('/')
  14. self.api_url = f"{base_url}/v1/chat/completions"
  15. self.model = settings.qwen3.model
  16. self.intent_model = settings.qwen3.model # 意图识别使用相同模型
  17. async def extract_keywords(self, question: str) -> str:
  18. """从问题中提炼搜索关键词"""
  19. # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
  20. # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
  21. keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
  22. 要求:
  23. 1. 提取2-5个最关键的词语
  24. 2. 去除语气词、助词等无意义词汇
  25. 3. 保留专业术语和核心概念
  26. 4. 以空格分隔多个关键词
  27. 直接返回关键词,不要其他说明。
  28. 用户问题:"""
  29. messages = [
  30. {"role": "system", "content": keyword_prompt},
  31. {"role": "user", "content": question}
  32. ]
  33. try:
  34. keywords = await self.chat(messages)
  35. return keywords.strip()
  36. except Exception as e:
  37. logger.error(f"关键词提取失败: {e}")
  38. # 失败时返回原问题
  39. return question
  40. async def intent_recognition(self, message: str) -> dict:
  41. """意图识别"""
  42. # 使用prompt加载器加载意图识别prompt
  43. intent_prompt = load_prompt("intent_recognition", userMessage=message)
  44. messages = [
  45. {"role": "user", "content": intent_prompt}
  46. ]
  47. try:
  48. response = await self.chat(messages)
  49. # 尝试解析JSON
  50. import re
  51. json_match = re.search(r'\{[^}]+\}', response)
  52. if json_match:
  53. result = json.loads(json_match.group())
  54. intent_type = result.get("intent_type", "").lower()
  55. # 为 greeting 和 faq 添加预设回复
  56. if intent_type in ("greeting", "问候"):
  57. result["response"] = "您好!我是蜀道集团智能助手,很高兴为您服务。"
  58. elif intent_type in ("faq", "常见问题"):
  59. result["response"] = "我可以帮您解答常见问题,请告诉我您想了解什么。"
  60. else:
  61. result["response"] = ""
  62. return result
  63. return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析", "response": ""}
  64. except Exception as e:
  65. logger.error(f"意图识别失败: {e}")
  66. return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
  67. async def chat(self, messages: list, model: str = None) -> str:
  68. """同步聊天"""
  69. data = {
  70. "model": model or self.model,
  71. "messages": messages,
  72. "stream": False # 明确指定非流式
  73. }
  74. try:
  75. async with httpx.AsyncClient(timeout=60.0) as client:
  76. response = await client.post(
  77. self.api_url,
  78. json=data
  79. )
  80. logger.info(f"Qwen API 响应状态: {response.status_code}")
  81. response.raise_for_status()
  82. # 检查响应是否为空
  83. if not response.text:
  84. logger.error("Qwen API 返回空响应")
  85. return ""
  86. # 检查是否是流式响应(以 data: 开头)
  87. if response.text.startswith("data:"):
  88. logger.info("检测到流式响应,解析 SSE 格式")
  89. # 解析 SSE 格式
  90. content_parts = []
  91. for line in response.text.split('\n'):
  92. if line.startswith("data:"):
  93. data_str = line[5:].strip()
  94. if data_str and data_str != "[DONE]":
  95. try:
  96. data_json = json.loads(data_str)
  97. delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '')
  98. if delta_content:
  99. content_parts.append(delta_content)
  100. except json.JSONDecodeError:
  101. continue
  102. return ''.join(content_parts)
  103. # 普通 JSON 响应
  104. try:
  105. result = response.json()
  106. return result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', ''))
  107. except json.JSONDecodeError as je:
  108. logger.error(f"Qwen API 响应不是有效的 JSON: {response.text[:200]}")
  109. raise ValueError(f"无效的 JSON 响应: {str(je)}")
  110. except Exception as e:
  111. logger.error(f"Qwen API 调用失败: {e}")
  112. raise
  113. async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
  114. """流式聊天"""
  115. data = {
  116. "model": self.model,
  117. "messages": messages,
  118. "stream": True
  119. }
  120. try:
  121. async with httpx.AsyncClient(timeout=120.0) as client:
  122. async with client.stream(
  123. "POST",
  124. self.api_url,
  125. json=data
  126. ) as response:
  127. response.raise_for_status()
  128. async for line in response.aiter_lines():
  129. if line.startswith("data: "):
  130. data_str = line[6:]
  131. if data_str == "[DONE]":
  132. break
  133. try:
  134. data_json = json.loads(data_str)
  135. # 兼容 OpenAI 格式 choices[0].delta.content
  136. choices = data_json.get('choices', [])
  137. if choices:
  138. content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '')
  139. else:
  140. content = data_json.get('content', '')
  141. if content:
  142. yield content
  143. except json.JSONDecodeError:
  144. continue
  145. except Exception as e:
  146. logger.error(f"Qwen 流式 API 调用失败: {e}")
  147. raise
  148. # 全局实例
  149. qwen_service = QwenService()