""" Qwen AI 服务 """ import httpx import json from typing import AsyncGenerator from utils.config import settings from utils.logger import logger from utils.prompt_loader import load_prompt class QwenService: def __init__(self): # 确保 API URL 包含完整路径 base_url = settings.qwen3.api_url.rstrip('/') self.api_url = f"{base_url}/v1/chat/completions" self.model = settings.qwen3.model self.intent_model = settings.qwen3.model # 意图识别使用相同模型 async def extract_keywords(self, question: str) -> str: """从问题中提炼搜索关键词""" # 使用prompt加载器加载关键词提取prompt(如果配置了的话) # 这里暂时保留原有逻辑,可以后续添加到prompt配置中 keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。 要求: 1. 提取2-5个最关键的词语 2. 去除语气词、助词等无意义词汇 3. 保留专业术语和核心概念 4. 以空格分隔多个关键词 直接返回关键词,不要其他说明。 用户问题:""" messages = [ {"role": "system", "content": keyword_prompt}, {"role": "user", "content": question} ] try: keywords = await self.chat(messages) return keywords.strip() except Exception as e: logger.error(f"关键词提取失败: {e}") # 失败时返回原问题 return question async def intent_recognition(self, message: str) -> dict: """意图识别""" # 使用prompt加载器加载意图识别prompt intent_prompt = load_prompt("intent_recognition", userMessage=message) messages = [ {"role": "user", "content": intent_prompt} ] try: response = await self.chat(messages) # 尝试解析JSON import re json_match = re.search(r'\{[^}]+\}', response) if json_match: result = json.loads(json_match.group()) intent_type = result.get("intent_type", "").lower() # 为 greeting 和 faq 添加预设回复 if intent_type in ("greeting", "问候"): result["response"] = "您好!我是蜀道集团智能助手,很高兴为您服务。" elif intent_type in ("faq", "常见问题"): result["response"] = "我可以帮您解答常见问题,请告诉我您想了解什么。" else: result["response"] = "" return result return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析", "response": ""} except Exception as e: logger.error(f"意图识别失败: {e}") return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""} async def chat(self, messages: list, model: str = None) -> str: """同步聊天""" data = { "model": model or self.model, "messages": messages, "stream": False # 明确指定非流式 } try: async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( self.api_url, json=data ) logger.info(f"Qwen API 响应状态: {response.status_code}") response.raise_for_status() # 检查响应是否为空 if not response.text: logger.error("Qwen API 返回空响应") return "" # 检查是否是流式响应(以 data: 开头) if response.text.startswith("data:"): logger.info("检测到流式响应,解析 SSE 格式") # 解析 SSE 格式 content_parts = [] for line in response.text.split('\n'): if line.startswith("data:"): data_str = line[5:].strip() if data_str and data_str != "[DONE]": try: data_json = json.loads(data_str) delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '') if delta_content: content_parts.append(delta_content) except json.JSONDecodeError: continue return ''.join(content_parts) # 普通 JSON 响应 try: result = response.json() return result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', '')) except json.JSONDecodeError as je: logger.error(f"Qwen API 响应不是有效的 JSON: {response.text[:200]}") raise ValueError(f"无效的 JSON 响应: {str(je)}") except Exception as e: logger.error(f"Qwen API 调用失败: {e}") raise async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]: """流式聊天""" data = { "model": self.model, "messages": messages, "stream": True } try: async with httpx.AsyncClient(timeout=120.0) as client: async with client.stream( "POST", self.api_url, json=data ) as response: response.raise_for_status() async for line in response.aiter_lines(): if line.startswith("data: "): data_str = line[6:] if data_str == "[DONE]": break try: data_json = json.loads(data_str) # 兼容 OpenAI 格式 choices[0].delta.content choices = data_json.get('choices', []) if choices: content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '') else: content = data_json.get('content', '') if content: yield content except json.JSONDecodeError: continue except Exception as e: logger.error(f"Qwen 流式 API 调用失败: {e}") raise # 全局实例 qwen_service = QwenService()