| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- """
- Qwen AI 服务
- """
- import httpx
- import json
- from typing import AsyncGenerator
- from utils.config import settings
- from utils.logger import logger
- from utils.prompt_loader import load_prompt
- from services.deepseek_service import deepseek_service
- class QwenService:
- def __init__(self):
- # 确保 API URL 包含完整路径
- base_url = settings.qwen3.api_url.rstrip('/')
- self.api_url = f"{base_url}/v1/chat/completions"
- self.model = settings.qwen3.model
-
- # 意图识别使用专门的配置
- intent_base_url = settings.intent.api_url.rstrip('/')
- self.intent_api_url = f"{intent_base_url}/v1/chat/completions"
- self.intent_model = settings.intent.model
- def _should_fallback(self, status_code: int) -> bool:
- return status_code in (429, 500, 502, 503, 504)
- async def _fallback_deepseek(self, messages: list) -> str:
- try:
- logger.warning("[Qwen API] Falling back to DeepSeek due to upstream error")
- return await deepseek_service.chat(messages)
- except Exception as e:
- error_msg = str(e).strip() if str(e).strip() else type(e).__name__
- logger.error(f"[Qwen API] DeepSeek fallback failed: {type(e).__name__}: {error_msg}")
- raise RuntimeError(f"AI服务暂时不可用,主模型和备用模型均无法响应({type(e).__name__}),请稍后重试") from e
-
- async def extract_keywords(self, question: str) -> str:
- """从问题中提炼搜索关键词"""
- # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
- # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
- keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
- 要求:
- 1. 提取2-5个最关键的词语
- 2. 去除语气词、助词等无意义词汇
- 3. 保留专业术语和核心概念
- 4. 以空格分隔多个关键词
- 直接返回关键词,不要其他说明。
- 用户问题:"""
-
- messages = [
- {"role": "system", "content": keyword_prompt},
- {"role": "user", "content": question}
- ]
-
- try:
- keywords = await self.chat(messages)
- return keywords.strip()
- except Exception as e:
- logger.error(f"关键词提取失败: {e}")
- # 失败时返回原问题
- return question
-
- async def intent_recognition(self, message: str) -> dict:
- """意图识别"""
- # 使用prompt加载器加载意图识别prompt
- intent_prompt = load_prompt("intent_recognition", userMessage=message)
-
- messages = [
- {"role": "user", "content": intent_prompt}
- ]
-
- try:
- # 使用专门的意图识别API和模型
- response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
- logger.info(f"意图识别原始响应: {response[:500]}")
- # 尝试解析JSON - 使用支持多行和嵌套的正则
- import re
- # 先去除 markdown 代码块标记
- cleaned = re.sub(r'```(?:json)?\s*', '', response)
- cleaned = cleaned.strip()
- # 匹配最外层的 { ... }(支持多行和嵌套)
- json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
- if json_match:
- result = json.loads(json_match.group())
- # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名
- intent_type = (result.get("intent_type") or result.get("intent") or "").lower()
- # 统一设置 intent_type 字段,确保下游一致
- result["intent_type"] = intent_type
-
- # 优先使用模型返回的 direct_answer,否则使用预设回复
- direct_answer = result.get("direct_answer", "")
-
- if intent_type in ("greeting", "问候"):
- result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。"
- elif intent_type in ("faq", "常见问题"):
- result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。"
- else:
- result["response"] = direct_answer or ""
-
- return result
- logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}")
- return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""}
- except Exception as e:
- logger.error(f"意图识别失败: {e}")
- return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
-
- async def chat(self, messages: list, model: str = None, api_url: str = None) -> str:
- """同步聊天"""
- data = {
- "model": model or self.model,
- "messages": messages,
- "stream": False # 明确指定非流式
- }
-
- # 使用指定的API URL,默认使用qwen3的URL
- target_url = api_url or self.api_url
- normalized_target = target_url.rstrip("/")
- is_qwen3_target = normalized_target == self.api_url.rstrip("/")
-
- # 详细请求日志
- logger.info(f"[Qwen API] 请求 URL: {target_url}")
- logger.info(f"[Qwen API] 使用模型: {data['model']}")
- logger.info(f"[Qwen API] 消息数量: {len(messages)}")
-
- try:
- # 准备请求头
- headers = {
- "Content-Type": "application/json"
- }
-
- # 如果配置中有 token,添加到请求头(兼容需要认证的场景)
- if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and normalized_target == self.intent_api_url.rstrip("/"):
- if settings.intent.token:
- headers["Authorization"] = f"Bearer {settings.intent.token}"
- logger.info("[Qwen API] 已添加 Intent API Authorization header")
- elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and normalized_target == self.api_url.rstrip("/"):
- if settings.qwen3.token:
- headers["Authorization"] = f"Bearer {settings.qwen3.token}"
- logger.info("[Qwen API] 已添加 Qwen3 API Authorization header")
-
- async with httpx.AsyncClient(timeout=120.0) as client:
- response = await client.post(
- target_url,
- json=data,
- headers=headers
- )
-
- logger.info(f"[Qwen API] 响应状态码: {response.status_code}")
- logger.info(f"[Qwen API] 响应头: {dict(response.headers)}")
-
- # 记录响应内容(前500字符用于调试)
- response_preview = response.text[:500] if response.text else "(空响应)"
- logger.info(f"[Qwen API] 响应预览: {response_preview}")
-
- response.raise_for_status()
-
- # 检查响应是否为空
- if not response.text:
- logger.error("[Qwen API] 返回空响应")
- return ""
-
- # 检查是否是流式响应(以 data: 开头)
- if response.text.startswith("data:"):
- logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式")
- # 解析 SSE 格式
- content_parts = []
- for line in response.text.split('\n'):
- if line.startswith("data:"):
- data_str = line[5:].strip()
- if data_str and data_str != "[DONE]":
- try:
- data_json = json.loads(data_str)
- delta_content = data_json.get('choices', [{}])[0].get('delta', {}).get('content', '')
- if delta_content:
- content_parts.append(delta_content)
- except json.JSONDecodeError:
- continue
- final_content = ''.join(content_parts)
- logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}")
- return final_content
-
- # 普通 JSON 响应
- try:
- result = response.json()
- content = result.get('response', result.get('choices', [{}])[0].get('message', {}).get('content', ''))
- logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}")
- return content
- except json.JSONDecodeError as je:
- logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}")
- raise ValueError(f"无效的 JSON 响应: {str(je)}")
-
- except httpx.HTTPStatusError as e:
- logger.error(f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}")
- logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}")
- if is_qwen3_target and self._should_fallback(e.response.status_code):
- return await self._fallback_deepseek(messages)
- raise
- except httpx.RequestError as e:
- logger.error(f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}")
- if is_qwen3_target:
- return await self._fallback_deepseek(messages)
- raise
- except Exception as e:
- logger.error(f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}")
- raise
-
- async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
- """流式聊天"""
- data = {
- "model": self.model,
- "messages": messages,
- "stream": True
- }
-
- try:
- async with httpx.AsyncClient(timeout=120.0) as client:
- async with client.stream(
- "POST",
- self.api_url,
- json=data
- ) as response:
- response.raise_for_status()
- async for line in response.aiter_lines():
- if line.startswith("data: "):
- data_str = line[6:]
- if data_str == "[DONE]":
- break
- try:
- data_json = json.loads(data_str)
- # 兼容 OpenAI 格式 choices[0].delta.content
- choices = data_json.get('choices', [])
- if choices:
- content = choices[0].get('delta', {}).get('content', '') or choices[0].get('message', {}).get('content', '')
- else:
- content = data_json.get('content', '')
- if content:
- yield content
- except json.JSONDecodeError:
- continue
- except httpx.HTTPStatusError as e:
- status_code = e.response.status_code if e.response else 0
- logger.error(f"Qwen stream HTTP error: {status_code}")
- if self._should_fallback(status_code):
- logger.warning("[Qwen API] Stream fallback to DeepSeek")
- async for chunk in deepseek_service.stream_chat(messages):
- yield chunk
- return
- raise
- except httpx.RequestError as e:
- logger.error(f"Qwen stream request error: {type(e).__name__}: {e}")
- logger.warning("[Qwen API] Stream fallback to DeepSeek")
- async for chunk in deepseek_service.stream_chat(messages):
- yield chunk
- return
- except Exception as e:
- logger.error(f"Qwen 流式 API 调用失败: {e}")
- raise
- # 全局实例
- qwen_service = QwenService()
|