""" 声音复刻服务 提供声音复刻的业务逻辑处理,集成阿里云百炼平台DashScope 需求: 3.1, 3.2, 3.3, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9 支持: 创建音色、查询列表、查询详情、更新音色、删除音色 """ import logging import os from datetime import datetime from typing import List, Optional from decimal import Decimal from sqlalchemy.orm import Session from fastapi import HTTPException, UploadFile from app.models.audio import VoiceClone from app.schemas.audio_schema import ( VoiceCreateRequest, VoiceResponse, VoiceListResponse ) from app.services.oss_service import get_oss_service logger = logging.getLogger(__name__) class VoiceCloneService: """声音复刻服务类""" # 支持的音频格式 ALLOWED_AUDIO_TYPES = [ "audio/wav", "audio/x-wav", "audio/mpeg", "audio/mp3", "audio/m4a", "audio/x-m4a", "audio/mp4" ] # 支持的文件扩展名 ALLOWED_EXTENSIONS = [".wav", ".mp3", ".m4a"] # 最大文件大小(10MB) MAX_FILE_SIZE = 10 * 1024 * 1024 # 有效的目标模型 # VALID_TARGET_MODELS 已移除,改为动态查库验证 def __init__(self, db: Session, user_id: str, api_key: str = None): """ 初始化声音复刻服务 Args: db: 数据库会话 user_id: 用户ID api_key: 用户的API密钥(从用户数据动态加载) """ self.db = db self.user_id = user_id self.api_key = api_key or os.getenv("DASHSCOPE_API_KEY") self.oss_service = get_oss_service() self._voice_service = None @property def voice_service(self): """延迟初始化VoiceEnrollmentService""" if self._voice_service is None: import dashscope from dashscope.audio.tts_v2 import VoiceEnrollmentService dashscope.api_key = self.api_key self._voice_service = VoiceEnrollmentService() return self._voice_service def validate_audio_file(self, file: UploadFile) -> None: """ 验证音频文件格式和大小 Args: file: 上传的文件对象 Raises: HTTPException: 文件验证失败 """ # 检查文件是否存在 if not file or not file.filename: raise HTTPException(status_code=400, detail="未提供音频文件") # 检查文件扩展名 filename_lower = file.filename.lower() ext = None for allowed_ext in self.ALLOWED_EXTENSIONS: if filename_lower.endswith(allowed_ext): ext = allowed_ext break if not ext: raise HTTPException( status_code=400, detail=f"不支持的音频格式,仅支持: {', '.join(self.ALLOWED_EXTENSIONS)}" ) # 检查Content-Type(如果提供) if file.content_type and file.content_type not in self.ALLOWED_AUDIO_TYPES: # 某些情况下content_type可能不准确,所以只记录警告 logger.warning(f"音频文件Content-Type不匹配: {file.content_type}") # 检查文件大小 file.file.seek(0, 2) # 移动到文件末尾 file_size = file.file.tell() file.file.seek(0) # 重置到文件开头 if file_size > self.MAX_FILE_SIZE: raise HTTPException( status_code=400, detail=f"文件大小超过限制(最大10MB),当前大小: {file_size / 1024 / 1024:.2f}MB" ) if file_size == 0: raise HTTPException(status_code=400, detail="文件为空") async def create_voice( self, request: VoiceCreateRequest, file: UploadFile = None ) -> VoiceResponse: """ 创建复刻音色 Args: request: 创建音色请求 file: 上传的音频文件(与audio_url二选一) Returns: 音色响应对象 Raises: HTTPException: 创建失败 """ # 动态查库验证目标模型 from app.models.model import ModelNew, ModelCategory valid = self.db.query(ModelNew).filter( ModelNew.model_code == request.target_model, ModelNew.categories.any(int(ModelCategory.TTS)), ModelNew.is_api_enabled == True, ).first() if not valid: raise HTTPException(status_code=400, detail=f"无效的目标模型: {request.target_model}") # 确定音频URL audio_url = request.audio_url local_audio_url = None # 本地OSS存储的URL if file: # 验证并上传文件 self.validate_audio_file(file) # 读取文件内容 file_content = await file.read() # 上传到OSS local_audio_url = self.oss_service.upload_file( file_content, prefix="audio/voice", original_filename=file.filename ) audio_url = local_audio_url elif not audio_url: raise HTTPException( status_code=400, detail="必须提供音频文件或音频URL" ) try: # 费用(API调用免费) bill = Decimal("0") # 调用DashScope API创建音色 voice_id = self.voice_service.create_voice( target_model=request.target_model, prefix=request.prefix, url=audio_url, language_hints=request.language_hints ) if not voice_id: raise HTTPException(status_code=502, detail="创建音色失败,未返回voice_id") # 保存到数据库 voice_clone = VoiceClone( user_id=self.user_id, voice_id=voice_id, target_model=request.target_model, prefix=request.prefix, voice_name=request.voice_name, status="DEPLOYING", bill=bill, audio_url=local_audio_url or audio_url ) self.db.add(voice_clone) self.db.commit() self.db.refresh(voice_clone) return VoiceResponse( voice_id=voice_id, status="DEPLOYING", target_model=request.target_model, voice_name=request.voice_name ) except HTTPException: raise except Exception as e: # 尝试识别第三方错误类型并返回更精确的提示 err_str = str(e).lower() logger.error(f"创建音色失败: {type(e).__name__}: {str(e)}") if 'audio.audiosilenterror' in err_str or 'silent audio' in err_str or 'silent' in err_str: # DashScope 返回静音错误 raise HTTPException(status_code=400, detail="音频被判定为静音或无有效语音,请检查麦克风并重新录制(建议 ≥5 秒清晰朗读)") if 'invalid' in err_str and 'format' in err_str: raise HTTPException(status_code=400, detail="音频格式不受支持或文件损坏,请上传 WAV/MP3/M4A 格式的清晰录音") # 默认返回较友好的不可用提示,避免将第三方内部信息直接暴露给用户 raise HTTPException(status_code=502, detail="创建音色失败:服务暂时不可用,请稍后重试或检查音频质量") async def list_voices( self, prefix: str = None, page: int = 0, page_size: int = 10, model: Optional[str] = None ) -> VoiceListResponse: """ 查询用户音色列表 Args: prefix: 按前缀筛选 page: 页码(从0开始) page_size: 每页数量 model: 按目标模型筛选(如:cosyvoice-v3-flash、cosyvoice-v3-plus) Returns: 音色列表响应 Raises: HTTPException: 查询失败 """ try: # 先从本地数据库查询当前用户的音色,支持按模型筛选 query = self.db.query(VoiceClone).filter( VoiceClone.user_id == self.user_id ) # 按模型筛选 if model: query = query.filter(VoiceClone.target_model == model) # 按前缀筛选 if prefix: query = query.filter(VoiceClone.prefix.like(f"{prefix}%")) # 获取用户音色ID集合 user_voices = query.all() user_voice_ids = {v.voice_id for v in user_voices} user_voice_map = {v.voice_id: v for v in user_voices} if not user_voice_ids: return VoiceListResponse(total=0, voices=[]) # 调用DashScope API获取音色列表 # 注意:由于需要按模型筛选,我们先从数据库筛选,然后调用API获取状态 # 为了确保获取到所有相关音色,我们获取较大的页面 result = self.voice_service.list_voices( prefix=prefix, page_index=0, # 从第一页开始 page_size=100 # 获取足够多的数据以便筛选 ) # 解析结果 voices = [] if result: # result可能是列表或包含voices字段的对象 if isinstance(result, list): voice_list = result elif hasattr(result, 'voices'): voice_list = result.voices elif isinstance(result, dict): voice_list = result.get('voices', result.get('data', [])) else: voice_list = [] # 过滤出属于当前用户且匹配筛选条件的音色 for v in voice_list: voice_id = v.get('voice_id') if isinstance(v, dict) else getattr(v, 'voice_id', None) # 只返回属于当前用户的音色 if voice_id and voice_id in user_voice_ids: # 如果指定了模型筛选,检查是否匹配 if model: local_voice = user_voice_map.get(voice_id) if not local_voice or local_voice.target_model != model: continue status = v.get('status') if isinstance(v, dict) else getattr(v, 'status', 'UNKNOWN') gmt_create = v.get('gmt_create') if isinstance(v, dict) else getattr(v, 'gmt_create', None) gmt_modified = v.get('gmt_modified') if isinstance(v, dict) else getattr(v, 'gmt_modified', None) # 从本地数据库获取target_model和voice_name local_voice = user_voice_map.get(voice_id) target_model = local_voice.target_model if local_voice else None voice_name = local_voice.voice_name if local_voice else None voices.append(VoiceResponse( voice_id=voice_id, status=status, target_model=target_model, voice_name=voice_name, gmt_create=gmt_create, gmt_modified=gmt_modified )) # 同步更新本地数据库状态 if local_voice and local_voice.status != status: local_voice.status = status local_voice.updated_at = datetime.utcnow() self.db.commit() # 应用分页 total = len(voices) start_idx = page * page_size end_idx = start_idx + page_size paginated_voices = voices[start_idx:end_idx] return VoiceListResponse( total=total, voices=paginated_voices ) except HTTPException: raise except Exception as e: logger.error(f"查询音色列表失败: {type(e).__name__}: {str(e)}") raise HTTPException(status_code=502, detail=f"查询音色列表失败: {str(e)}") async def query_voice(self, voice_id: str) -> VoiceResponse: """ 查询指定音色详情 Args: voice_id: 音色ID Returns: 音色响应对象 Raises: HTTPException: 查询失败或音色不存在 """ # 验证权限:检查音色是否属于当前用户 local_voice = self.db.query(VoiceClone).filter( VoiceClone.voice_id == voice_id, VoiceClone.user_id == self.user_id ).first() if not local_voice: raise HTTPException(status_code=404, detail="音色不存在") try: # 调用DashScope API查询音色详情 result = self.voice_service.query_voice(voice_id=voice_id) if not result: raise HTTPException(status_code=404, detail="音色不存在") # 解析结果 if isinstance(result, dict): status = result.get('status', 'UNKNOWN') target_model = result.get('target_model') resource_link = result.get('resource_link') gmt_create = result.get('gmt_create') gmt_modified = result.get('gmt_modified') else: status = getattr(result, 'status', 'UNKNOWN') target_model = getattr(result, 'target_model', None) resource_link = getattr(result, 'resource_link', None) gmt_create = getattr(result, 'gmt_create', None) gmt_modified = getattr(result, 'gmt_modified', None) # 更新本地数据库状态 if local_voice.status != status: local_voice.status = status local_voice.updated_at = datetime.utcnow() self.db.commit() return VoiceResponse( voice_id=voice_id, status=status, target_model=target_model or local_voice.target_model, voice_name=local_voice.voice_name, resource_link=resource_link, gmt_create=gmt_create, gmt_modified=gmt_modified ) except HTTPException: raise except Exception as e: logger.error(f"查询音色详情失败: {type(e).__name__}: {str(e)}") raise HTTPException(status_code=502, detail=f"查询音色详情失败: {str(e)}") async def update_voice( self, voice_id: str, file: UploadFile = None, audio_url: str = None ) -> VoiceResponse: """ 更新音色(使用新的音频文件) Args: voice_id: 音色ID file: 新的音频文件(与audio_url二选一) audio_url: 新的音频URL(与file二选一) Returns: 音色响应对象 Raises: HTTPException: 更新失败或音色不存在 """ # 验证权限:检查音色是否属于当前用户 local_voice = self.db.query(VoiceClone).filter( VoiceClone.voice_id == voice_id, VoiceClone.user_id == self.user_id ).first() if not local_voice: raise HTTPException(status_code=404, detail="音色不存在") # 确定新的音频URL new_audio_url = audio_url if file: # 验证并上传文件 self.validate_audio_file(file) # 读取文件内容 file_content = await file.read() # 上传到OSS new_audio_url = self.oss_service.upload_file( file_content, prefix="audio/voice", original_filename=file.filename ) elif not audio_url: raise HTTPException( status_code=400, detail="必须提供新的音频文件或音频URL" ) try: # 调用DashScope API更新音色 result = self.voice_service.update_voice( voice_id=voice_id, url=new_audio_url ) # 更新本地数据库 local_voice.audio_url = new_audio_url local_voice.status = "DEPLOYING" # 更新后需要重新审核 local_voice.updated_at = datetime.utcnow() self.db.commit() return VoiceResponse( voice_id=voice_id, status="DEPLOYING" ) except HTTPException: raise except Exception as e: logger.error(f"更新音色失败: {type(e).__name__}: {str(e)}") raise HTTPException(status_code=502, detail=f"更新音色失败: {str(e)}") async def delete_voice(self, voice_id: str) -> None: """ 删除音色 Args: voice_id: 音色ID Raises: HTTPException: 删除失败或音色不存在 """ # 验证权限:检查音色是否属于当前用户 local_voice = self.db.query(VoiceClone).filter( VoiceClone.voice_id == voice_id, VoiceClone.user_id == self.user_id ).first() if not local_voice: raise HTTPException(status_code=404, detail="音色不存在") try: # 调用DashScope API删除音色 self.voice_service.delete_voice(voice_id=voice_id) # 删除本地数据库记录 self.db.delete(local_voice) self.db.commit() except HTTPException: raise except Exception as e: logger.error(f"删除音色失败: {type(e).__name__}: {str(e)}") raise HTTPException(status_code=502, detail=f"删除音色失败: {str(e)}")