qwen_service.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. """
  2. Qwen AI 服务
  3. """
  4. import httpx
  5. import json
  6. import re
  7. import time
  8. from typing import Any, AsyncGenerator, Optional
  9. from utils.config import settings
  10. from utils.logger import logger
  11. from utils.prompt_loader import load_prompt
  12. from services.deepseek_service import deepseek_service
  13. class QwenService:
  14. def __init__(self):
  15. # 确保 API URL 包含完整路径
  16. base_url = settings.qwen3.api_url.rstrip('/')
  17. self.api_url = f"{base_url}/v1/chat/completions"
  18. self.model = settings.qwen3.model
  19. # 意图识别使用专门的配置
  20. intent_base_url = settings.intent.api_url.rstrip('/')
  21. self.intent_api_url = f"{intent_base_url}/v1/chat/completions"
  22. self.intent_model = settings.intent.model
  23. self._timeout = httpx.Timeout(120.0, connect=10.0)
  24. self._limits = httpx.Limits(
  25. max_connections=50, max_keepalive_connections=20)
  26. self._client = httpx.AsyncClient(
  27. timeout=self._timeout, limits=self._limits)
  28. async def aclose(self) -> None:
  29. await self._client.aclose()
  30. def _should_fallback(self, status_code: int) -> bool:
  31. return status_code in (429, 500, 502, 503, 504)
  32. async def _fallback_deepseek(self, messages: list) -> str:
  33. try:
  34. logger.warning(
  35. "[Qwen API] Falling back to DeepSeek due to upstream error")
  36. return await deepseek_service.chat(messages)
  37. except Exception as e:
  38. error_msg = str(e).strip() if str(e).strip() else type(e).__name__
  39. logger.error(
  40. f"[Qwen API] DeepSeek fallback failed: {type(e).__name__}: {error_msg}")
  41. raise RuntimeError(
  42. f"AI服务暂时不可用,主模型和备用模型均无法响应({type(e).__name__}),请稍后重试") from e
  43. def _extract_first_json_object(self, response_text: str) -> Optional[dict[str, Any]]:
  44. """从模型文本中提取首个合法 JSON 对象,避免贪婪正则误匹配多段内容。"""
  45. if not response_text:
  46. return None
  47. cleaned = re.sub(r"```(?:json)?\s*", "", response_text).strip()
  48. start_index = cleaned.find("{")
  49. if start_index < 0:
  50. return None
  51. depth = 0
  52. in_string = False
  53. escape = False
  54. for index in range(start_index, len(cleaned)):
  55. char = cleaned[index]
  56. if in_string:
  57. if escape:
  58. escape = False
  59. elif char == "\\":
  60. escape = True
  61. elif char == '"':
  62. in_string = False
  63. continue
  64. if char == '"':
  65. in_string = True
  66. continue
  67. if char == "{":
  68. depth += 1
  69. elif char == "}":
  70. depth -= 1
  71. if depth == 0:
  72. candidate = cleaned[start_index:index + 1]
  73. try:
  74. parsed = json.loads(candidate)
  75. except json.JSONDecodeError:
  76. return None
  77. return parsed if isinstance(parsed, dict) else None
  78. return None
  79. def _normalize_route_mode(self, raw_route_mode: object) -> str:
  80. route_mode_mapping = {
  81. "ai-qa": "ai-qa",
  82. "ai_qa": "ai-qa",
  83. "ai_qa_module": "ai-qa",
  84. "qa": "ai-qa",
  85. "general_chat": "ai-qa",
  86. "ai-writing": "ai-writing",
  87. "ai_writing": "ai-writing",
  88. "writing": "ai-writing",
  89. "document_writing": "ai-writing",
  90. "safety-training": "safety-training",
  91. "safety_training": "safety-training",
  92. "training": "safety-training",
  93. "ppt_outline": "safety-training",
  94. "exam-workshop": "exam-workshop",
  95. "exam_workshop": "exam-workshop",
  96. "exam": "exam-workshop",
  97. "question_bank": "exam-workshop",
  98. }
  99. normalized_value = str(raw_route_mode or "ai-qa").strip().lower()
  100. return route_mode_mapping.get(normalized_value, "ai-qa")
  101. def _route_mode_to_business_type(self, route_mode: str) -> int:
  102. return {
  103. "ai-qa": 0,
  104. "safety-training": 1,
  105. "ai-writing": 2,
  106. "exam-workshop": 3,
  107. }.get(route_mode, 0)
  108. async def extract_keywords(self, question: str) -> str:
  109. """从问题中提炼搜索关键词"""
  110. # 使用prompt加载器加载关键词提取prompt(如果配置了的话)
  111. # 这里暂时保留原有逻辑,可以后续添加到prompt配置中
  112. keyword_prompt = """你是一个关键词提取助手。请从用户的问题中提炼出最核心的搜索关键词。
  113. 要求:
  114. 1. 提取2-5个最关键的词语
  115. 2. 去除语气词、助词等无意义词汇
  116. 3. 保留专业术语和核心概念
  117. 4. 以空格分隔多个关键词
  118. 直接返回关键词,不要其他说明。
  119. 用户问题:"""
  120. messages = [
  121. {"role": "system", "content": keyword_prompt},
  122. {"role": "user", "content": question}
  123. ]
  124. try:
  125. keywords = await self.chat(messages)
  126. return keywords.strip()
  127. except Exception as e:
  128. logger.error(f"关键词提取失败: {e}")
  129. # 失败时返回原问题
  130. return question
  131. async def intent_recognition(self, message: str) -> dict:
  132. """意图识别"""
  133. # 使用prompt加载器加载意图识别prompt
  134. intent_prompt = load_prompt("intent_recognition", userMessage=message)
  135. messages = [
  136. {"role": "user", "content": intent_prompt}
  137. ]
  138. try:
  139. # 使用专门的意图识别API和模型
  140. response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
  141. logger.info(f"意图识别原始响应: {response[:500]}")
  142. result = self._extract_first_json_object(response)
  143. if result:
  144. # 兼容模板输出的 "intent" 和 "intent_type" 两种字段名
  145. intent_type = (result.get("intent_type")
  146. or result.get("intent") or "").lower()
  147. # 统一设置 intent_type 字段,确保下游一致
  148. result["intent_type"] = intent_type
  149. # 优先使用模型返回的 direct_answer,否则使用预设回复
  150. direct_answer = result.get("direct_answer", "")
  151. if intent_type in ("greeting", "问候"):
  152. result["response"] = direct_answer if direct_answer else "您好!我是蜀道集团智能助手,很高兴为您服务。"
  153. elif intent_type in ("faq", "常见问题"):
  154. result["response"] = direct_answer if direct_answer else "我可以帮您解答常见问题,请告诉我您想了解什么。"
  155. else:
  156. result["response"] = direct_answer or ""
  157. return result
  158. logger.warning(f"意图识别JSON解析失败,原始响应: {response[:300]}")
  159. return {"intent_type": "general_chat", "confidence": 0.5, "reason": "无法解析JSON", "response": ""}
  160. except Exception as e:
  161. logger.error(f"意图识别失败: {e}")
  162. return {"intent_type": "general_chat", "confidence": 0.5, "reason": str(e), "response": ""}
  163. async def module_dispatch_recognition(self, message: str) -> dict:
  164. """顶层模块分发识别"""
  165. def build_dispatch_result(route_mode: str, confidence: float, reason: str) -> dict:
  166. normalized_route_mode = self._normalize_route_mode(route_mode)
  167. return {
  168. "route_mode": normalized_route_mode,
  169. "business_type": self._route_mode_to_business_type(normalized_route_mode),
  170. "confidence": confidence,
  171. "reason": reason,
  172. }
  173. def explicit_rule_route(user_message: str) -> Optional[dict]:
  174. normalized_message = (user_message or "").strip().lower()
  175. if not normalized_message:
  176. return None
  177. exam_keywords = (
  178. "试卷", "题库", "题目", "考题", "考试", "考核", "出题", "组卷", "练习题"
  179. )
  180. strong_training_keywords = (
  181. "培训课件", "培训大纲", "培训讲稿", "培训计划", "培训材料", "培训资料", "培训ppt"
  182. )
  183. training_keywords = (
  184. "课件", "讲稿", "大纲", "ppt",
  185. )
  186. writing_action_keywords = (
  187. "写", "写个", "写一份", "写一个", "起草", "草拟", "拟一份", "拟写", "撰写", "生成", "润色", "改写", "给我一份", "帮我出一份", "整理一份", "拟定", "编写",
  188. )
  189. writing_document_keywords = (
  190. "通知", "方案", "报告", "制度", "纪要", "函", "总结", "公文", "申请", "发言稿", "倡议书", "要点", "方法", "流程", "预案", "指南", "手册", "细则",
  191. )
  192. if any(keyword in normalized_message for keyword in exam_keywords):
  193. return build_dispatch_result("exam-workshop", 0.97, "显式命中考试工坊关键词")
  194. if any(keyword in normalized_message for keyword in strong_training_keywords):
  195. return build_dispatch_result("safety-training", 0.96, "显式要求生成核心培训物料")
  196. has_training_keyword = any(
  197. keyword in normalized_message for keyword in training_keywords) or "培训" in normalized_message
  198. has_writing_action = any(
  199. keyword in normalized_message for keyword in writing_action_keywords)
  200. has_writing_document = any(
  201. keyword in normalized_message for keyword in writing_document_keywords)
  202. if has_training_keyword and has_writing_document:
  203. return build_dispatch_result("ai-writing", 0.95, "培训场景下显式要求撰写文稿,优先归入AI写作")
  204. if has_training_keyword:
  205. return build_dispatch_result("safety-training", 0.93, "显式命中安全培训关键词")
  206. if has_writing_document and has_writing_action:
  207. return build_dispatch_result("ai-writing", 0.98, "显式要求撰写正式文稿,优先归入AI写作")
  208. return None
  209. def keyword_fallback_route(user_message: str) -> dict:
  210. explicit_route = explicit_rule_route(user_message)
  211. if explicit_route:
  212. return explicit_route
  213. normalized_message = (user_message or "").strip().lower()
  214. if not normalized_message:
  215. return build_dispatch_result("ai-qa", 0.3, "空消息回退到AI助手")
  216. exam_keywords = (
  217. "试卷", "题库", "题目", "考题", "考试", "考核", "出题", "组卷", "练习题"
  218. )
  219. training_keywords = (
  220. "培训课件", "培训大纲", "培训讲稿", "培训计划", "培训材料", "培训ppt", "课件", "讲稿", "大纲", "ppt"
  221. )
  222. writing_action_keywords = (
  223. "写", "写个", "写一份", "写一个", "起草", "草拟", "拟一份", "拟写", "撰写", "生成", "润色", "改写", "给我一份", "帮我出一份", "整理一份", "拟定", "编写"
  224. )
  225. writing_document_keywords = (
  226. "通知", "方案", "报告", "制度", "纪要", "函", "总结", "公文", "申请", "发言稿", "倡议书", "要点", "方法", "流程", "预案", "指南", "手册", "细则"
  227. )
  228. if any(keyword in normalized_message for keyword in exam_keywords):
  229. return build_dispatch_result("exam-workshop", 0.85, "关键词规则命中考试工坊")
  230. if any(keyword in normalized_message for keyword in training_keywords):
  231. return build_dispatch_result("safety-training", 0.8, "关键词规则命中安全培训")
  232. has_writing_action = any(
  233. keyword in normalized_message for keyword in writing_action_keywords
  234. )
  235. has_writing_document = any(
  236. keyword in normalized_message for keyword in writing_document_keywords
  237. )
  238. if has_writing_document and has_writing_action:
  239. return build_dispatch_result("ai-writing", 0.8, "关键词规则命中AI写作")
  240. if "培训" in normalized_message and ("通知" in normalized_message or "方案" in normalized_message):
  241. return build_dispatch_result("ai-writing", 0.78, "培训类文稿回退到AI写作")
  242. if "培训" in normalized_message:
  243. return build_dispatch_result("safety-training", 0.72, "培训关键词回退到安全培训")
  244. return build_dispatch_result("ai-qa", 0.5, "未命中规则,回退到AI助手")
  245. explicit_route = explicit_rule_route(message)
  246. if explicit_route:
  247. logger.info(
  248. f"模块分发快速命中显式规则: route={explicit_route['route_mode']}, "
  249. f"message={str(message or '')[:120]}"
  250. )
  251. return explicit_route
  252. dispatch_prompt = load_prompt("module_dispatch", userMessage=message)
  253. messages = [
  254. {"role": "user", "content": dispatch_prompt}
  255. ]
  256. try:
  257. response = await self.chat(messages, model=self.intent_model, api_url=self.intent_api_url)
  258. logger.info(f"模块分发原始响应: {response[:500]}")
  259. result = self._extract_first_json_object(response)
  260. if not result:
  261. logger.warning(f"模块分发JSON解析失败,原始响应: {response[:300]}")
  262. return keyword_fallback_route(message)
  263. normalized_route_mode = self._normalize_route_mode(
  264. result.get("route_mode")
  265. or result.get("target_module")
  266. or result.get("intent_type")
  267. or result.get("intent")
  268. or "ai_qa"
  269. )
  270. return build_dispatch_result(
  271. normalized_route_mode,
  272. float(result.get("confidence", 0.5) or 0.5),
  273. result.get("reason", "")
  274. )
  275. except Exception as e:
  276. logger.error(f"模块分发识别失败: {e}")
  277. return keyword_fallback_route(message)
  278. async def chat(
  279. self,
  280. messages: list,
  281. model: str = None,
  282. api_url: str = None,
  283. disable_reasoning: bool = False,
  284. ) -> str:
  285. """同步聊天"""
  286. final_messages = messages
  287. if disable_reasoning:
  288. reasoning_control_message = {
  289. "role": "system",
  290. "content": (
  291. "本次回复禁止输出任何思维链、推理过程、Thinking Process、Reasoning、分析步骤、"
  292. "前言、后记或 JSON 之外的说明文字。请只输出最终结果。"
  293. )
  294. }
  295. final_messages = [reasoning_control_message, *messages]
  296. data = {
  297. "model": model or self.model,
  298. "messages": final_messages,
  299. "stream": False # 明确指定非流式
  300. }
  301. if data["model"] == self.intent_model:
  302. # 意图识别是确定性分类,降低随机性并严格限制生成长度以加速
  303. data["temperature"] = 0.1
  304. data["max_tokens"] = 30
  305. # 使用指定的API URL,默认使用qwen3的URL
  306. target_url = api_url or self.api_url
  307. normalized_target = target_url.rstrip("/")
  308. is_qwen3_target = normalized_target == self.api_url.rstrip("/")
  309. start_at = time.monotonic()
  310. logger.info(
  311. f"[Qwen API] 请求: url={target_url} model={data['model']} "
  312. f"messages={len(final_messages)} disable_reasoning={disable_reasoning}"
  313. )
  314. try:
  315. # 准备请求头
  316. headers = {
  317. "Content-Type": "application/json"
  318. }
  319. # 如果配置中有 token,添加到请求头(兼容需要认证的场景)
  320. if hasattr(settings, 'intent') and hasattr(settings.intent, 'token') and normalized_target == self.intent_api_url.rstrip("/"):
  321. if settings.intent.token:
  322. headers["Authorization"] = f"Bearer {settings.intent.token}"
  323. logger.info(
  324. "[Qwen API] 已添加 Intent API Authorization header")
  325. elif hasattr(settings, 'qwen3') and hasattr(settings.qwen3, 'token') and normalized_target == self.api_url.rstrip("/"):
  326. if settings.qwen3.token:
  327. headers["Authorization"] = f"Bearer {settings.qwen3.token}"
  328. logger.info(
  329. "[Qwen API] 已添加 Qwen3 API Authorization header")
  330. response = await self._client.post(
  331. target_url,
  332. json=data,
  333. headers=headers,
  334. )
  335. elapsed_ms = int((time.monotonic() - start_at) * 1000)
  336. logger.info(
  337. f"[Qwen API] 响应: status={response.status_code} elapsed_ms={elapsed_ms}")
  338. logger.debug(f"[Qwen API] 响应头: {dict(response.headers)}")
  339. logger.debug(
  340. f"[Qwen API] 响应预览: {(response.text[:500] if response.text else '(空响应)')}")
  341. response.raise_for_status()
  342. if not response.text:
  343. logger.error("[Qwen API] 返回空响应")
  344. return ""
  345. if response.text.startswith("data:"):
  346. logger.info("[Qwen API] 检测到流式响应,解析 SSE 格式")
  347. content_parts = []
  348. for line in response.text.split('\n'):
  349. if line.startswith("data:"):
  350. data_str = line[5:].strip()
  351. if data_str and data_str != "[DONE]":
  352. try:
  353. data_json = json.loads(data_str)
  354. delta_content = data_json.get('choices', [{}])[0].get(
  355. 'delta', {}).get('content', '')
  356. if delta_content:
  357. content_parts.append(delta_content)
  358. except json.JSONDecodeError:
  359. continue
  360. final_content = ''.join(content_parts)
  361. logger.info(f"[Qwen API] SSE 解析完成,内容长度: {len(final_content)}")
  362. return final_content
  363. try:
  364. result = response.json()
  365. content = result.get('response', result.get('choices', [{}])[
  366. 0].get('message', {}).get('content', ''))
  367. logger.info(f"[Qwen API] JSON 解析成功,内容长度: {len(content)}")
  368. return content
  369. except json.JSONDecodeError as je:
  370. logger.error(f"[Qwen API] 响应不是有效的 JSON: {response.text[:200]}")
  371. raise ValueError(f"无效的 JSON 响应: {str(je)}")
  372. except httpx.HTTPStatusError as e:
  373. logger.error(
  374. f"[Qwen API] HTTP 错误 - 状态码: {e.response.status_code}, URL: {target_url}")
  375. logger.error(f"[Qwen API] HTTP 错误响应: {e.response.text[:500]}")
  376. if is_qwen3_target and self._should_fallback(e.response.status_code):
  377. return await self._fallback_deepseek(final_messages)
  378. raise
  379. except httpx.RequestError as e:
  380. logger.error(
  381. f"[Qwen API] 请求错误 - URL: {target_url}, 错误: {type(e).__name__}: {str(e)}")
  382. if is_qwen3_target:
  383. return await self._fallback_deepseek(final_messages)
  384. raise
  385. except Exception as e:
  386. logger.error(
  387. f"[Qwen API] 未知错误 - URL: {target_url}, 模型: {data['model']}, 错误: {type(e).__name__}: {str(e)}")
  388. raise
  389. async def stream_chat(self, messages: list) -> AsyncGenerator[str, None]:
  390. """流式聊天"""
  391. data = {
  392. "model": self.model,
  393. "messages": messages,
  394. "stream": True
  395. }
  396. try:
  397. async with self._client.stream(
  398. "POST",
  399. self.api_url,
  400. json=data,
  401. ) as response:
  402. response.raise_for_status()
  403. async for line in response.aiter_lines():
  404. if line.startswith("data: "):
  405. data_str = line[6:]
  406. if data_str == "[DONE]":
  407. break
  408. try:
  409. data_json = json.loads(data_str)
  410. choices = data_json.get('choices', [])
  411. if choices:
  412. content = choices[0].get('delta', {}).get(
  413. 'content', '') or choices[0].get('message', {}).get('content', '')
  414. else:
  415. content = data_json.get('content', '')
  416. if content:
  417. yield content
  418. except json.JSONDecodeError:
  419. continue
  420. except httpx.HTTPStatusError as e:
  421. status_code = e.response.status_code if e.response else 0
  422. logger.error(f"Qwen stream HTTP error: {status_code}")
  423. if self._should_fallback(status_code):
  424. logger.warning("[Qwen API] Stream fallback to DeepSeek")
  425. async for chunk in deepseek_service.stream_chat(messages):
  426. yield chunk
  427. return
  428. raise
  429. except httpx.RequestError as e:
  430. logger.error(f"Qwen stream request error: {type(e).__name__}: {e}")
  431. logger.warning("[Qwen API] Stream fallback to DeepSeek")
  432. async for chunk in deepseek_service.stream_chat(messages):
  433. yield chunk
  434. return
  435. except Exception as e:
  436. logger.error(f"Qwen 流式 API 调用失败: {e}")
  437. raise
# Module-level shared instance of the service.
qwen_service = QwenService()