""" Qwen AI 服务 """ import httpx import json from typing import AsyncGenerator from utils.config import settings from utils.logger import logger from utils.prompt_loader import load_prompt from services.deepseek_service import deepseek_service class QwenService: def __init__(self): # 确保 API URL 包含完整路径 base_url = settings.qwen3.api_url.rstrip('/') self.api_url = f"{base_url}/v1/chat/completions" self.model = settings.qwen3.model # 意图识别使用专门的配置 intent_base_url = settings.intent.api_url.rstrip('/') self.intent_api_url = f"{intent_base_url}/v1/chat/completions" self.intent_model = settings.intent.model def _should_fallback(self, status_code: int) -> bool: return status_code in (429, 500, 502, 503, 504) async def _fallback_deepseek(self, messages: list) -> str: try: logger.warning("[Qwen API] Falling back to DeepSeek due to upstream error") return await deepseek_service.chat(messages) except Exception as e: error_msg = str(e).strip() if str(e).strip() else type(e).__name__ logger.error(f"[Qwen API] DeepSeek fallback failed: {type(e).__name__}: {error_msg}") raise RuntimeError(f"AI服务暂时不可用,主模型和备用模型均无法响应({type(e).__name__}),请稍后重试") from e async def extract_keywords(self, question: str) -> str: """从问题中提炼搜索关键词""" # 使用prompt加载器加载关键词提取prompt(如果配置了的话) # 这里暂时保留原有逻辑,可以后续添加到prompt配置中 keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。 要求: 1. 提取2-5个最关键的词语 2. 去除语气词、助词等无意义词汇 3. 保留专业术语和核心概念 4. 以空格分隔多个关键词 直接返回关键词,不要其他说明。 用户问题:""" messages = [ {"role": "system", "content": keyword_prompt}, {"role": "user", "content": question} ] try: keywords = await self.chat(messages) return keywords.strip() except Exception as e: logger.error(f"关键词提取失败: {e}") # 失败时返回原问题 return question async def intent_recognition(self, message: str) -> dict: """意图识别""" # 使用prompt加载器加载意图识别prompt intent_prompt = load_prompt("intent_recognition", userMessage=message) messages = [ {"role": "user", "content": intent_prompt} ] try: # 使用专门的意图识别API和模型 response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url) logger.info(f"意图识别原始响应: {response[:500]}") # 尝试解析JSON - 使用支持多行和嵌套的正则 import re # 先去除 markdown 代码块标记 cleaned = re.sub(r'```(?:json)?\s*', '', response) cleaned = cleaned.strip() # 匹配最外层的 { ... }(支持多行和嵌套) json_match = re.search(r'\{.*\}', cleaned, re.DOTALL) if json_match: result = json.loads(json_match.group()) # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名 intent_type = (result.get("intent_type") or result.get("intent") or "").lower() # 统一设置 intent_type 字段,确保下游一致 result["intent_type"] = intent_type # 优先使用模型返回的 direct_answer,否则使用预设回复 direct_answer = result.get("direct_answer", "") if intent_type in ("greeting", "问候"): result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。" elif intent_type in ("faq", "常见问题"): result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。" else: result["response"] = direct_answer or "" return result logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}") return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""} except Exception as e: logger.error(f"意图识别失败: {e}") return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""} async def chat(self, messages: list, model: str = None, api_url: str = None) -> str: """同步聊天""" data = { "model": model or self.model, "messages": messages, "stream": False # 明确指定非流式 } # 使用指定的API URL,默认使用qwen3的URL target_url = api_url or self.api_url normalized_target = target_url.rstrip("/") is_qwen3_target = normalized_target == self.api_url.rstrip("/") # 详细请求日志 logger.info(f"[Qwen API] 请求 URL: {target_url}") logger.info(f"[Qwen API] 使用模型: {data['model']}") logger.info(f"[Qwen API] 消息数量: {len(messages)}") try: # 准备请求头 headers = { "Content-Type": "application/json" } # 如果配置中有 token,添加到请求头(兼容需要认证的场景) if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and normalized_target == self.intent_api_url.rstrip("/"): if settings.intent.token: headers["Authorization"] = f"Bearer {settings.intent.token}" logger.info("[Qwen API] 已添加 Intent API Authorization header") elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and normalized_target == self.api_url.rstrip("/"): if settings.qwen3.token: headers["Authorization"] = f"Bearer {settings.qwen3.token}" logger.info("[Qwen API] 已添加 Qwen3 API Authorization header") async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( target_url, json=data, headers=headers ) logger.info(f"[Qwen API] 响应状态码: {response.status_code}") logger.info(f"[Qwen API] 响应头: {dict(response.headers)}") # 记录响应内容(前500字符用于调试) response_preview = response.text[:500] if response.text else "(空响应)" logger.info(f"[Qwen API] 响应预览: {response_preview}") response.raise_for_status() # 检查响应是否为空 if not response.text: logger.error("[Qwen API] 返回空响应") return "" # 检查是否是流式响应(以 data: 开头) if response.text.startswith("data:"): logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式") # 解析 SSE 格式 content_parts = [] for line in response.text.split('\n'): if line.startswith("data:"): data_str = line[5:].strip() if data_str and data_str != "[DONE]": try: data_json = json.loads(data_str) delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '') if delta_content: content_parts.append(delta_content) except json.JSONDecodeError: continue final_content = ''.join(content_parts) logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}") return final_content # 普通 JSON 响应 try: result = response.json() content = result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', '')) logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}") return content except json.JSONDecodeError as je: logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}") raise ValueError(f"无效的 JSON 响应: {str(je)}") except httpx.HTTPStatusError as e: logger.error(f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}") logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}") if is_qwen3_target and self._should_fallback(e.response.status_code): return await self._fallback_deepseek(messages) raise except httpx.RequestError as e: logger.error(f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}") if is_qwen3_target: return await self._fallback_deepseek(messages) raise except Exception as e: logger.error(f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}") raise async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]: """流式聊天""" data = { "model": self.model, "messages": messages, "stream": True } try: async with httpx.AsyncClient(timeout=120.0) as client: async with client.stream( "POST", self.api_url, json=data ) as response: response.raise_for_status() async for line in response.aiter_lines(): if line.startswith("data: "): data_str = line[6:] if data_str == "[DONE]": break try: data_json = json.loads(data_str) # 兼容 OpenAI 格式 choices[0].delta.content choices = data_json.get('choices', []) if choices: content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '') else: content = data_json.get('content', '') if content: yield content except json.JSONDecodeError: continue except httpx.HTTPStatusError as e: status_code = e.response.status_code if e.response else 0 logger.error(f"Qwen stream HTTP error: {status_code}") if self._should_fallback(status_code): logger.warning("[Qwen API] Stream fallback to DeepSeek") async for chunk in deepseek_service.stream_chat(messages): yield chunk return raise except httpx.RequestError as e: logger.error(f"Qwen stream request error: {type(e).__name__}: {e}") logger.warning("[Qwen API] Stream fallback to DeepSeek") async for chunk in deepseek_service.stream_chat(messages): yield chunk return except Exception as e: logger.error(f"Qwen 流式 API 调用失败: {e}") raise # 全局实例 qwen_service = QwenService()