| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- """
- Qwen AI 服务
- """
- import httpx
- import json
- from typing import AsyncGenerator
- from utils.config import settings
- from utils.logger import logger
- from utils.prompt_loader import load_prompt
class QwenService:
    """Async client for an OpenAI-compatible Qwen chat-completions endpoint.

    Every request is POSTed to ``{settings.qwen3.api_url}/v1/chat/completions``.
    Besides plain (:meth:`chat`) and streaming (:meth:`stream_chat`) chat, the
    service offers keyword extraction and intent recognition, both implemented
    as prompts sent through :meth:`chat`.
    """

    def __init__(self):
        # Make sure the API URL contains the full chat-completions path.
        base_url = settings.qwen3.api_url.rstrip('/')
        self.api_url = f"{base_url}/v1/chat/completions"
        self.model = settings.qwen3.model
        # Model used by intent_recognition; currently the same as self.model.
        self.intent_model = settings.qwen3.model

    # ------------------------------------------------------------------
    # Pure parsing helpers (no I/O)
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_sse_line(line: str):
        """Return the payload of an SSE ``data:`` line, or ``None`` for any other line.

        The space after ``data:`` is optional per the SSE spec, so both
        ``data: {...}`` and ``data:{...}`` are accepted. Surrounding whitespace,
        including a trailing ``\\r`` from CRLF line endings, is stripped.
        """
        line = line.strip()
        if not line.startswith("data:"):
            return None
        return line[5:].strip()

    @staticmethod
    def _extract_content(payload) -> str:
        """Pull the reply text out of an OpenAI-style response or stream chunk.

        Lookup order when ``choices`` is a non-empty list:
        ``choices[0].delta.content`` (stream chunk), then
        ``choices[0].message.content`` (full response). When ``choices`` is
        missing or empty, a top-level string ``content`` field is used.

        Missing, ``None`` or non-string parts yield ``""`` rather than raising.
        This matters because streams may end with a chunk whose ``choices``
        is ``[]``.
        """
        if not isinstance(payload, dict):
            return ""
        choices = payload.get('choices')
        if isinstance(choices, list) and choices:
            first = choices[0] if isinstance(choices[0], dict) else {}
            for key in ('delta', 'message'):
                part = first.get(key)
                text = part.get('content') if isinstance(part, dict) else None
                if isinstance(text, str) and text:
                    return text
            return ""
        content = payload.get('content')
        return content if isinstance(content, str) else ""

    @staticmethod
    def _extract_json_object(text: str):
        """Return the first JSON object embedded in *text*, or ``None`` if there is none.

        Each ``{`` position is tried in turn with ``JSONDecoder.raw_decode``.
        Nested objects and surrounding prose or Markdown code fences are
        therefore handled, which a ``\\{[^}]+\\}`` regex cannot do.
        """
        if not text:
            return None
        decoder = json.JSONDecoder()
        start = text.find('{')
        while start != -1:
            try:
                obj, _ = decoder.raw_decode(text[start:])
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(obj, dict):
                    return obj
            start = text.find('{', start + 1)
        return None

    @staticmethod
    def _fallback_intent(reason: str) -> dict:
        """Build the default ``general_chat`` result returned when intent recognition fails."""
        return {"intent_type": "general_chat", "confidence": 0.5, "reason": reason, "response": ""}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def extract_keywords(self, question: str) -> str:
        """Distil the core search keywords from a user question.

        Args:
            question: The raw user question.

        Returns:
            The model's keywords with surrounding whitespace stripped. The
            original ``question`` is returned instead when the call fails or
            the model replies with nothing.
        """
        # TODO: move this prompt into the prompt_loader configuration like the intent prompt.
        keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
要求:
1. 提取2-5个最关键的词语
2. 去除语气词、助词等无意义词汇
3. 保留专业术语和核心概念
4. 以空格分隔多个关键词
直接返回关键词,不要其他说明。
用户问题:"""

        messages = [
            {"role": "system", "content": keyword_prompt},
            {"role": "user", "content": question}
        ]

        try:
            keywords = await self.chat(messages)
        except Exception as e:
            logger.error(f"关键词提取失败: {e}")
            # On failure, fall back to the original question.
            return question

        keywords = (keywords or "").strip()
        # A blank answer would make the downstream search useless; fall back as on error.
        return keywords or question

    async def intent_recognition(self, message: str) -> dict:
        """Classify the intent of *message* with the ``intent_recognition`` prompt.

        Args:
            message: The user message, substituted into the prompt as ``userMessage``.

        Returns:
            The first JSON object found in the model reply, plus a ``response``
            key. That key holds a canned reply for greeting/faq intents and
            ``""`` otherwise. If no JSON object is found, or the call or
            post-processing fails, a ``general_chat`` fallback dict with
            confidence 0.5 is returned.

        Raises:
            Whatever ``load_prompt`` raises; prompt loading is outside the try block.
        """
        intent_prompt = load_prompt("intent_recognition", userMessage=message)

        messages = [
            {"role": "user", "content": intent_prompt}
        ]

        try:
            response = await self.chat(messages, model=self.intent_model)
            result = self._extract_json_object(response)
            if result is None:
                return self._fallback_intent("无法解析")

            intent_type = result.get("intent_type", "").lower()

            # Attach canned replies for greeting and FAQ intents.
            if intent_type in ("greeting", "问候"):
                result["response"] = "您好!我是蜀道集团智能助手,很高兴为您服务。"
            elif intent_type in ("faq", "常见问题"):
                result["response"] = "我可以帮您解答常见问题,请告诉我您想了解什么。"
            else:
                result["response"] = ""

            return result
        except Exception as e:
            logger.error(f"意图识别失败: {e}")
            return self._fallback_intent(str(e))

    async def chat(self, messages: list, model: str = None) -> str:
        """Run a non-streaming chat completion and return the reply text.

        Args:
            messages: Chat messages, each a dict with ``role`` and ``content``.
            model: Model name; defaults to ``self.model``.

        Returns:
            The assistant reply.
            - A top-level ``response`` field takes precedence over ``choices[0]``.
            - An empty body yields ``""``.
            - An SSE-formatted body is handled by concatenating its chunk contents.

        Raises:
            httpx.HTTPStatusError: The server returned a non-2xx status.
            ValueError: The body is neither SSE nor valid JSON.
        """
        data = {
            "model": model or self.model,
            "messages": messages,
            "stream": False,  # explicitly request a non-streaming response
        }

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(self.api_url, json=data)

                logger.info(f"Qwen API 响应状态: {response.status_code}")

                response.raise_for_status()

                body = response.text
                if not body:
                    logger.error("Qwen API 返回空响应")
                    return ""

                # The endpoint may still answer in SSE format despite "stream": False.
                if body.lstrip().startswith("data:"):
                    logger.info("检测到流式响应,解析 SSE 格式")
                    parts = []
                    for line in body.splitlines():
                        data_str = self._parse_sse_line(line)
                        # Skip non-data lines, empty payloads and the [DONE] sentinel.
                        if not data_str or data_str == "[DONE]":
                            continue
                        try:
                            chunk = json.loads(data_str)
                        except json.JSONDecodeError:
                            continue
                        parts.append(self._extract_content(chunk))
                    return ''.join(parts)

                # Plain JSON response.
                try:
                    result = response.json()
                except json.JSONDecodeError as je:
                    logger.error(f"Qwen API 响应不是有效的 JSON: {response.text[:200]}")
                    raise ValueError(f"无效的 JSON 响应: {str(je)}") from je

                # Check 'response' first so choices[0] is only inspected when it is absent.
                if isinstance(result, dict) and 'response' in result:
                    return result['response']
                return self._extract_content(result)

        except Exception as e:
            logger.error(f"Qwen API 调用失败: {e}")
            raise

    async def stream_chat(self, messages: list, model: str = None) -> AsyncGenerator[str, None]:
        """Stream a chat completion, yielding reply text chunks as they arrive.

        Args:
            messages: Chat messages, each a dict with ``role`` and ``content``.
            model: Model name; defaults to ``self.model``.

        Yields:
            Non-empty text fragments taken from each SSE ``data:`` chunk.
            Streaming stops at the ``[DONE]`` sentinel. Undecodable chunks are skipped.

        Raises:
            httpx.HTTPStatusError: The server returned a non-2xx status.
        """
        data = {
            "model": model or self.model,
            "messages": messages,
            "stream": True
        }

        try:
            async with httpx.AsyncClient(timeout=120.0) as client:
                async with client.stream(
                    "POST",
                    self.api_url,
                    json=data
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        data_str = self._parse_sse_line(line)
                        if not data_str:
                            continue
                        if data_str == "[DONE]":
                            break
                        try:
                            chunk = json.loads(data_str)
                        except json.JSONDecodeError:
                            continue
                        # OpenAI format: choices[0].delta.content (falls back to message / top-level content).
                        content = self._extract_content(chunk)
                        if content:
                            yield content
        except Exception as e:
            logger.error(f"Qwen 流式 API 调用失败: {e}")
            raise
# Module-wide shared QwenService instance. It is constructed at import time,
# so importing this module reads settings.qwen3 (api_url and model).
qwen_service = QwenService()
|