| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- """
- DashScope客户端封装
- 封装与阿里云百炼平台的交互
- 需求: 2.2, 3.1, 3.2, 3.3, 3.4
- 支持: 流式输出、Token消耗输出、思考模式、图像输入、联网搜索
- 联网搜索需求: 1.1, 2.1, 2.2, 2.3, 5.1
- """
- import logging
- from typing import Dict, Generator, List, Optional, Union
- import dashscope
- from dashscope import Generation, MultiModalConversation
- from app.schemas.llm_schema import SearchOptions, SearchResult
- from app.services.vertical_domain_processor import VerticalDomainProcessor
- logger = logging.getLogger(__name__)
- # DashScope API基础URL
- DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"
class DashScopeClient:
    """Client wrapper around the DashScope (Alibaba Cloud Model Studio) SDK.

    Requests are routed either to the text ``Generation`` API or to the
    ``MultiModalConversation`` API, depending on the model name and on
    whether the messages carry image content. Web-search parameters are
    only sent on the text route.
    """

    # Multimodal models whose names do not follow the "-vl" / "-omni" /
    # "multimodal" naming patterns. Entries are lower-case because lookups
    # are done on the lower-cased model name.
    _EXPLICIT_MULTIMODAL_MODELS = frozenset({
        'qwen3.5-flash',
        'qwen3.5-plus',
        'qwen3.6-plus',
        'kimi-k2.5',
    })

    def __init__(self, api_key: str):
        """Create a client bound to one API key.

        Args:
            api_key: The user's API key (loaded dynamically from user data).
        """
        self.api_key = api_key
        # Use the mainland-China endpoint; for the international site switch
        # to https://dashscope-intl.aliyuncs.com/api/v1.
        # This assigns an attribute on the ``dashscope`` module itself, so it
        # is shared by every client in the process.
        dashscope.base_http_api_url = DASHSCOPE_BASE_URL

    def _build_search_params(self, search_options: SearchOptions) -> Dict:
        """Translate a ``SearchOptions`` object into request parameters.

        Args:
            search_options: Search configuration; may be ``None``.

        Returns:
            A dict of search parameters, or an empty dict when search is
            not configured or not enabled.
        """
        if not search_options or not search_options.enable_search:
            return {}

        # NOTE(review): the DashScope API reference appears to document
        # forced_search / enable_search_extension / enable_source /
        # enable_citation / citation_format / prepend_search_result as fields
        # of ``search_options`` rather than top-level parameters. The original
        # layout is kept here; confirm against the API before relying on it.
        search_params: Dict = {}

        # Basic search switches.
        if search_options.forced_search:
            search_params["forced_search"] = True

        if search_options.enable_search_extension:
            search_params["enable_search_extension"] = True

        # Strategy and freshness live in the nested ``search_options`` dict.
        nested_options = {
            "search_strategy": search_options.search_strategy
        }

        # Freshness is only forwarded when the strategy is "turbo".
        if (search_options.freshness is not None
                and search_options.search_strategy == "turbo"):
            nested_options["freshness"] = search_options.freshness

        # Natural-language control over search intent.
        if search_options.intention_options:
            nested_options["intention_options"] = search_options.intention_options

        search_params["search_options"] = nested_options

        # Source listing and citation markers.
        if search_options.enable_source:
            search_params["enable_source"] = True

        if search_options.enable_citation:
            search_params["enable_citation"] = True
            search_params["citation_format"] = search_options.citation_format

        if search_options.prepend_search_result:
            search_params["prepend_search_result"] = True

        logger.info(f"构建搜索参数: {search_params}")
        return search_params

    @staticmethod
    def _read_field(container, key: str):
        """Return ``container[key]`` for dicts, else the attribute ``key``.

        Missing keys/attributes (and a ``None`` container) yield ``None``.
        Dicts are checked first so that dict-based response objects are read
        through ``.get`` instead of attribute access.
        """
        if isinstance(container, dict):
            return container.get(key)
        return getattr(container, key, None)

    def _extract_search_info(self, response) -> Optional[Dict]:
        """Pull the ``search_info`` section out of an API response.

        Both attribute-style responses (``response.output.search_info``) and
        plain dict responses (``response['output']['search_info']``) are
        supported. The section is passed through
        ``VerticalDomainProcessor.process_vertical_domain_search`` before
        being returned.

        Args:
            response: DashScope API response (object or dict).

        Returns:
            The processed search info, or ``None`` when the response has no
            search info or extraction fails.
        """
        try:
            output = self._read_field(response, 'output')
            search_info = self._read_field(output, 'search_info')
            # A response without search data is the normal case for
            # non-search calls; return quietly instead of logging a warning.
            if search_info is None:
                return None

            logger.info(f"提取到搜索信息: {len(search_info.get('search_results', []))} 个结果")

            # Post-process vertical-domain search results.
            return VerticalDomainProcessor.process_vertical_domain_search(search_info)
        except Exception as e:
            # Best effort: a malformed response must not break the caller.
            logger.warning(f"提取搜索信息时出错: {e}")
            return None

    def _extract_search_results(self, search_info: Dict) -> List[SearchResult]:
        """Convert the ``search_results`` entries into ``SearchResult`` objects.

        Args:
            search_info: Search info dict; may be ``None`` or empty.

        Returns:
            The parsed results. Entries that fail to parse are logged and
            skipped; a missing or ``None`` ``search_results`` gives ``[]``.
        """
        if not search_info:
            return []

        results = []
        for item in search_info.get('search_results') or []:
            try:
                results.append(SearchResult(
                    index=item.get('index', 0),
                    title=item.get('title', ''),
                    url=item.get('url', ''),
                    snippet=item.get('snippet')
                ))
            except Exception as e:
                logger.warning(f"解析搜索结果项时出错: {e}, 项目: {item}")
                continue

        return results

    @staticmethod
    def _is_image_item(item) -> bool:
        """Tell whether one content item carries an image.

        Recognises items tagged ``{'type': 'image_url', ...}`` and untagged
        items that already use the ``{'image': ...}`` form.
        """
        if not isinstance(item, dict):
            return False
        if item.get('type') == 'image_url':
            return True
        return 'type' not in item and 'image' in item

    def _has_image_content(self, messages: List[Dict]) -> bool:
        """Return True when any message has list content with an image item."""
        for msg in messages:
            content = msg.get('content')
            if not isinstance(content, list):
                continue
            if any(self._is_image_item(item) for item in content):
                return True
        return False

    def _should_use_multimodal_api(self, model: str) -> bool:
        """Decide from the model name whether to use MultiModalConversation.

        A model is treated as multimodal when its name (case-insensitive):
        - contains ``-vl-`` or ends with ``-vl``;
        - contains ``-omni-`` or ends with ``-omni``;
        - contains ``multimodal``;
        - is listed in ``_EXPLICIT_MULTIMODAL_MODELS``.
        """
        model_lower = model.lower()
        # Name-pattern match for VL / omni / multimodal models.
        if '-vl-' in model_lower or model_lower.endswith('-vl'):
            return True
        if '-omni-' in model_lower or model_lower.endswith('-omni'):
            return True
        if 'multimodal' in model_lower:
            return True
        # Explicit list, matched on the lower-cased name so the check is
        # case-insensitive like the pattern checks above.
        return model_lower in self._EXPLICIT_MULTIMODAL_MODELS

    def _convert_to_multimodal_format(self, messages: List[Dict]) -> List[Dict]:
        """Convert messages into the list-of-parts multimodal format.

        String content becomes ``[{'text': ...}]``. In list content,
        ``type: text`` items become ``{'text': ...}``, ``type: image_url``
        items become ``{'image': url}`` (the URL may be given as
        ``{'url': ...}`` or as a bare string), bare strings become text
        parts, and untagged items that already have a ``text`` or ``image``
        key are kept as they are. Other items are dropped. Any other content
        value is stringified into a single text part.
        """
        converted = []
        for msg in messages:
            role = msg.get('role')
            content = msg.get('content')

            if isinstance(content, str):
                parts = [{'text': content}]
            elif isinstance(content, list):
                parts = []
                for item in content:
                    if isinstance(item, str):
                        parts.append({'text': item})
                        continue
                    if not isinstance(item, dict):
                        continue
                    item_type = item.get('type')
                    if item_type == 'text':
                        parts.append({'text': item.get('text', '')})
                    elif item_type == 'image_url':
                        image_url = item.get('image_url', {})
                        if isinstance(image_url, dict):
                            url = image_url.get('url', '')
                        else:
                            # Tolerate a bare URL string (or None).
                            url = image_url or ''
                        parts.append({'image': url})
                    elif 'type' not in item and ('text' in item or 'image' in item):
                        # Already in the target format; keep a copy as is.
                        parts.append(dict(item))
            else:
                parts = [{'text': str(content)}]

            converted.append({'role': role, 'content': parts})
        return converted

    def _build_request_kwargs(
        self,
        model: str,
        messages: List[Dict],
        use_multimodal: bool,
        search_params: Dict,
        *,
        stream: bool = False,
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        enable_thinking: bool = False,
        thinking_budget: Optional[int] = None
    ) -> Dict:
        """Assemble the keyword arguments for one SDK ``call``.

        Shared by ``call`` and ``call_stream`` so both routes and both modes
        build their requests the same way.

        Args:
            model: Model name.
            messages: Conversation messages as supplied by the caller.
            use_multimodal: True to build a MultiModalConversation request
                (messages are converted), False for a Generation request.
            search_params: Output of ``_build_search_params``; may be empty.
            stream: Add the streaming flags when True.
            temperature: Sampling temperature, sent only when not ``None``.
            top_p: Nucleus-sampling probability, sent only when not ``None``.
            max_tokens: Output token limit, sent only when not ``None``.
            enable_thinking: Send ``enable_thinking=True`` when set.
            thinking_budget: Token budget for thinking; only sent together
                with ``enable_thinking``.

        Returns:
            The kwargs dict to pass to ``Generation.call`` or
            ``MultiModalConversation.call``.
        """
        kwargs: Dict = {'api_key': self.api_key, 'model': model}

        if use_multimodal:
            kwargs['messages'] = self._convert_to_multimodal_format(messages)
        else:
            kwargs['messages'] = messages
            kwargs['result_format'] = 'message'

        if stream:
            kwargs['stream'] = True
            kwargs['incremental_output'] = True

        # Sampling and thinking options are forwarded on both routes.
        if temperature is not None:
            kwargs['temperature'] = temperature
        if top_p is not None:
            kwargs['top_p'] = top_p
        if max_tokens is not None:
            kwargs['max_tokens'] = max_tokens
        if enable_thinking:
            kwargs['enable_thinking'] = True
            if thinking_budget is not None:
                kwargs['thinking_budget'] = thinking_budget

        if search_params:
            if use_multimodal:
                # Web search is not sent on the multimodal route; warn so the
                # dropped options are visible in the logs.
                logger.warning("多模态API暂不支持联网搜索功能,将忽略搜索参数")
            else:
                kwargs['enable_search'] = True
                kwargs.update(search_params)
                label = "启用流式联网搜索" if stream else "启用联网搜索"
                # Same value as search_options.search_strategy; it was copied
                # into the nested dict by _build_search_params.
                strategy = search_params['search_options']['search_strategy']
                logger.info(f"{label}: model={model}, search_strategy={strategy}")

        return kwargs

    def call(
        self,
        model: str,
        messages: List[Dict],
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        enable_thinking: bool = False,
        thinking_budget: Optional[int] = None,
        search_options: Optional[SearchOptions] = None
    ) -> Dict:
        """Perform a non-streaming call.

        Args:
            model: Model name.
            messages: Conversation messages.
            temperature: Sampling temperature.
            top_p: Nucleus-sampling probability.
            max_tokens: Maximum number of output tokens.
            enable_thinking: Whether to enable thinking mode.
            thinking_budget: Maximum number of tokens for the thinking phase.
            search_options: Web-search configuration.

        Returns:
            The response returned by the SDK.
        """
        has_image = self._has_image_content(messages)
        use_multimodal = self._should_use_multimodal_api(model)

        # Build search parameters (empty when search is not requested).
        search_params = self._build_search_params(search_options) if search_options else {}

        route_multimodal = has_image or use_multimodal
        kwargs = self._build_request_kwargs(
            model,
            messages,
            route_multimodal,
            search_params,
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            enable_thinking=enable_thinking,
            thinking_budget=thinking_budget
        )

        api = MultiModalConversation if route_multimodal else Generation
        return api.call(**kwargs)

    def call_stream(
        self,
        model: str,
        messages: List[Dict],
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        enable_thinking: bool = False,
        thinking_budget: Optional[int] = None,
        search_options: Optional[SearchOptions] = None
    ) -> Generator:
        """Perform a streaming call.

        This is a generator function: no request is made until the returned
        generator is first iterated.

        Args:
            model: Model name.
            messages: Conversation messages.
            temperature: Sampling temperature.
            top_p: Nucleus-sampling probability.
            max_tokens: Maximum number of output tokens.
            enable_thinking: Whether to enable thinking mode.
            thinking_budget: Maximum number of tokens for the thinking phase.
            search_options: Web-search configuration.

        Yields:
            Streaming response chunks as produced by the SDK.

        Raises:
            Exception: Any error raised while building or consuming the
                stream is logged and re-raised unchanged.
        """
        has_image = self._has_image_content(messages)
        use_multimodal = self._should_use_multimodal_api(model)

        # Build search parameters (empty when search is not requested).
        search_params = self._build_search_params(search_options) if search_options else {}

        logger.info(f"DashScope流式调用: model={model}, has_image={has_image}, use_multimodal={use_multimodal}, enable_thinking={enable_thinking}, enable_search={bool(search_params)}")

        try:
            route_multimodal = has_image or use_multimodal
            kwargs = self._build_request_kwargs(
                model,
                messages,
                route_multimodal,
                search_params,
                stream=True,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                enable_thinking=enable_thinking,
                thinking_budget=thinking_budget
            )

            api = MultiModalConversation if route_multimodal else Generation
            responses = api.call(**kwargs)

            yield from responses
        except Exception as e:
            logger.error(f"DashScope流式调用异常: {type(e).__name__}: {str(e)}")
            raise

    def call_with_search(
        self,
        model: str,
        messages: List[Dict],
        search_options: SearchOptions,
        **kwargs
    ) -> Dict:
        """Non-streaming call with search options (convenience wrapper).

        Args:
            model: Model name.
            messages: Conversation messages.
            search_options: Web-search configuration.
            **kwargs: Further keyword arguments forwarded to ``call``.

        Returns:
            The response returned by ``call``.
        """
        return self.call(
            model=model,
            messages=messages,
            search_options=search_options,
            **kwargs
        )

    def call_stream_with_search(
        self,
        model: str,
        messages: List[Dict],
        search_options: SearchOptions,
        **kwargs
    ) -> Generator:
        """Streaming call with search options (convenience wrapper).

        Args:
            model: Model name.
            messages: Conversation messages.
            search_options: Web-search configuration.
            **kwargs: Further keyword arguments forwarded to ``call_stream``.

        Returns:
            The generator produced by ``call_stream``.
        """
        return self.call_stream(
            model=model,
            messages=messages,
            search_options=search_options,
            **kwargs
        )

    def extract_search_results_from_response(self, response) -> List[SearchResult]:
        """Extract the search results from a response (convenience wrapper).

        Args:
            response: DashScope API response.

        Returns:
            The parsed search results, or ``[]`` when there are none.
        """
        search_info = self._extract_search_info(response)
        if search_info:
            return self._extract_search_results(search_info)
        return []

    def has_search_results(self, response) -> bool:
        """Check whether a response contains at least one search result.

        Args:
            response: DashScope API response.

        Returns:
            True when the extracted search info has a non-empty
            ``search_results`` entry.
        """
        search_info = self._extract_search_info(response)
        if not search_info:
            return False
        return bool(search_info.get('search_results'))
|