"""DeepSeek AI service: thin async client for the chat-completions endpoint."""
import httpx
import json
from typing import AsyncGenerator

from utils.config import settings
from utils.logger import logger


class DeepSeekService:
    """Async wrapper around the DeepSeek ``/v1/chat/completions`` HTTP API.

    The API key and API URL are read from ``settings.deepseek`` when the
    instance is created.
    """

    # Prefix that marks a payload line in a server-sent-events stream.
    _SSE_DATA_PREFIX = "data:"
    # Sentinel payload the API sends when the stream is finished.
    _SSE_DONE = "[DONE]"

    def __init__(self):
        self.api_key = settings.deepseek.api_key
        self.api_url = settings.deepseek.api_url
        # Strip trailing slashes so a configured URL such as
        # "https://host/" does not produce "https://host//v1".
        self.base_url = f"{self.api_url}".rstrip("/") + "/v1"

    def _headers(self) -> dict:
        """Return the HTTP headers shared by every request."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _payload(messages: list, model: str, temperature: float,
                 stream: bool = False) -> dict:
        """Build the JSON request body; ``stream`` is only sent when true."""
        body = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
        }
        if stream:
            body["stream"] = True
        return body

    @staticmethod
    def _extract_content(payload: str) -> str:
        """Return the text fragment carried by one streamed JSON chunk.

        Returns an empty string when the chunk is not valid JSON, is not a
        JSON object, has no choices, or carries no ``delta.content``.
        """
        try:
            event = json.loads(payload)
        except json.JSONDecodeError:
            # Malformed chunks are skipped rather than aborting the stream.
            return ""
        if not isinstance(event, dict):
            return ""
        choices = event.get("choices") or []
        if not choices:
            return ""
        # ``delta`` (or its ``content``) may be missing or null in a chunk.
        delta = choices[0].get("delta") or {}
        return delta.get("content") or ""

    async def chat(self, messages: list, model: str = "deepseek-chat", *,
                   temperature: float = 0.7) -> str:
        """Send a non-streaming chat request and return the reply text.

        Args:
            messages: Chat messages, passed through as the ``messages``
                field of the request body.
            model: Model name sent in the request body.
            temperature: Sampling temperature sent in the request body.

        Returns:
            ``choices[0].message.content`` from the JSON response.

        Raises:
            Exception: Any error from the HTTP call (including a non-success
                status via ``raise_for_status``) or from reading the
                response shape is logged and re-raised unchanged.
        """
        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    f"{self.base_url}/chat/completions",
                    headers=self._headers(),
                    json=self._payload(messages, model, temperature),
                )
                response.raise_for_status()
                result = response.json()
                return result['choices'][0]['message']['content']
        except Exception as e:
            logger.error(f"DeepSeek API 调用失败: {e}")
            raise

    async def stream_chat(self, messages: list, model: str = "deepseek-chat", *,
                          temperature: float = 0.7) -> AsyncGenerator[str, None]:
        """Send a streaming chat request and yield reply text fragments.

        Args:
            messages: Chat messages, passed through as the ``messages``
                field of the request body.
            model: Model name sent in the request body.
            temperature: Sampling temperature sent in the request body.

        Yields:
            Each non-empty ``choices[0].delta.content`` fragment, in the
            order received, until the ``[DONE]`` sentinel or end of stream.

        Raises:
            Exception: Any error from the HTTP call (including a non-success
                status via ``raise_for_status``) is logged and re-raised
                unchanged.
        """
        try:
            async with httpx.AsyncClient(timeout=120.0) as client:
                async with client.stream(
                    "POST",
                    f"{self.base_url}/chat/completions",
                    headers=self._headers(),
                    json=self._payload(messages, model, temperature, stream=True),
                ) as response:
                    response.raise_for_status()
                    async for line in response.aiter_lines():
                        # Only "data:" lines carry payloads; the space after
                        # the colon is optional in the SSE format.
                        if not line.startswith(self._SSE_DATA_PREFIX):
                            continue
                        payload = line[len(self._SSE_DATA_PREFIX):].strip()
                        if payload == self._SSE_DONE:
                            break
                        content = self._extract_content(payload)
                        if content:
                            yield content
        except Exception as e:
            logger.error(f"DeepSeek 流式 API 调用失败: {e}")
            raise


# Module-level shared instance.
deepseek_service = DeepSeekService()