""" 声音克隆服务V2 提供异步声音克隆的业务逻辑处理 """ import logging from datetime import datetime from typing import List from sqlalchemy.orm import Session from sqlalchemy import desc from fastapi import HTTPException from app.models.audio import VoiceCloneV2 from app.schemas.audio_v2 import ( VoiceCloneV2CreateRequest, VoiceCloneV2Response, VoiceCloneV2ListResponse, TaskListQueryParams ) from .base_service import BaseV2Service logger = logging.getLogger(__name__) class VoiceCloneV2Service(BaseV2Service): """声音克隆服务V2(异步模式)""" # 有效的目标模型 VALID_MODELS = ["cosyvoice-v3-plus", "cosyvoice-v3-flash", "cosyvoice-v2"] def __init__(self, db: Session, user_id: str, api_key: str = None): """初始化服务""" super().__init__(db, user_id, api_key) self._voice_service = None @property def voice_service(self): """延迟初始化VoiceEnrollmentService""" if self._voice_service is None: from dashscope.audio.tts_v2 import VoiceEnrollmentService self._voice_service = VoiceEnrollmentService() return self._voice_service async def create_task( self, request: VoiceCloneV2CreateRequest ) -> VoiceCloneV2Response: """ 创建声音克隆任务 Args: request: 创建请求 Returns: 任务响应 Raises: HTTPException: 创建失败 """ # 验证模型 if request.target_model not in self.VALID_MODELS: raise HTTPException( status_code=400, detail=f"无效的模型,支持的模型: {self.VALID_MODELS}" ) try: # 调用DashScope API创建音色 voice_id = self.voice_service.create_voice( target_model=request.target_model, prefix=request.prefix, url=request.audio_url ) if not voice_id: raise HTTPException( status_code=502, detail="创建音色失败,未返回voice_id" ) # 保存到数据库(使用voice_id作为task_id) voice_task = VoiceCloneV2( user_id=self.user_id, task_id=voice_id, # 使用voice_id作为task_id voice_id=None, # 完成后才有 target_model=request.target_model, prefix=request.prefix, voice_name=request.voice_name, audio_url=request.audio_url, status="PENDING", bill=bill ) self.db.add(voice_task) self.db.commit() self.db.refresh(voice_task) return VoiceCloneV2Response.from_orm(voice_task) except HTTPException: raise except Exception as e: logger.error(f"创建声音克隆任务失败: {type(e).__name__}: {str(e)}") raise HTTPException( status_code=502, detail=f"创建声音克隆任务失败: {str(e)}" ) async def get_task(self, task_id: str) -> VoiceCloneV2Response: """ 查询任务详情 Args: task_id: 任务ID Returns: 任务响应 Raises: HTTPException: 任务不存在 """ task = self.db.query(VoiceCloneV2).filter( VoiceCloneV2.task_id == task_id, VoiceCloneV2.user_id == self.user_id ).first() if not task: raise HTTPException(status_code=404, detail="任务不存在") # 如果任务未完成,查询最新状态 if task.status in ["PENDING", "PROCESSING"]: await self._update_task_status(task) return VoiceCloneV2Response.from_orm(task) async def list_tasks( self, params: TaskListQueryParams ) -> VoiceCloneV2ListResponse: """ 查询任务列表 Args: params: 查询参数 Returns: 任务列表响应 """ query = self.db.query(VoiceCloneV2).filter( VoiceCloneV2.user_id == self.user_id ) # 状态筛选 if params.status: query = query.filter(VoiceCloneV2.status == params.status) # 总数 total = query.count() # 排序 if params.order_by == "created_at": order_column = VoiceCloneV2.created_at elif params.order_by == "updated_at": order_column = VoiceCloneV2.updated_at else: order_column = VoiceCloneV2.created_at if params.order == "desc": query = query.order_by(desc(order_column)) else: query = query.order_by(order_column) # 分页 offset = (params.page - 1) * params.page_size tasks = query.offset(offset).limit(params.page_size).all() items = [VoiceCloneV2Response.from_orm(task) for task in tasks] return VoiceCloneV2ListResponse(total=total, items=items) async def _update_task_status(self, task: VoiceCloneV2) -> None: """ 更新任务状态(从DashScope查询) Args: task: 任务对象 """ try: # 查询音色状态 result = self.voice_service.query_voice(voice_id=task.task_id) if not result: return # 解析状态 if isinstance(result, dict): status = result.get('status', 'UNKNOWN') else: status = getattr(result, 'status', 'UNKNOWN') # 映射状态 status_map = { "DEPLOYING": "PROCESSING", "OK": "SUCCEEDED", "DEPLOYED": "SUCCEEDED", "UNDEPLOYED": "FAILED", "FAILED": "FAILED" } new_status = status_map.get(status, status) task.status = new_status task.updated_at = datetime.now() # 如果成功,设置voice_id if new_status == "SUCCEEDED" and not task.voice_id: task.voice_id = task.task_id task.completed_at = datetime.now() elif new_status == "FAILED": task.error_message = "音色训练失败" task.completed_at = datetime.now() self.db.commit() except Exception as e: logger.error(f"更新任务状态失败: {type(e).__name__}: {str(e)}")