"""
Speech synthesis service V2.

Provides the business logic for asynchronous speech synthesis.
"""

import asyncio
import logging
from datetime import datetime
from typing import Any, Callable, List

import requests
from fastapi import HTTPException
from sqlalchemy import desc
from sqlalchemy.orm import Session

from app.models.audio import AudioSynthesisV2
from app.schemas.audio_v2 import (
    AudioSynthesisV2CreateRequest,
    AudioSynthesisV2Response,
    AudioSynthesisV2ListResponse,
    TaskListQueryParams,
)
from .base_service import BaseV2Service

logger = logging.getLogger(__name__)


class AudioSynthesisV2Service(BaseV2Service):
    """Speech synthesis service V2 (asynchronous mode).

    Relies on ``self.api_key``, ``self.user_id`` and ``self.db`` being
    available on the instance (presumably set up by ``BaseV2Service`` --
    confirm against the base class).
    """

    # Base URL of the DashScope API.
    DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"

    # TTS models accepted by ``create_task``.
    VALID_MODELS = ["cosyvoice-v3-flash", "cosyvoice-v3-plus", "cosyvoice-v2"]

    # Statuses for which the remote state is re-queried in ``get_task``.
    # The remote ``task_status`` is stored verbatim, and DashScope reports
    # in-flight tasks as "RUNNING"; without it a task that reached RUNNING
    # would never be polled again. "PROCESSING" is kept for rows written by
    # earlier versions of this service.
    _IN_PROGRESS_STATUSES = ("PENDING", "RUNNING", "PROCESSING")

    # Timeout (seconds) applied to every DashScope HTTP call.
    _REQUEST_TIMEOUT_SECONDS = 30

    @staticmethod
    async def _run_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Run a blocking callable in the default executor and await its result.

        ``requests`` is synchronous; calling it directly inside a coroutine
        would stall the event loop for the whole duration of the HTTP call.
        Exceptions raised by ``func`` propagate to the awaiting caller.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    @staticmethod
    def _extract_error_message(response: Any) -> str:
        """Return the upstream error message, or ``"HTTP <code>"`` as a fallback.

        Error bodies are not guaranteed to be JSON objects (e.g. an HTML page
        from a gateway), so parsing failures fall back instead of raising.
        """
        fallback = f"HTTP {response.status_code}"
        if not response.text:
            return fallback
        try:
            body = response.json()
        except ValueError:
            # JSON decode errors raised by requests are ValueError subclasses.
            return fallback
        if isinstance(body, dict):
            return body.get("message") or fallback
        return fallback

    def _rollback_quietly(self) -> None:
        """Roll back the session, logging (not raising) if the rollback fails."""
        try:
            self.db.rollback()
        except Exception:
            logger.exception("数据库回滚失败")

    async def create_task(
        self,
        request: AudioSynthesisV2CreateRequest
    ) -> AudioSynthesisV2Response:
        """
        Create a speech synthesis task.

        Submits the task to DashScope in async mode and stores a ``PENDING``
        record for the current user.

        Args:
            request: Creation request.

        Returns:
            The task response built from the stored record.

        Raises:
            HTTPException: 400 for an unsupported model, 504 when the
                submission times out, 502 for any other failure.
        """
        # Validate the model.
        if request.model not in self.VALID_MODELS:
            raise HTTPException(
                status_code=400,
                detail=f"无效的模型,支持的模型: {self.VALID_MODELS}"
            )

        try:
            # Submit the asynchronous task to the DashScope API.
            url = f"{self.DASHSCOPE_BASE_URL}/services/audio/tts/synthesis"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                "X-DashScope-Async": "enable"
            }
            payload = {
                "model": request.model,
                "input": {"text": request.text},
                "parameters": {
                    "voice": request.voice,
                    "format": request.format
                }
            }

            response = await self._run_blocking(
                requests.post,
                url,
                headers=headers,
                json=payload,
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )

            if response.status_code != 200:
                error_msg = self._extract_error_message(response)
                logger.error(f"提交TTS任务失败: {error_msg}")
                raise HTTPException(
                    status_code=502,
                    detail=f"提交合成任务失败: {error_msg}"
                )

            data = response.json()
            # ``or {}`` also covers an explicit ``"output": null``.
            output = data.get("output") or {}
            task_id = output.get("task_id")

            if not task_id:
                raise HTTPException(
                    status_code=502,
                    detail="提交合成任务失败,未返回task_id"
                )

            # Persist the task.
            tts_task = AudioSynthesisV2(
                user_id=self.user_id,
                task_id=task_id,
                model=request.model,
                voice=request.voice,
                text=request.text,
                format=request.format,
                characters=len(request.text),
                custom_name=request.custom_name,
                status="PENDING"
            )
            try:
                self.db.add(tts_task)
                self.db.commit()
                self.db.refresh(tts_task)
            except Exception:
                # Leave the session usable for whoever touches it next; the
                # outer handler turns the error into the HTTP response.
                self._rollback_quietly()
                raise

            return AudioSynthesisV2Response.from_orm(tts_task)

        except HTTPException:
            raise
        except requests.exceptions.Timeout as e:
            raise HTTPException(status_code=504, detail="提交合成任务超时") from e
        except Exception as e:
            logger.error(
                "创建TTS任务失败: %s: %s", type(e).__name__, e, exc_info=True
            )
            raise HTTPException(
                status_code=502,
                detail=f"创建合成任务失败: {str(e)}"
            ) from e

    async def get_task(self, task_id: str) -> AudioSynthesisV2Response:
        """
        Fetch the details of one task owned by the current user.

        Tasks that are still in progress are refreshed from DashScope first;
        a failed refresh is logged and the stored state is returned.

        Args:
            task_id: Task ID.

        Returns:
            The task response.

        Raises:
            HTTPException: 404 when no such task exists for the current user.
        """
        task = self.db.query(AudioSynthesisV2).filter(
            AudioSynthesisV2.task_id == task_id,
            AudioSynthesisV2.user_id == self.user_id
        ).first()

        if not task:
            raise HTTPException(status_code=404, detail="任务不存在")

        # Task not finished yet: pull the latest status.
        if task.status in self._IN_PROGRESS_STATUSES:
            await self._update_task_status(task)

        return AudioSynthesisV2Response.from_orm(task)

    async def list_tasks(
        self,
        params: TaskListQueryParams
    ) -> AudioSynthesisV2ListResponse:
        """
        List the current user's tasks with filtering, sorting and paging.

        Args:
            params: Query parameters (``status``, ``order_by``, ``order``,
                ``page``, ``page_size``).

        Returns:
            The task list response; ``total`` counts all matching rows, not
            only the returned page.
        """
        query = self.db.query(AudioSynthesisV2).filter(
            AudioSynthesisV2.user_id == self.user_id
        )

        # Optional status filter.
        if params.status:
            query = query.filter(AudioSynthesisV2.status == params.status)

        # Total is taken before ordering and paging are applied.
        total = query.count()

        # Sorting: anything other than "updated_at" falls back to created_at.
        sortable_columns = {
            "created_at": AudioSynthesisV2.created_at,
            "updated_at": AudioSynthesisV2.updated_at,
        }
        order_column = sortable_columns.get(
            params.order_by, AudioSynthesisV2.created_at
        )

        if params.order == "desc":
            query = query.order_by(desc(order_column))
        else:
            query = query.order_by(order_column)

        # Paging. ``page`` is 1-based; presumably validated (>= 1) by the
        # schema -- TODO confirm.
        offset = (params.page - 1) * params.page_size
        tasks = query.offset(offset).limit(params.page_size).all()

        items = [AudioSynthesisV2Response.from_orm(task) for task in tasks]

        return AudioSynthesisV2ListResponse(total=total, items=items)

    async def _update_task_status(self, task: AudioSynthesisV2) -> None:
        """
        Refresh a task's status from DashScope (best effort).

        Failures are logged and never raised, so callers can still return
        the last stored state.

        Args:
            task: Task object to refresh in place.
        """
        # Stage 1: fetch the remote state. Nothing is mutated here, so a
        # failure needs no rollback.
        try:
            url = f"{self.DASHSCOPE_BASE_URL}/tasks/{task.task_id}"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "X-DashScope-Async": "enable"
            }

            response = await self._run_blocking(
                requests.get,
                url,
                headers=headers,
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )

            if response.status_code != 200:
                logger.warning(f"查询任务状态失败: {response.status_code}")
                return

            data = response.json()
            # ``or {}`` also covers explicit nulls in the payload.
            output = data.get("output") or {}
            usage = data.get("usage") or {}
        except Exception as e:
            logger.error(
                "更新任务状态失败: %s: %s", type(e).__name__, e, exc_info=True
            )
            return

        # Stage 2: apply the remote state and persist it.
        try:
            now = datetime.now()

            # Keep the current status when the remote one is missing or null.
            new_status = output.get("task_status") or task.status
            task.status = new_status
            task.updated_at = now

            if new_status == "SUCCEEDED":
                # Task finished: extract the result.
                result = output.get("result") or {}
                task.audio_url = result.get("audio_url")
                task.duration = usage.get("duration", 0)
                task.completed_at = now

            elif new_status == "FAILED":
                # Extract the error message.
                task.error_message = output.get("message", "合成失败")
                task.completed_at = now

            self.db.commit()
        except Exception as e:
            logger.error(
                "更新任务状态失败: %s: %s", type(e).__name__, e, exc_info=True
            )
            # Without a rollback the session stays in a failed state and the
            # caller's subsequent read of ``task`` would raise.
            self._rollback_quietly()