| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 |
- """
- Qwen AI 服务
- """
- import httpx
- import json
- import re
- import time
- from typing import Any, AsyncGenerator, Optional
- from utils.config import settings
- from utils.logger import logger
- from utils.prompt_loader import load_prompt
- from services.deepseek_service import deepseek_service
- class QwenService:
- def __init__(self):
- # 确保 API URL 包含完整路径
- base_url = settings.qwen3.api_url.rstrip('/')
- self.api_url = f"{base_url}/v1/chat/completions"
- self.model = settings.qwen3.model
- # 意图识别使用专门的配置
- intent_base_url = settings.intent.api_url.rstrip('/')
- self.intent_api_url = f"{intent_base_url}/v1/chat/completions"
- self.intent_model = settings.intent.model
- self._timeout = httpx.Timeout(120.0, connect=10.0)
- self._limits = httpx.Limits(
- max_connections=50, max_keepalive_connections=20)
- self._client = httpx.AsyncClient(
- timeout=self._timeout, limits=self._limits)
- async def aclose(self) -> None:
- await self._client.aclose()
- def _should_fallback(self, status_code: int) -> bool:
- return status_code in (429, 500, 502, 503, 504)
- async def _fallback_deepseek(self, messages: list) -> str:
- try:
- logger.warning(
- "[Qwen API] Falling back to DeepSeek due to upstream error")
- return await deepseek_service.chat(messages)
- except Exception as e:
- error_msg = str(e).strip() if str(e).strip() else type(e).__name__
- logger.error(
- f"[Qwen API] DeepSeek fallback failed: {type(e).__name__}: {error_msg}")
- raise RuntimeError(
- f"AI服务暂时不可用,主模型和备用模型均无法响应({type(e).__name__}),请稍后重试") from e
- def _extract_first_json_object(self, response_text: str) -> Optional[dict[str, Any]]:
- """从模型文本中提取首个合法 JSON 对象,避免贪婪正则误匹配多段内容。"""
- if not response_text:
- return None
- cleaned = re.sub(r"```(?:json)?\s*", "", response_text).strip()
- start_index = cleaned.find("{")
- if start_index < 0:
- return None
- depth = 0
- in_string = False
- escape = False
- for index in range(start_index, len(cleaned)):
- char = cleaned[index]
- if in_string:
- if escape:
- escape = False
- elif char == "\\":
- escape = True
- elif char == '"':
- in_string = False
- continue
- if char == '"':
- in_string = True
- continue
- if char == "{":
- depth += 1
- elif char == "}":
- depth -= 1
- if depth == 0:
- candidate = cleaned[start_index:index + 1]
- try:
- parsed = json.loads(candidate)
- except json.JSONDecodeError:
- return None
- return parsed if isinstance(parsed, dict) else None
- return None
- def _normalize_route_mode(self, raw_route_mode: object) -> str:
- route_mode_mapping = {
- "ai-qa": "ai-qa",
- "ai_qa": "ai-qa",
- "ai_qa_module": "ai-qa",
- "qa": "ai-qa",
- "general_chat": "ai-qa",
- "ai-writing": "ai-writing",
- "ai_writing": "ai-writing",
- "writing": "ai-writing",
- "document_writing": "ai-writing",
- "safety-training": "safety-training",
- "safety_training": "safety-training",
- "training": "safety-training",
- "ppt_outline": "safety-training",
- "exam-workshop": "exam-workshop",
- "exam_workshop": "exam-workshop",
- "exam": "exam-workshop",
- "question_bank": "exam-workshop",
- }
- normalized_value = str(raw_route_mode or "ai-qa").strip().lower()
- return route_mode_mapping.get(normalized_value, "ai-qa")
- def _route_mode_to_business_type(self, route_mode: str) -> int:
- return {
- "ai-qa": 0,
- "safety-training": 1,
- "ai-writing": 2,
- "exam-workshop": 3,
- }.get(route_mode, 0)
- async def extract_keywords(self, question: str) -> str:
- """从问题中提炼搜索关键词"""
- # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
- # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
- keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
- 要求:
- 1. 提取2-5个最关键的词语
- 2. 去除语气词、助词等无意义词汇
- 3. 保留专业术语和核心概念
- 4. 以空格分隔多个关键词
- 直接返回关键词,不要其他说明。
- 用户问题:"""
- messages = [
- {"role": "system", "content": keyword_prompt},
- {"role": "user", "content": question}
- ]
- try:
- keywords = await self.chat(messages)
- return keywords.strip()
- except Exception as e:
- logger.error(f"关键词提取失败: {e}")
- # 失败时返回原问题
- return question
- async def intent_recognition(self, message: str) -> dict:
- """意图识别"""
- # 使用prompt加载器加载意图识别prompt
- intent_prompt = load_prompt("intent_recognition", userMessage=message)
- messages = [
- {"role": "user", "content": intent_prompt}
- ]
- try:
- # 使用专门的意图识别API和模型
- response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
- logger.info(f"意图识别原始响应: {response[:500]}")
- result = self._extract_first_json_object(response)
- if result:
- # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名
- intent_type = (result.get("intent_type")
- or result.get("intent") or "").lower()
- # 统一设置 intent_type 字段,确保下游一致
- result["intent_type"] = intent_type
- # 优先使用模型返回的 direct_answer,否则使用预设回复
- direct_answer = result.get("direct_answer", "")
- if intent_type in ("greeting", "问候"):
- result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。"
- elif intent_type in ("faq", "常见问题"):
- result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。"
- else:
- result["response"] = direct_answer or ""
- return result
- logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}")
- return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""}
- except Exception as e:
- logger.error(f"意图识别失败: {e}")
- return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
- async def module_dispatch_recognition(self, message: str) -> dict:
- """顶层模块分发识别"""
- def build_dispatch_result(route_mode: str, confidence: float, reason: str) -> dict:
- normalized_route_mode = self._normalize_route_mode(route_mode)
- return {
- "route_mode": normalized_route_mode,
- "business_type": self._route_mode_to_business_type(normalized_route_mode),
- "confidence": confidence,
- "reason": reason,
- }
- def explicit_rule_route(user_message: str) -> Optional[dict]:
- normalized_message = (user_message or "").strip().lower()
- if not normalized_message:
- return None
- exam_keywords = (
- "试卷", "题库", "题目", "考题", "考试", "考核", "出题", "组卷", "练习题"
- )
- strong_training_keywords = (
- "培训课件", "培训大纲", "培训讲稿", "培训计划", "培训材料", "培训资料", "培训ppt"
- )
- training_keywords = (
- "课件", "讲稿", "大纲", "ppt",
- )
- writing_action_keywords = (
- "写", "写个", "写一份", "写一个", "起草", "草拟", "拟一份", "拟写", "撰写", "生成", "润色", "改写", "给我一份", "帮我出一份", "整理一份", "拟定", "编写",
- )
- writing_document_keywords = (
- "通知", "方案", "报告", "制度", "纪要", "函", "总结", "公文", "申请", "发言稿", "倡议书", "要点", "方法", "流程", "预案", "指南", "手册", "细则",
- )
- if any(keyword in normalized_message for keyword in exam_keywords):
- return build_dispatch_result("exam-workshop", 0.97, "显式命中考试工坊关键词")
- if any(keyword in normalized_message for keyword in strong_training_keywords):
- return build_dispatch_result("safety-training", 0.96, "显式要求生成核心培训物料")
- has_training_keyword = any(
- keyword in normalized_message for keyword in training_keywords) or "培训" in normalized_message
- has_writing_action = any(
- keyword in normalized_message for keyword in writing_action_keywords)
- has_writing_document = any(
- keyword in normalized_message for keyword in writing_document_keywords)
- if has_training_keyword and has_writing_document:
- return build_dispatch_result("ai-writing", 0.95, "培训场景下显式要求撰写文稿,优先归入AI写作")
- if has_training_keyword:
- return build_dispatch_result("safety-training", 0.93, "显式命中安全培训关键词")
- if has_writing_document and has_writing_action:
- return build_dispatch_result("ai-writing", 0.98, "显式要求撰写正式文稿,优先归入AI写作")
- return None
- def keyword_fallback_route(user_message: str) -> dict:
- explicit_route = explicit_rule_route(user_message)
- if explicit_route:
- return explicit_route
- normalized_message = (user_message or "").strip().lower()
- if not normalized_message:
- return build_dispatch_result("ai-qa", 0.3, "空消息回退到AI助手")
- exam_keywords = (
- "试卷", "题库", "题目", "考题", "考试", "考核", "出题", "组卷", "练习题"
- )
- training_keywords = (
- "培训课件", "培训大纲", "培训讲稿", "培训计划", "培训材料", "培训ppt", "课件", "讲稿", "大纲", "ppt"
- )
- writing_action_keywords = (
- "写", "写个", "写一份", "写一个", "起草", "草拟", "拟一份", "拟写", "撰写", "生成", "润色", "改写", "给我一份", "帮我出一份", "整理一份", "拟定", "编写"
- )
- writing_document_keywords = (
- "通知", "方案", "报告", "制度", "纪要", "函", "总结", "公文", "申请", "发言稿", "倡议书", "要点", "方法", "流程", "预案", "指南", "手册", "细则"
- )
- if any(keyword in normalized_message for keyword in exam_keywords):
- return build_dispatch_result("exam-workshop", 0.85, "关键词规则命中考试工坊")
- if any(keyword in normalized_message for keyword in training_keywords):
- return build_dispatch_result("safety-training", 0.8, "关键词规则命中安全培训")
- has_writing_action = any(
- keyword in normalized_message for keyword in writing_action_keywords
- )
- has_writing_document = any(
- keyword in normalized_message for keyword in writing_document_keywords
- )
- if has_writing_document and has_writing_action:
- return build_dispatch_result("ai-writing", 0.8, "关键词规则命中AI写作")
- if "培训" in normalized_message and ("通知" in normalized_message or "方案" in normalized_message):
- return build_dispatch_result("ai-writing", 0.78, "培训类文稿回退到AI写作")
- if "培训" in normalized_message:
- return build_dispatch_result("safety-training", 0.72, "培训关键词回退到安全培训")
- return build_dispatch_result("ai-qa", 0.5, "未命中规则,回退到AI助手")
- explicit_route = explicit_rule_route(message)
- if explicit_route:
- logger.info(
- f"模块分发快速命中显式规则: route={explicit_route['route_mode']}, "
- f"message={str(message or '')[:120]}"
- )
- return explicit_route
- dispatch_prompt = load_prompt("module_dispatch", userMessage=message)
- messages = [
- {"role": "user", "content": dispatch_prompt}
- ]
- try:
- response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
- logger.info(f"模块分发原始响应: {response[:500]}")
- result = self._extract_first_json_object(response)
- if not result:
- logger.warning(f"模块分发JSON解析失败,原始响应: {response[:300]}")
- return keyword_fallback_route(message)
- normalized_route_mode = self._normalize_route_mode(
- result.get("route_mode")
- or result.get("target_module")
- or result.get("intent_type")
- or result.get("intent")
- or "ai_qa"
- )
- return build_dispatch_result(
- normalized_route_mode,
- float(result.get("confidence", 0.5) or 0.5),
- result.get("reason", "")
- )
- except Exception as e:
- logger.error(f"模块分发识别失败: {e}")
- return keyword_fallback_route(message)
- async def chat(
- self,
- messages: list,
- model: str = None,
- api_url: str = None,
- disable_reasoning: bool = False,
- ) -> str:
- """同步聊天"""
- final_messages = messages
- if disable_reasoning:
- reasoning_control_message = {
- "role": "system",
- "content": (
- "本次回复禁止输出任何思维链、推理过程、Thinking Process、Reasoning、分析步骤、"
- "前言、后记或 JSON 之外的说明文字。请只输出最终结果。"
- )
- }
- final_messages = [reasoning_control_message, *messages]
- data = {
- "model": model or self.model,
- "messages": final_messages,
- "stream": False # 明确指定非流式
- }
- if data["model"] == self.intent_model:
- # 意图识别是确定性分类,降低随机性并严格限制生成长度以加速
- data["temperature"] = 0.1
- data["max_tokens"] = 30
- # 使用指定的API URL,默认使用qwen3的URL
- target_url = api_url or self.api_url
- normalized_target = target_url.rstrip("/")
- is_qwen3_target = normalized_target == self.api_url.rstrip("/")
- start_at = time.monotonic()
- logger.info(
- f"[Qwen API] 请求: url={target_url} model={data['model']} "
- f"messages={len(final_messages)} disable_reasoning={disable_reasoning}"
- )
- try:
- # 准备请求头
- headers = {
- "Content-Type": "application/json"
- }
- # 如果配置中有 token,添加到请求头(兼容需要认证的场景)
- if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and normalized_target == self.intent_api_url.rstrip("/"):
- if settings.intent.token:
- headers["Authorization"] = f"Bearer {settings.intent.token}"
- logger.info(
- "[Qwen API] 已添加 Intent API Authorization header")
- elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and normalized_target == self.api_url.rstrip("/"):
- if settings.qwen3.token:
- headers["Authorization"] = f"Bearer {settings.qwen3.token}"
- logger.info(
- "[Qwen API] 已添加 Qwen3 API Authorization header")
- response = await self._client.post(
- target_url,
- json=data,
- headers=headers,
- )
- elapsed_ms = int((time.monotonic() - start_at) * 1000)
- logger.info(
- f"[Qwen API] 响应: status={response.status_code} elapsed_ms={elapsed_ms}")
- logger.debug(f"[Qwen API] 响应头: {dict(response.headers)}")
- logger.debug(
- f"[Qwen API] 响应预览: {(response.text[:500] if response.text else '(空响应)')}")
- response.raise_for_status()
- if not response.text:
- logger.error("[Qwen API] 返回空响应")
- return ""
- if response.text.startswith("data:"):
- logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式")
- content_parts = []
- for line in response.text.split('\n'):
- if line.startswith("data:"):
- data_str = line[5:].strip()
- if data_str and data_str != "[DONE]":
- try:
- data_json = json.loads(data_str)
- delta_content = data_json.get('choices', [{}])[0].get(
- 'delta', {}).get('content', '')
- if delta_content:
- content_parts.append(delta_content)
- except json.JSONDecodeError:
- continue
- final_content = ''.join(content_parts)
- logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}")
- return final_content
- try:
- result = response.json()
- content = result.get('response', result.get('choices', [{}])[
- 0].get('message', {}).get('content', ''))
- logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}")
- return content
- except json.JSONDecodeError as je:
- logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}")
- raise ValueError(f"无效的 JSON 响应: {str(je)}")
- except httpx.HTTPStatusError as e:
- logger.error(
- f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}")
- logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}")
- if is_qwen3_target and self._should_fallback(e.response.status_code):
- return await self._fallback_deepseek(final_messages)
- raise
- except httpx.RequestError as e:
- logger.error(
- f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}")
- if is_qwen3_target:
- return await self._fallback_deepseek(final_messages)
- raise
- except Exception as e:
- logger.error(
- f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}")
- raise
- async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
- """流式聊天"""
- data = {
- "model": self.model,
- "messages": messages,
- "stream": True
- }
- try:
- async with self._client.stream(
- "POST",
- self.api_url,
- json=data,
- ) as response:
- response.raise_for_status()
- async for line in response.aiter_lines():
- if line.startswith("data: "):
- data_str = line[6:]
- if data_str == "[DONE]":
- break
- try:
- data_json = json.loads(data_str)
- choices = data_json.get('choices', [])
- if choices:
- content = choices[0].get('delta', {}).get(
- 'content', '') or choices[0].get('message', {}).get('content', '')
- else:
- content = data_json.get('content', '')
- if content:
- yield content
- except json.JSONDecodeError:
- continue
- except httpx.HTTPStatusError as e:
- status_code = e.response.status_code if e.response else 0
- logger.error(f"Qwen stream HTTP error: {status_code}")
- if self._should_fallback(status_code):
- logger.warning("[Qwen API] Stream fallback to DeepSeek")
- async for chunk in deepseek_service.stream_chat(messages):
- yield chunk
- return
- raise
- except httpx.RequestError as e:
- logger.error(f"Qwen stream request error: {type(e).__name__}: {e}")
- logger.warning("[Qwen API] Stream fallback to DeepSeek")
- async for chunk in deepseek_service.stream_chat(messages):
- yield chunk
- return
- except Exception as e:
- logger.error(f"Qwen 流式 API 调用失败: {e}")
- raise
- # 全局实例
- qwen_service = QwenService()
|