"""
LLM service layer.

Provides the business logic for LLM conversations, integrating session and
message management.

Requirements: 1.4, 2.2, 2.3, 3.1, 3.2
Supports: streaming output, token usage reporting, thinking mode, image input,
web search.
Search requirements: 1.1, 1.3, 1.4, 8.1, 8.4, 8.5
"""
import json
import logging
from decimal import Decimal
from typing import AsyncGenerator, List, Optional, Union

from sqlalchemy.orm import Session
from sqlalchemy import or_
from fastapi import HTTPException

from app.models.model import ModelNew, ModelCategory
from app.schemas.llm_schema import (
    ChatRequest, ChatResponse, StreamChunk, UsageInfo,
    EnhancedChatRequest, EnhancedChatResponse, EnhancedStreamChunk,
    SearchOptions, SearchResult
)
from app.schemas.model_schema import ModelResponse
from app.services.dashscope_client import DashScopeClient
from app.services.search_options_validator import SearchOptionsValidator
from app.services.citation_formatter import CitationFormatter
from app.services.vertical_domain_processor import VerticalDomainProcessor
from app.services.stream_search_state import StreamSearchStateManager

logger = logging.getLogger(__name__)


class LLMService:
    """LLM business service: plain and search-enhanced chat, streaming or not."""

    # OCR-only models must always be given an image, so they are never offered
    # (or accepted) as general-purpose chat models.
    _OCR_ONLY_MODELS = frozenset({'qwen-vl-ocr', 'qwen-vl-ocr-latest'})

    # Terminator event that closes every SSE stream produced by this service.
    _SSE_DONE = "data: [DONE]\n\n"

    def __init__(self, db: Session, api_key: Optional[str] = None, user_id: Optional[str] = None):
        """
        Initialise the LLM service.

        Args:
            db: Database session.
            api_key: The user's API key (loaded dynamically from user data).
                When falsy, no client is created and every chat entry point
                reports a "missing API key" error.
            user_id: User ID (used for session management).
        """
        self.db = db
        self.api_key = api_key
        self.user_id = user_id
        self.client = DashScopeClient(api_key) if api_key else None
        self.search_validator = SearchOptionsValidator(db)
        self.citation_formatter = CitationFormatter()
        self.vertical_processor = VerticalDomainProcessor()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _chat_model_filter():
        """Return the SQL clause matching models in the LLM or MULTIMODAL category."""
        return or_(
            ModelNew.categories.any(int(ModelCategory.LLM)),
            ModelNew.categories.any(int(ModelCategory.MULTIMODAL)),
        )

    def _availability_error(self, model_name: str) -> Optional[HTTPException]:
        """
        Check the preconditions shared by every chat entry point.

        Returns:
            An (un-raised) HTTPException describing the problem, or None when
            a client is configured and the model is valid. Non-streaming
            callers raise it; streaming callers emit its ``detail`` as an SSE
            error event.
        """
        if not self.client:
            return HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey")
        if not self.validate_model(model_name):
            return HTTPException(status_code=400, detail=f"无效的模型名称或模型类型不匹配: {model_name}")
        return None

    @staticmethod
    def _call_params(request, messages: List[dict]) -> dict:
        """Collect the generation keyword arguments common to every client call."""
        return {
            "model": request.model,
            "messages": messages,
            "temperature": request.temperature,
            "top_p": request.top_p,
            "max_tokens": request.max_tokens,
            "enable_thinking": request.enable_thinking,
            "thinking_budget": request.thinking_budget,
        }

    @staticmethod
    def _build_fallback_request(request, conversation_id: Optional[int], stream: bool) -> ChatRequest:
        """Build a plain (search-free) ChatRequest mirroring an enhanced request."""
        return ChatRequest(
            model=request.model,
            messages=request.messages,
            stream=stream,
            temperature=request.temperature,
            top_p=request.top_p,
            max_tokens=request.max_tokens,
            conversation_id=conversation_id,
            enable_thinking=request.enable_thinking,
            thinking_budget=request.thinking_budget
        )

    @staticmethod
    def _sse_error(message) -> str:
        """Format an error message as a single SSE ``data:`` event."""
        return f"data: {json.dumps({'error': message}, ensure_ascii=False)}\n\n"

    @staticmethod
    def _to_usage_info(usage) -> UsageInfo:
        """Convert a platform usage object into the UsageInfo schema."""
        return UsageInfo(
            input_tokens=usage.input_tokens,
            output_tokens=usage.output_tokens,
            total_tokens=usage.total_tokens
        )

    @staticmethod
    def _extract_reasoning(message, default=None):
        """
        Best-effort read of ``reasoning_content`` from a response message.

        The field is optional and the message may be dict-like or a plain
        object, so any lookup failure yields ``default`` instead of aborting
        the whole response.
        """
        try:
            if hasattr(message, 'get'):
                return message.get('reasoning_content', '')
            return getattr(message, 'reasoning_content', default)
        except Exception:
            # Deliberate best-effort: a missing optional field must not fail the call.
            return default

    @staticmethod
    def _message_text(msg_content):
        """
        Flatten the content of a complete (non-streaming) response into text.

        List payloads (multimodal / thinking mode) are joined: dict items
        contribute their ``text`` value, anything else is stringified. A dict
        payload contributes its ``text`` value. Falsy content becomes "".
        """
        if isinstance(msg_content, list):
            return ''.join(
                item.get('text', '') if isinstance(item, dict) else str(item)
                for item in msg_content
            )
        if isinstance(msg_content, dict):
            return msg_content.get('text', str(msg_content))
        return msg_content or ""

    @staticmethod
    def _chunk_text(msg_content):
        """
        Flatten the content of one streaming chunk into text.

        For list payloads only dict items carrying a ``text`` key contribute;
        other items are skipped. Falsy content becomes "".
        """
        if isinstance(msg_content, list):
            return ''.join(
                item['text']
                for item in msg_content
                if isinstance(item, dict) and 'text' in item
            )
        return msg_content or ""

    def _apply_search_formatting(self, content, search_info, search_results, search_options):
        """
        Apply vertical-domain and citation/source formatting to a final answer.

        Args:
            content: The answer text.
            search_info: Search info extracted from the platform response.
            search_results: Search results extracted so far.
            search_options: Validated search options controlling the formatting.

        Returns:
            A ``(content, search_results)`` tuple after formatting.
        """
        if (search_options.enable_search_extension
                and VerticalDomainProcessor.is_vertical_domain_result(search_info)):
            structured_data = search_info.get("vertical_structured_data")
            content = VerticalDomainProcessor.format_vertical_domain_response(
                content, search_info, structured_data
            )

        if search_options.enable_citation or search_options.enable_source:
            # Feed the (possibly vertical-formatted) content forward so that
            # formatting step is not discarded.
            content, search_results = self.citation_formatter.format_content_with_citations_and_sources(
                content=content,
                search_info=search_info,
                enable_citation=search_options.enable_citation,
                citation_format=search_options.citation_format,
                enable_source=search_options.enable_source
            )
        return content, search_results

    # ------------------------------------------------------------------
    # Model validation / lookup
    # ------------------------------------------------------------------

    def validate_model(self, model_name: str) -> bool:
        """
        Check that the model is a valid LLM or multimodal model.

        OCR-only models are rejected without querying the database.

        Args:
            model_name: The model code to validate.

        Returns:
            True when a matching LLM/multimodal model row exists.
        """
        if model_name in self._OCR_ONLY_MODELS:
            return False
        model = self.db.query(ModelNew).filter(
            ModelNew.model_code == model_name,
            self._chat_model_filter()
        ).first()
        return model is not None

    def get_model_by_title(self, model_title: str) -> Optional[ModelNew]:
        """Return the model row whose ``model_code`` equals ``model_title``, or None."""
        return self.db.query(ModelNew).filter(ModelNew.model_code == model_title).first()

    # ------------------------------------------------------------------
    # Message conversion
    # ------------------------------------------------------------------

    def _convert_messages(self, messages) -> List[dict]:
        """
        Convert request messages into the format expected by the API call.

        Supports plain-text and multimodal (list) content. Within a list,
        typed items are mapped for ``text`` and ``image_url`` (other types are
        dropped), and raw dict items are passed through unchanged. Any other
        content is stringified.
        """
        result = []
        for msg in messages:
            content = msg.content
            if isinstance(content, str):
                result.append({"role": msg.role, "content": content})
            elif isinstance(content, list):
                # Multimodal content
                converted_content = []
                for item in content:
                    if hasattr(item, 'type'):
                        if item.type == 'text':
                            converted_content.append({"type": "text", "text": item.text})
                        elif item.type == 'image_url':
                            converted_content.append({"type": "image_url", "image_url": item.image_url})
                    elif isinstance(item, dict):
                        converted_content.append(item)
                result.append({"role": msg.role, "content": converted_content})
            else:
                result.append({"role": msg.role, "content": str(content)})
        return result

    def _get_text_content(self, content) -> str:
        """Extract the text from message content, joining text parts with spaces."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            texts = []
            for item in content:
                if hasattr(item, 'type') and item.type == 'text':
                    texts.append(item.text)
                elif isinstance(item, dict) and item.get('type') == 'text':
                    texts.append(item.get('text', ''))
            return ' '.join(texts)
        return str(content)

    # ------------------------------------------------------------------
    # Plain chat
    # ------------------------------------------------------------------

    def chat(
        self,
        request: ChatRequest,
        conversation_id: Optional[int] = None
    ) -> ChatResponse:
        """
        Non-streaming chat.

        Args:
            request: The chat request.
            conversation_id: Conversation ID (optional). Currently unused by
                this method; kept for interface compatibility.

        Returns:
            The chat response.

        Raises:
            HTTPException: 403 when no API key is configured, 400 when the
                model is invalid, 502 when the platform returns a non-200
                status.
        """
        unavailable = self._availability_error(request.model)
        if unavailable is not None:
            raise unavailable

        messages = self._convert_messages(request.messages)
        response = self.client.call(**self._call_params(request, messages))

        if response.status_code != 200:
            raise HTTPException(status_code=502, detail=f"百炼平台返回错误: {response.message}")

        choice = response.output.choices[0]
        return ChatResponse(
            # Multimodal / thinking-mode responses may return list content.
            content=self._message_text(choice.message.content),
            finish_reason=choice.finish_reason,
            usage=self._to_usage_info(response.usage),
            reasoning_content=self._extract_reasoning(choice.message)
        )

    async def chat_stream(
        self,
        request: ChatRequest,
        conversation_id: Optional[int] = None
    ) -> AsyncGenerator[str, None]:
        """
        Streaming chat, returned as an SSE event stream.

        Errors are reported as SSE ``{"error": ...}`` events rather than
        raised; the stream always ends with a ``[DONE]`` event.

        Args:
            request: The chat request.
            conversation_id: Conversation ID (optional). Currently unused by
                this method; kept for interface compatibility.

        Yields:
            SSE-formatted response chunks.
        """
        unavailable = self._availability_error(request.model)
        if unavailable is not None:
            yield self._sse_error(unavailable.detail)
            yield self._SSE_DONE
            return

        messages = self._convert_messages(request.messages)

        try:
            for response in self.client.call_stream(**self._call_params(request, messages)):
                if response.status_code != 200:
                    error_msg = getattr(response, 'message', str(response))
                    logger.error(f"DashScope流式调用错误: status={response.status_code}, message={error_msg}")
                    yield self._sse_error(error_msg)
                    break

                choice = response.output.choices[0]
                content_text = self._chunk_text(choice.message.content)
                # Incremental thinking content; the field may be absent.
                reasoning_content = self._extract_reasoning(choice.message, default="") or ""

                chunk = StreamChunk(
                    content=content_text,
                    finish_reason=choice.finish_reason or None,
                    reasoning_content=reasoning_content or None
                )

                # Usage is attached to the chunk that carries the finish reason.
                if choice.finish_reason:
                    chunk.usage = self._to_usage_info(response.usage)

                yield f"data: {chunk.model_dump_json()}\n\n"

        except Exception as e:
            logger.error(f"DashScope流式调用异常: {type(e).__name__}: {str(e)}")
            yield self._sse_error(f"调用失败: {str(e)}")

        yield self._SSE_DONE

    # ------------------------------------------------------------------
    # Search-enhanced chat
    # ------------------------------------------------------------------

    def chat_with_search(
        self,
        request: EnhancedChatRequest,
        conversation_id: Optional[int] = None
    ) -> EnhancedChatResponse:
        """
        Non-streaming chat with search support.

        When search is enabled and the search call fails, the call is retried
        without search; an unexpected exception likewise degrades to
        :meth:`chat`.

        Args:
            request: The enhanced chat request (including search options).
            conversation_id: Conversation ID (optional); only forwarded to the
                fallback request.

        Returns:
            The enhanced chat response (including search information).

        Raises:
            HTTPException: 403 when no API key is configured, 400 for an
                invalid model or invalid search options, 502 when the platform
                returns a non-200 status, 500 for any other failure.
        """
        unavailable = self._availability_error(request.model)
        if unavailable is not None:
            raise unavailable

        # Validate and normalise the search options.
        validated_search_options = None
        if request.search_options:
            try:
                validated_search_options = self.search_validator.validate_and_normalize(
                    request.model, request.search_options, self.db
                )
            except ValueError as e:
                raise HTTPException(status_code=400, detail=f"搜索选项验证失败: {str(e)}") from e

        messages = self._convert_messages(request.messages)
        call_params = self._call_params(request, messages)

        try:
            response = self.client.call(**call_params, search_options=validated_search_options)

            if response.status_code != 200:
                if validated_search_options and validated_search_options.enable_search:
                    # Search failed: degrade by retrying without search.
                    logger.warning(f"搜索功能调用失败,降级到普通对话: {response.message}")
                    response = self.client.call(**call_params)
                    if response.status_code != 200:
                        raise HTTPException(status_code=502, detail=f"百炼平台返回错误: {response.message}")
                else:
                    raise HTTPException(status_code=502, detail=f"百炼平台返回错误: {response.message}")

            choice = response.output.choices[0]

            # Same guarded extraction as the other entry points, so a message
            # without reasoning content cannot trigger the exception fallback.
            reasoning_content = self._extract_reasoning(choice.message) or None

            search_info = self.client._extract_search_info(response)
            search_results = []

            # Normalise list/dict payloads to text, as chat() does.
            content = self._message_text(choice.message.content)

            if search_info and validated_search_options:
                search_results = self.citation_formatter.extract_search_results(search_info)
                content, search_results = self._apply_search_formatting(
                    content, search_info, search_results, validated_search_options
                )

            return EnhancedChatResponse(
                content=content,
                finish_reason=choice.finish_reason,
                usage=self._to_usage_info(response.usage),
                reasoning_content=reasoning_content,
                search_info=search_info,
                search_results=search_results
            )

        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"搜索增强对话调用异常: {type(e).__name__}: {str(e)}")

            if validated_search_options and validated_search_options.enable_search:
                logger.warning("搜索功能异常,尝试降级到普通对话")
                try:
                    # Degrade to a plain chat.
                    fallback_request = self._build_fallback_request(
                        request, conversation_id, stream=False
                    )
                    fallback_response = self.chat(fallback_request, conversation_id)

                    return EnhancedChatResponse(
                        content=fallback_response.content,
                        finish_reason=fallback_response.finish_reason,
                        usage=fallback_response.usage,
                        reasoning_content=fallback_response.reasoning_content,
                        search_info=None,
                        search_results=[]
                    )
                except Exception as fallback_error:
                    logger.error(f"降级对话也失败: {fallback_error}")
                    # Report the original failure; chain the fallback one for debugging.
                    raise HTTPException(status_code=500, detail=f"对话服务异常: {str(e)}") from fallback_error
            else:
                raise HTTPException(status_code=500, detail=f"对话服务异常: {str(e)}") from e

    async def chat_stream_with_search(
        self,
        request: EnhancedChatRequest,
        conversation_id: Optional[int] = None
    ) -> AsyncGenerator[str, None]:
        """
        Streaming chat with search support, returned as an SSE event stream.

        Errors are reported as SSE ``{"error": ...}`` events. When search is
        enabled and the search stream fails, the request degrades to
        :meth:`chat_stream`, which then terminates the stream itself.

        Args:
            request: The enhanced chat request (including search options).
            conversation_id: Conversation ID (optional).

        Yields:
            SSE-formatted enhanced response chunks.
        """
        unavailable = self._availability_error(request.model)
        if unavailable is not None:
            yield self._sse_error(unavailable.detail)
            yield self._SSE_DONE
            return

        # Validate and normalise the search options.
        validated_search_options = None
        if request.search_options:
            try:
                validated_search_options = self.search_validator.validate_and_normalize(
                    request.model, request.search_options, self.db
                )
            except ValueError as e:
                yield self._sse_error(f"搜索选项验证失败: {str(e)}")
                yield self._SSE_DONE
                return

        messages = self._convert_messages(request.messages)

        # Tracks accumulated content, usage and when search results are sent.
        state_manager = StreamSearchStateManager(validated_search_options)

        try:
            for response in self.client.call_stream(
                **self._call_params(request, messages),
                search_options=validated_search_options
            ):
                if response.status_code != 200:
                    error_msg = getattr(response, 'message', str(response))
                    logger.error(f"DashScope流式调用错误: status={response.status_code}, message={error_msg}")

                    state_manager.handle_error(error_msg)

                    if validated_search_options and validated_search_options.enable_search:
                        logger.warning("搜索功能流式调用失败,尝试降级到普通流式对话")
                        try:
                            fallback_request = self._build_fallback_request(
                                request, conversation_id, stream=True
                            )
                            async for chunk in self.chat_stream(fallback_request, conversation_id):
                                yield chunk
                            # chat_stream already emitted its own [DONE] event.
                            return
                        except Exception as fallback_error:
                            logger.error(f"降级流式对话也失败: {fallback_error}")

                    yield self._sse_error(error_msg)
                    break

                choice = response.output.choices[0]
                content_text = self._chunk_text(choice.message.content)
                # Incremental thinking content; the field may be absent.
                reasoning_content = self._extract_reasoning(choice.message, default="") or ""

                # Search info, when this chunk carries any.
                current_search_info = self.client._extract_search_info(response)
                current_search_results = []
                if current_search_info:
                    current_search_results = self.citation_formatter.extract_search_results(current_search_info)

                state_info = state_manager.process_response_chunk(
                    response=response,
                    content=content_text,
                    reasoning_content=reasoning_content,
                    search_info=current_search_info,
                    search_results=current_search_results
                )

                chunk = EnhancedStreamChunk(
                    content=content_text,
                    finish_reason=choice.finish_reason or None,
                    reasoning_content=reasoning_content or None
                )

                # Search results are to be delivered ahead of the answer.
                if state_info['should_prepend']:
                    chunk.search_info = state_info['search_info']
                    chunk.search_results = state_info['search_results']
                    state_manager.mark_search_results_sent()
                    logger.info(f"提前返回搜索结果: {len(state_info['search_results'])} 个结果")

                # Search results are to be delivered with the final chunk.
                if choice.finish_reason and state_info['should_append']:
                    chunk.search_info = state_info['search_info']
                    chunk.search_results = state_info['search_results']
                    logger.info(f"在最后块中返回搜索结果: {len(state_info['search_results'])} 个结果")

                # Usage statistics accompany the completed stream.
                if state_info['is_completed'] and state_info['final_usage']:
                    usage_data = state_info['final_usage']
                    chunk.usage = UsageInfo(
                        input_tokens=usage_data['input_tokens'],
                        output_tokens=usage_data['output_tokens'],
                        total_tokens=usage_data['total_tokens']
                    )

                yield f"data: {chunk.model_dump_json()}\n\n"

            # Post-stream processing of the final answer.
            if conversation_id and self.user_id:
                final_state = state_manager.get_final_state()
                if final_state.final_usage:
                    final_content = final_state.full_content
                    final_search_results = final_state.search_results

                    if final_state.search_info and validated_search_options:
                        final_content, final_search_results = self._apply_search_formatting(
                            final_content,
                            final_state.search_info,
                            final_search_results,
                            validated_search_options
                        )
                    # NOTE(review): conversation tracking was removed, so the
                    # formatted values above are currently neither persisted
                    # nor returned.

        except Exception as e:
            logger.error(f"DashScope搜索增强流式调用异常: {type(e).__name__}: {str(e)}")

            if validated_search_options and validated_search_options.enable_search:
                logger.warning("搜索功能流式调用异常,尝试降级到普通流式对话")
                try:
                    fallback_request = self._build_fallback_request(
                        request, conversation_id, stream=True
                    )
                    async for chunk in self.chat_stream(fallback_request, conversation_id):
                        yield chunk
                    # chat_stream already emitted its own [DONE] event.
                    return
                except Exception as fallback_error:
                    logger.error(f"降级流式对话也失败: {fallback_error}")

            yield self._sse_error(f"调用失败: {str(e)}")

        yield self._SSE_DONE

    # ------------------------------------------------------------------
    # Model listings / capability checks
    # ------------------------------------------------------------------

    def get_llm_models(self) -> List[ModelResponse]:
        """
        Return every LLM and multimodal model enabled for display.

        OCR-only models (the qwen-vl-ocr family) are excluded: they must be
        given an image and are not suitable as general chat models.
        """
        models = self.db.query(ModelNew).filter(
            self._chat_model_filter(),
            ModelNew.is_show_enabled == True,  # noqa: E712 - SQL expression, not a Python comparison
            ~ModelNew.model_code.in_(sorted(self._OCR_ONLY_MODELS))
        ).all()

        result = []
        for m in models:
            resp = ModelResponse.model_validate(m)
            resp.title = m.model_code
            resp.name = m.display_name or m.model_code
            resp.keyword = m.keywords
            result.append(resp)
        return result

    def get_search_supported_models(self) -> List[str]:
        """Return the sorted codes of displayed chat models that support search."""
        models = self.db.query(ModelNew).filter(
            self._chat_model_filter(),
            ModelNew.is_search == True,  # noqa: E712 - SQL expression
            ModelNew.is_show_enabled == True  # noqa: E712 - SQL expression
        ).all()
        return sorted(model.model_code for model in models)

    def get_thinking_supported_models(self) -> List[str]:
        """Return the sorted codes of displayed chat models that support thinking mode."""
        models = self.db.query(ModelNew).filter(
            self._chat_model_filter(),
            ModelNew.is_thinking == True,  # noqa: E712 - SQL expression
            ModelNew.is_show_enabled == True  # noqa: E712 - SQL expression
        ).all()
        return sorted(model.model_code for model in models)

    def is_search_supported(self, model: str) -> bool:
        """Return whether the given chat model supports search (False if unknown)."""
        model_obj = self.db.query(ModelNew).filter(
            ModelNew.model_code == model,
            self._chat_model_filter()
        ).first()
        # bool() so an unset flag is reported as False rather than None.
        return bool(model_obj.is_search) if model_obj else False

    def is_thinking_supported(self, model: str) -> bool:
        """Return whether the given chat model supports thinking mode (False if unknown)."""
        model_obj = self.db.query(ModelNew).filter(
            ModelNew.model_code == model,
            self._chat_model_filter()
        ).first()
        # bool() so an unset flag is reported as False rather than None.
        return bool(model_obj.is_thinking) if model_obj else False

    def validate_search_options(self, model: str, search_options: Optional[SearchOptions]) -> SearchOptions:
        """
        Validate and normalise search options.

        Args:
            model: Model name.
            search_options: Search options.

        Returns:
            The validated and normalised search options.

        Raises:
            ValueError: When the model is unsupported or a parameter is invalid.
        """
        return self.search_validator.validate_and_normalize(model, search_options, self.db)

    def create_search_fallback_response(
        self,
        original_request: EnhancedChatRequest,
        error_message: str,
        conversation_id: Optional[int] = None
    ) -> EnhancedChatResponse:
        """
        Create a degraded response for when search is unavailable.

        Runs a plain chat and appends a notice explaining that search was
        unavailable.

        Args:
            original_request: The original request.
            error_message: The error message to include in the notice.
            conversation_id: Conversation ID.

        Returns:
            The degraded response.

        Raises:
            HTTPException: 500 when the fallback chat fails as well.
        """
        try:
            # Build a request without search and run a plain chat.
            fallback_request = self._build_fallback_request(
                original_request, conversation_id, stream=False
            )
            fallback_response = self.chat(fallback_request, conversation_id)

            # Tell the user why the answer may lack up-to-date information.
            content_with_notice = (
                f"{fallback_response.content}\n\n"
                f"*注意:由于搜索功能暂时不可用({error_message}),"
                f"以上回复基于模型训练数据,可能不包含最新信息。*"
            )

            return EnhancedChatResponse(
                content=content_with_notice,
                finish_reason=fallback_response.finish_reason,
                usage=fallback_response.usage,
                reasoning_content=fallback_response.reasoning_content,
                search_info=None,
                search_results=[]
            )

        except Exception as e:
            logger.error(f"创建搜索降级响应失败: {e}")
            raise HTTPException(
                status_code=500,
                detail=f"搜索功能不可用且降级失败: {error_message}"
            ) from e