"""
DashScope client wrapper.

Wraps the interaction with the Alibaba Cloud Bailian platform.

Requirements: 2.2, 3.1, 3.2, 3.3, 3.4
Supports: streaming output, token usage output, thinking mode, image input,
web search.
Web search requirements: 1.1, 2.1, 2.2, 2.3, 5.1
"""

import logging
from typing import Dict, Generator, List, Optional, Union

import dashscope
from dashscope import Generation, MultiModalConversation

from app.schemas.llm_schema import SearchOptions, SearchResult
from app.services.vertical_domain_processor import VerticalDomainProcessor

logger = logging.getLogger(__name__)

# Base URL of the DashScope API
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"


class DashScopeClient:
    """Client for the Bailian platform's DashScope API."""

    def __init__(self, api_key: str):
        """
        Initialize the client.

        Args:
            api_key: The user's API key (loaded dynamically from user data).
        """
        self.api_key = api_key
        # Use the China-site endpoint (for the international site change this
        # to https://dashscope-intl.aliyuncs.com/api/v1).
        # This assigns an attribute on the `dashscope` module itself, so the
        # setting is shared by everything in the process that uses the SDK.
        dashscope.base_http_api_url = DASHSCOPE_BASE_URL

    def _build_search_params(self, search_options: Optional[SearchOptions]) -> Dict:
        """
        Build the web-search request parameters.

        Args:
            search_options: Search configuration; may be None.

        Returns:
            A dict of search parameters, or an empty dict when search is not
            requested (``search_options`` is falsy or ``enable_search`` is off).
        """
        if not search_options or not search_options.enable_search:
            return {}

        # NOTE(review): the flags below are emitted as top-level request
        # parameters while only strategy/freshness/intention go inside
        # "search_options". Confirm against the DashScope web-search docs
        # that the service reads them at this level.
        search_params = {}

        # Basic search switches
        if search_options.forced_search:
            search_params["forced_search"] = True
        if search_options.enable_search_extension:
            search_params["enable_search_extension"] = True

        # Search strategy and freshness
        search_options_dict = {
            "search_strategy": search_options.search_strategy
        }

        # Freshness is only sent together with the "turbo" strategy
        if (search_options.freshness is not None
                and search_options.search_strategy == "turbo"):
            search_options_dict["freshness"] = search_options.freshness

        # Natural-language search control
        if search_options.intention_options:
            search_options_dict["intention_options"] = search_options.intention_options

        search_params["search_options"] = search_options_dict

        # Source and citation related parameters
        if search_options.enable_source:
            search_params["enable_source"] = True
        if search_options.enable_citation:
            search_params["enable_citation"] = True
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm citation_format is meant to be sent only with citations.
            search_params["citation_format"] = search_options.citation_format
        if search_options.prepend_search_result:
            search_params["prepend_search_result"] = True

        logger.info(f"构建搜索参数: {search_params}")
        return search_params

    def _extract_search_info(self, response) -> Optional[Dict]:
        """
        Extract the search information from a response.

        Looks for ``response.output.search_info`` first (attribute access),
        then for ``response['output']['search_info']`` on plain dicts. The
        value found is passed through
        ``VerticalDomainProcessor.process_vertical_domain_search``.

        Args:
            response: DashScope API response (object or dict).

        Returns:
            The processed search info, or None when the response carries no
            search info or extraction fails (failures are logged as warnings).
        """
        try:
            if hasattr(response, 'output') and hasattr(response.output, 'search_info'):
                search_info = response.output.search_info
            elif isinstance(response, dict) and 'search_info' in response.get('output', {}):
                # Dict-shaped response
                search_info = response['output']['search_info']
            else:
                return None

            logger.info(f"提取到搜索信息: {len(search_info.get('search_results', []))} 个结果")

            # Post-process vertical-domain search results
            return VerticalDomainProcessor.process_vertical_domain_search(search_info)
        except Exception as e:
            # Best effort: search info is optional, so never fail the caller.
            logger.warning(f"提取搜索信息时出错: {e}")
            return None

    def _extract_search_results(self, search_info: Dict) -> List[SearchResult]:
        """
        Convert the ``search_results`` entries of a search-info dict.

        Args:
            search_info: Search info dict.

        Returns:
            A list of ``SearchResult``; entries that fail to parse are logged
            and skipped. Empty when there is nothing to convert.
        """
        if not search_info or 'search_results' not in search_info:
            return []

        results = []
        # `or []` guards against an explicit null "search_results" value.
        for item in search_info['search_results'] or []:
            try:
                results.append(
                    SearchResult(
                        index=item.get('index', 0),
                        title=item.get('title', ''),
                        url=item.get('url', ''),
                        snippet=item.get('snippet'),
                    )
                )
            except Exception as e:
                logger.warning(f"解析搜索结果项时出错: {e}, 项目: {item}")

        return results

    def _has_image_content(self, messages: List[Dict]) -> bool:
        """Return True if any message has a list content with an ``image_url`` part."""
        for msg in messages:
            content = msg.get('content')
            if not isinstance(content, list):
                continue
            if any(
                isinstance(item, dict) and item.get('type') == 'image_url'
                for item in content
            ):
                return True
        return False

    def _should_use_multimodal_api(self, model: str) -> bool:
        """
        Decide whether the model must go through MultiModalConversation.

        The check is case-insensitive and matches:
        - vision-language models: name contains ``-vl-`` or ends with ``-vl``
        - omni models: name contains ``-omni-`` or ends with ``-omni``
        - names containing ``multimodal``
        - an explicit list of models that do not follow those patterns

        Args:
            model: Model name.

        Returns:
            True when the multimodal API should be used.
        """
        model_lower = model.lower()

        # Match VL / omni / multimodal models by naming pattern
        if '-vl-' in model_lower or model_lower.endswith('-vl'):
            return True
        if '-omni-' in model_lower or model_lower.endswith('-omni'):
            return True
        if 'multimodal' in model_lower:
            return True

        # Explicit list (multimodal models that do not follow the patterns above)
        explicit_multimodal = {
            'qwen3.5-flash',
            'qwen3.5-plus',
            'qwen3.6-plus',
            'kimi-k2.5',
        }
        # Compare the lowercased name so this check is case-insensitive like
        # the pattern checks above.
        return model_lower in explicit_multimodal

    def _convert_to_multimodal_format(self, messages: List[Dict]) -> List[Dict]:
        """
        Convert chat messages to the multimodal content format.

        String content becomes ``[{'text': ...}]``. List content is mapped
        part by part: ``text`` parts to ``{'text': ...}``, ``image_url`` parts
        to ``{'image': url}``, bare strings to ``{'text': ...}``; any other
        part is dropped. Any other content value is stringified.

        Args:
            messages: Chat messages with ``role`` and ``content`` keys.

        Returns:
            A new list of messages in multimodal format.
        """
        converted = []
        for msg in messages:
            role = msg.get('role')
            content = msg.get('content')

            if isinstance(content, str):
                parts = [{'text': content}]
            elif isinstance(content, list):
                parts = []
                for item in content:
                    if isinstance(item, str):
                        parts.append({'text': item})
                    elif isinstance(item, dict):
                        item_type = item.get('type')
                        if item_type == 'text':
                            parts.append({'text': item.get('text', '')})
                        elif item_type == 'image_url':
                            # Accept both {'url': ...} and a bare URL string;
                            # a missing/None value yields an empty URL.
                            image_ref = item.get('image_url') or {}
                            if isinstance(image_ref, str):
                                url = image_ref
                            else:
                                url = image_ref.get('url', '')
                            parts.append({'image': url})
            else:
                parts = [{'text': str(content)}]

            converted.append({'role': role, 'content': parts})

        return converted

    def _build_multimodal_kwargs(
        self,
        model: str,
        messages: List[Dict],
        max_tokens: Optional[int],
        stream: bool = False
    ) -> Dict:
        """Build the keyword arguments for ``MultiModalConversation.call``."""
        kwargs = {
            'api_key': self.api_key,
            'model': model,
            'messages': self._convert_to_multimodal_format(messages)
        }
        if stream:
            kwargs['stream'] = True
            kwargs['incremental_output'] = True
        if max_tokens is not None:
            kwargs['max_tokens'] = max_tokens
        return kwargs

    def _build_generation_kwargs(
        self,
        model: str,
        messages: List[Dict],
        temperature: Optional[float],
        top_p: Optional[float],
        max_tokens: Optional[int],
        enable_thinking: bool,
        thinking_budget: Optional[int],
        search_params: Dict,
        stream: bool = False
    ) -> Dict:
        """Build the keyword arguments for ``Generation.call``."""
        kwargs = {
            'api_key': self.api_key,
            'model': model,
            'messages': messages,
            'result_format': 'message'
        }
        if stream:
            kwargs['stream'] = True
            kwargs['incremental_output'] = True
        if temperature is not None:
            kwargs['temperature'] = temperature
        if top_p is not None:
            kwargs['top_p'] = top_p
        if max_tokens is not None:
            kwargs['max_tokens'] = max_tokens
        if enable_thinking:
            kwargs['enable_thinking'] = True
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm thinking_budget is meant to be sent only with thinking on.
            if thinking_budget is not None:
                kwargs['thinking_budget'] = thinking_budget
        # Add search parameters
        if search_params:
            kwargs['enable_search'] = True
            kwargs.update(search_params)
        return kwargs

    def call(
        self,
        model: str,
        messages: List[Dict],
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        enable_thinking: bool = False,
        thinking_budget: Optional[int] = None,
        search_options: Optional[SearchOptions] = None
    ) -> Dict:
        """
        Non-streaming call.

        Uses the multimodal API when the messages contain an image or the
        model is a multimodal model; on that path only ``max_tokens`` is
        forwarded and search parameters are ignored (with a warning).
        Otherwise the text generation API is used.

        Args:
            model: Model name.
            messages: Conversation messages.
            temperature: Sampling temperature.
            top_p: Nucleus sampling probability.
            max_tokens: Maximum number of output tokens.
            enable_thinking: Whether to enable thinking mode.
            thinking_budget: Maximum number of tokens for the thinking process.
            search_options: Search configuration.

        Returns:
            The API response.
        """
        has_image = self._has_image_content(messages)
        use_multimodal = self._should_use_multimodal_api(model)

        # Build search parameters
        search_params = self._build_search_params(search_options) if search_options else {}

        if has_image or use_multimodal:
            # Use the multimodal API
            kwargs = self._build_multimodal_kwargs(model, messages, max_tokens)

            # The multimodal API does not support search yet; log a warning
            if search_params:
                logger.warning("多模态API暂不支持联网搜索功能,将忽略搜索参数")

            return MultiModalConversation.call(**kwargs)

        # Use the text generation API
        kwargs = self._build_generation_kwargs(
            model, messages, temperature, top_p, max_tokens,
            enable_thinking, thinking_budget, search_params
        )
        if search_params:
            logger.info(f"启用联网搜索: model={model}, search_strategy={search_options.search_strategy}")

        return Generation.call(**kwargs)

    def call_stream(
        self,
        model: str,
        messages: List[Dict],
        temperature: Optional[float] = None,
        top_p: Optional[float] = None,
        max_tokens: Optional[int] = None,
        enable_thinking: bool = False,
        thinking_budget: Optional[int] = None,
        search_options: Optional[SearchOptions] = None
    ) -> Generator:
        """
        Streaming call.

        Same API selection as ``call``, with ``stream`` and
        ``incremental_output`` enabled. This is a generator: nothing is sent
        until the first chunk is requested.

        Args:
            model: Model name.
            messages: Conversation messages.
            temperature: Sampling temperature.
            top_p: Nucleus sampling probability.
            max_tokens: Maximum number of output tokens.
            enable_thinking: Whether to enable thinking mode.
            thinking_budget: Maximum number of tokens for the thinking process.
            search_options: Search configuration.

        Yields:
            Streaming response chunks.

        Raises:
            Exception: Any error raised while building or consuming the
                stream is logged and re-raised unchanged.
        """
        has_image = self._has_image_content(messages)
        use_multimodal = self._should_use_multimodal_api(model)

        # Build search parameters
        search_params = self._build_search_params(search_options) if search_options else {}

        logger.info(f"DashScope流式调用: model={model}, has_image={has_image}, use_multimodal={use_multimodal}, enable_thinking={enable_thinking}, enable_search={bool(search_params)}")

        try:
            if has_image or use_multimodal:
                # Streaming call through the multimodal API
                kwargs = self._build_multimodal_kwargs(
                    model, messages, max_tokens, stream=True
                )

                # The multimodal API does not support search yet; log a warning
                if search_params:
                    logger.warning("多模态API暂不支持联网搜索功能,将忽略搜索参数")

                responses = MultiModalConversation.call(**kwargs)
            else:
                # Streaming call through the text generation API
                kwargs = self._build_generation_kwargs(
                    model, messages, temperature, top_p, max_tokens,
                    enable_thinking, thinking_budget, search_params,
                    stream=True
                )
                if search_params:
                    logger.info(f"启用流式联网搜索: model={model}, search_strategy={search_options.search_strategy}")

                responses = Generation.call(**kwargs)

            for response in responses:
                yield response
        except Exception as e:
            logger.error(f"DashScope流式调用异常: {type(e).__name__}: {str(e)}")
            raise

    def call_with_search(
        self,
        model: str,
        messages: List[Dict],
        search_options: SearchOptions,
        **kwargs
    ) -> Dict:
        """
        Non-streaming call with search (convenience method).

        Args:
            model: Model name.
            messages: Conversation messages.
            search_options: Search configuration.
            **kwargs: Other parameters forwarded to ``call``.

        Returns:
            The API response.
        """
        return self.call(
            model=model,
            messages=messages,
            search_options=search_options,
            **kwargs
        )

    def call_stream_with_search(
        self,
        model: str,
        messages: List[Dict],
        search_options: SearchOptions,
        **kwargs
    ) -> Generator:
        """
        Streaming call with search (convenience method).

        Args:
            model: Model name.
            messages: Conversation messages.
            search_options: Search configuration.
            **kwargs: Other parameters forwarded to ``call_stream``.

        Returns:
            The generator produced by ``call_stream``, yielding streaming
            response chunks.
        """
        return self.call_stream(
            model=model,
            messages=messages,
            search_options=search_options,
            **kwargs
        )

    def extract_search_results_from_response(self, response) -> List[SearchResult]:
        """
        Extract search results from a response (convenience method).

        Args:
            response: DashScope API response.

        Returns:
            The list of search results; empty when the response has none.
        """
        search_info = self._extract_search_info(response)
        if search_info:
            return self._extract_search_results(search_info)
        return []

    def has_search_results(self, response) -> bool:
        """
        Check whether a response contains search results.

        Args:
            response: DashScope API response.

        Returns:
            True when search info is present and its ``search_results`` is
            non-empty.
        """
        search_info = self._extract_search_info(response)
        # Truthiness also covers a null "search_results" value.
        return bool(search_info and search_info.get('search_results'))