
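AI Voice Module Generation Prompt

Module Overview

Implement a complete AI voice module that integrates Alibaba Cloud's Bailian platform (DashScope) and supports text-to-speech (TTS), automatic speech recognition (ASR), voice cloning, and voice management.

Tech Stack

  • Backend framework: FastAPI
  • ORM: SQLAlchemy 2.0
  • Database: PostgreSQL (schema: aigcspace)
  • AI platform: Alibaba Cloud Bailian (dashscope SDK)
  • Storage: Alibaba Cloud OSS (existing oss_service)
  • Authentication: JWT token (existing auth middleware)

Functional Requirements

1. Text-to-Speech Service (TTS)

Create app/services/tts_service.py

  • Wrap the DashScope speech synthesis API (SpeechSynthesizer)
  • Supported models: cosyvoice-v3-flash, cosyvoice-v3-plus, cosyvoice-v2
  • Non-streaming synthesis: invoke the call method, which returns the complete audio
  • Streaming synthesis: receive the audio stream through a callback
  • Supported parameters: model, voice, text, format, volume, speech_rate, pitch_rate, instruction
  • Audio storage: after synthesis, upload the audio to OSS under the path audio/tts/{date}/{uuid}.{format}
  • Long-text synthesis: split the text at sentence boundaries, synthesize each segment, then merge the results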
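
The OSS path format above can be sketched as a small helper. This is a minimal illustration only: the function name build_tts_object_key is hypothetical, and it assumes the {date} placeholder means an ISO date (YYYY-MM-DD), which is what formatting date.today() in an f-string produces.

```python
from datetime import date
from typing import Optional
from uuid import uuid4


def build_tts_object_key(audio_format: str, today: Optional[date] = None) -> str:
    """Build an object key of the form audio/tts/{date}/{uuid}.{format}.

    Hypothetical helper: `today` is injectable so the result is testable.
    """
    day = (today or date.today()).isoformat()   # e.g. 2025-01-31 (assumed date layout)
    extension = audio_format.lower().lstrip(".")  # tolerate "MP3" or ".mp3"
    return f"audio/tts/{day}/{uuid4()}.{extension}"
```

Generating the key in one place keeps the short-text and long-text paths from drifting apart.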

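Text-Splitting Logic

def split_text(text: str, max_length: int = 2000) -> List[str]:
    """Split text at sentence boundaries; each segment is at most max_length characters."""
    # Sentence delimiters: 。!?; (plus their ASCII forms !?;) and \n
    # Prefer to split at a delimiter
    # If a single sentence exceeds max_length, split it at commas
    # Return the list of segments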
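
The stub above could be filled in roughly as follows. This is a sketch under stated assumptions, not a definitive implementation: the helper names _pieces and _pack are hypothetical, both full-width and ASCII punctuation are treated as delimiters, and a hard cut is used as a last resort when a clause contains no comma at all.

```python
import re
from typing import List

SENTENCE_DELIMS = "。!?;!?;\n"   # assumed delimiter set (full-width and ASCII)
CLAUSE_DELIMS = ",,、"           # assumed comma-like fallback delimiters


def _pieces(text: str, delimiters: str) -> List[str]:
    """Cut text after each delimiter, keeping the delimiter with its piece."""
    cls = re.escape(delimiters)
    return re.findall(f"[^{cls}]*[{cls}]|[^{cls}]+", text)


def _pack(units: List[str], max_length: int) -> List[str]:
    """Greedily join consecutive units into segments of at most max_length."""
    segments, current = [], ""
    for unit in units:
        if current and len(current) + len(unit) > max_length:
            segments.append(current)
            current = ""
        current += unit
    if current:
        segments.append(current)
    return segments


def split_text(text: str, max_length: int = 2000) -> List[str]:
    """Split at sentence boundaries, then commas, then by hard cut."""
    units: List[str] = []
    for sentence in _pieces(text, SENTENCE_DELIMS):
        if len(sentence) <= max_length:
            units.append(sentence)
            continue
        # Oversized sentence: fall back to comma boundaries, hard-cutting any
        # clause that is still too long.
        for clause in _pieces(sentence, CLAUSE_DELIMS):
            units.extend(
                clause[i:i + max_length] for i in range(0, len(clause), max_length)
            )
    return _pack(units, max_length)
```

Joining the returned segments reproduces the input exactly, so no characters are lost between synthesis calls.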

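2. Speech Recognition Service (ASR)

Create app/services/asr_service.py

  • Wrap the DashScope speech recognition API
  • Synchronous recognition (short audio):
    • Models: qwen3-asr-flash, qwen-audio-asr
    • Accepts input either as a URL or as Base64
    • Parameters: language, enable_itn, context
  • Asynchronous recognition (long audio):
    • Model: qwen3-asr-flash-filetrans
    • Submitting a task returns a task_id
    • Query the task status and result
    • Parameters: file_url, language, enable_itn, context, channel_id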
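
Because synchronous recognition accepts either a URL or Base64, the service needs to check that exactly one of the two was supplied. The helper below is a hypothetical sketch (resolve_audio_input is not part of the spec); it only validates the input and does not cover how a Base64 payload has to be wrapped before it is handed to the SDK.

```python
import base64
import binascii
from typing import Optional


def resolve_audio_input(audio_url: Optional[str], audio_base64: Optional[str]) -> str:
    """Return the single audio input that was provided, after basic validation."""
    # Exactly one of the two inputs must be present.
    if bool(audio_url) == bool(audio_base64):
        raise ValueError("Provide exactly one of audio_url or audio_base64")
    if audio_url:
        if not audio_url.startswith(("http://", "https://")):
            raise ValueError("audio_url must be an http(s) URL")
        return audio_url
    try:
        # validate=True rejects characters outside the Base64 alphabet.
        base64.b64decode(audio_base64, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError("audio_base64 is not valid Base64") from exc
    return audio_base64
```

Rejecting ambiguous requests early gives the caller a clear 400-style error instead of a failure from the recognition API.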

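3. Voice Cloning Service

Create app/services/voice_clone_service.py

  • Wrap the DashScope voice cloning API (VoiceEnrollmentService)
  • Create a voice:
    • Upload the audio file to OSS and obtain its URL
    • Call create_voice to create the cloned voice
    • Parameters: target_model, prefix, url, language_hints
    • Returns a voice_id
  • List voices: list_voices, with pagination and prefix filtering
  • Query a specific voice: query_voice
  • Update a voice: update_voice
  • Delete a voice: delete_voice

Audio Requirement Validation

def validate_audio_file(file: UploadFile) -> None:
    """Check that the audio file meets the voice cloning requirements."""
    # Format: WAV, MP3, M4A
    # Size: ≤ 10 MB
    # Duration: 10-60 seconds (optional check)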
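
The optional duration check can be done with the standard library for WAV input. The sketch below is an assumption-laden illustration: the function names are hypothetical, and it handles WAV only (MP3 and M4A would need a decoder such as pydub, which is not shown here).

```python
import io
import wave


def wav_duration_seconds(data: bytes) -> float:
    """Return the duration of a WAV file held in memory."""
    with wave.open(io.BytesIO(data), "rb") as wav:
        return wav.getnframes() / wav.getframerate()


def check_clone_duration(data: bytes, min_seconds: float = 10.0,
                         max_seconds: float = 60.0) -> float:
    """Raise ValueError unless the WAV duration is within the allowed range."""
    duration = wav_duration_seconds(data)
    if not (min_seconds <= duration <= max_seconds):
        raise ValueError(
            f"Audio is {duration:.1f}s long; expected {min_seconds:.0f}-{max_seconds:.0f}s"
        )
    return duration
```

In the service, the ValueError would be translated into an HTTP 400 response, in the same way as the format and size checks.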

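4. System Voice Service

Create app/services/system_voice_service.py

  • Maintain the list of built-in system voices (stored in the database or in a config file)
  • List voices: support filtering by model and by scenario category
  • Each voice entry contains: voice_id, name, trait, age, category, languages, models, features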
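
Filtering by model and category can be sketched over plain dictionaries. The function name filter_voices and the treatment of a missing is_active flag as "active" are assumptions made for illustration; a database-backed version would express the same conditions as query filters.

```python
from typing import Dict, List, Optional


def filter_voices(voices: List[Dict], model: Optional[str] = None,
                  category: Optional[str] = None) -> List[Dict]:
    """Return active voices matching the optional model and category filters."""
    return [
        voice for voice in voices
        if voice.get("is_active", True)                           # skip disabled voices
        and (model is None or model in voice.get("models", []))   # model filter
        and (category is None or voice.get("category") == category)
    ]
```

Passing no filters returns every active voice, which matches an unfiltered list request.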

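5. Data Models

Create app/models/audio.py

AudioSynthesis table (audio_synthesis):

  • id: primary key
  • user_id: user ID (foreign key)
  • model: model used
  • voice: voice ID
  • text: original text (Text type)
  • audio_url: OSS URL of the audio
  • duration: audio duration (seconds)
  • format: audio format
  • characters: character count
  • created_at: creation time

VoiceClone table (voice_clone):

  • id: primary key
  • user_id: user ID (foreign key)
  • voice_id: voice ID returned by the Bailian platform
  • target_model: model that drives the voice
  • prefix: voice prefix
  • status: status (DEPLOYING/OK/UNDEPLOYED)
  • audio_url: OSS URL of the source audio
  • created_at, updated_at: timestamps

ASRTask table (asr_task):

  • id: primary key
  • user_id: user ID (foreign key)
  • task_id: Bailian platform task ID
  • model: model used
  • file_url: audio file URL
  • status: task status (PENDING/RUNNING/SUCCEEDED/FAILED)
  • result_text: recognized text (Text type)
  • result_url: result file URL
  • duration: audio duration (seconds)
  • created_at, updated_at: timestamps

SystemVoice table (system_voice):

  • id: primary key
  • voice_id: voice identifier
  • name: voice name
  • trait: trait description
  • age: age range
  • category: scenario category
  • languages: supported languages (JSON array)
  • models: supported models (JSON array)
  • ssml_support: whether SSML is supported
  • instruct_support: whether Instruct is supported
  • timestamp_support: whether timestamps are supported
  • is_active: whether the voice is enabled
  • created_at: creation time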
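
The status columns of voice_clone and asr_task hold a small fixed set of strings, so they can be modelled as enums instead of free text. The class names and the is_terminal property below are hypothetical; only the status values themselves come from the table definitions above.

```python
from enum import Enum


class VoiceCloneStatus(str, Enum):
    """Status values stored in voice_clone.status."""
    DEPLOYING = "DEPLOYING"
    OK = "OK"
    UNDEPLOYED = "UNDEPLOYED"


class ASRTaskStatus(str, Enum):
    """Status values stored in asr_task.status."""
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

    @property
    def is_terminal(self) -> bool:
        # A finished task no longer needs to be polled.
        return self in (ASRTaskStatus.SUCCEEDED, ASRTaskStatus.FAILED)
```

Because both enums subclass str, their members compare equal to the raw strings, so they can be written to the VARCHAR columns unchanged.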

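6. API Routes

Create app/routers/audio_router.py

# Text-to-speech
GET  /api/audio/tts/models - List TTS models
POST /api/audio/tts/synthesize - Synthesize speech (short text)
POST /api/audio/tts/synthesize-long - Synthesize speech from long text

# Speech recognition
GET  /api/audio/asr/models - List ASR models
POST /api/audio/asr/recognize - Synchronous speech recognition
POST /api/audio/asr/transcribe - Submit an asynchronous transcription task
GET  /api/audio/asr/task/{task_id} - Query transcription task status

# System voices (register before /voice/{voice_id}; otherwise "system" is matched as a voice_id)
GET  /api/audio/voice/system - List system voices

# Voice cloning
POST /api/audio/voice/create - Create a cloned voice
GET  /api/audio/voice/list - List the user's voices
GET  /api/audio/voice/{voice_id} - Query a specific voice
PUT  /api/audio/voice/{voice_id} - Update a voice
DELETE /api/audio/voice/{voice_id} - Delete a voice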

7. Schema Definitions

Create app/schemas/audio_schema.py

from typing import List, Optional

from pydantic import BaseModel

# TTS
class TTSRequest(BaseModel):
    model: str
    voice: str
    text: str
    stream: bool = False
    format: str = "mp3"
    sample_rate: int = 22050
    volume: int = 50
    speech_rate: float = 1.0
    pitch_rate: float = 1.0
    instruction: Optional[str] = None

class TTSResponse(BaseModel):
    audio_url: str
    duration: float
    format: str
    sample_rate: int
    characters: int

class LongTTSResponse(BaseModel):
    audio_url: str
    duration: float
    format: str
    total_characters: int
    segments: int

# ASR
class ASRRequest(BaseModel):
    model: str
    audio_url: Optional[str] = None
    audio_base64: Optional[str] = None
    language: Optional[str] = None
    enable_itn: bool = False
    context: Optional[str] = None

class ASRResponse(BaseModel):
    text: str
    language: str
    emotion: Optional[str] = None
    duration: int
    usage: dict

class TranscribeRequest(BaseModel):
    model: str
    file_url: str
    language: Optional[str] = None
    enable_itn: bool = False
    context: Optional[str] = None
    channel_id: List[int] = [0]

class TaskResponse(BaseModel):
    task_id: str
    task_status: str
    submit_time: Optional[str] = None
    scheduled_time: Optional[str] = None
    end_time: Optional[str] = None
    result: Optional[dict] = None
    usage: Optional[dict] = None

# Voice cloning
class VoiceCreateRequest(BaseModel):
    target_model: str
    prefix: str
    audio_url: Optional[str] = None
    language_hints: Optional[List[str]] = None

class VoiceResponse(BaseModel):
    voice_id: str
    status: str
    target_model: Optional[str] = None
    resource_link: Optional[str] = None
    gmt_create: Optional[str] = None
    gmt_modified: Optional[str] = None

class VoiceListResponse(BaseModel):
    total: int
    voices: List[VoiceResponse]

# System voices
class SystemVoiceResponse(BaseModel):
    voice_id: str
    name: str
    trait: str
    age: str
    category: str
    languages: List[str]
    models: List[str]
    features: dict

# Models
class TTSModelResponse(BaseModel):
    id: int
    title: str
    name: str
    description: str
    price: str
    features: List[str]

class ASRModelResponse(BaseModel):
    id: int
    title: str
    name: str
    description: str
    call_type: str  # sync/async
    features: List[str]

8. Database Migrations

Create the migration files:

013_create_audio_synthesis_table.sql

CREATE TABLE IF NOT EXISTS aigcspace.audio_synthesis (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    model VARCHAR(100) NOT NULL,
    voice VARCHAR(100) NOT NULL,
    text TEXT NOT NULL,
    audio_url VARCHAR(500) NOT NULL,
    duration NUMERIC(10,2),
    format VARCHAR(20),
    characters INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_audio_synthesis_user_id ON aigcspace.audio_synthesis(user_id);
CREATE INDEX IF NOT EXISTS idx_audio_synthesis_created_at ON aigcspace.audio_synthesis(created_at);

014_create_voice_clone_table.sql

CREATE TABLE IF NOT EXISTS aigcspace.voice_clone (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    voice_id VARCHAR(200) NOT NULL UNIQUE,
    target_model VARCHAR(100) NOT NULL,
    prefix VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'DEPLOYING',
    audio_url VARCHAR(500),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_voice_clone_user_id ON aigcspace.voice_clone(user_id);
CREATE INDEX IF NOT EXISTS idx_voice_clone_status ON aigcspace.voice_clone(status);

015_create_asr_task_table.sql

CREATE TABLE IF NOT EXISTS aigcspace.asr_task (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    task_id VARCHAR(100) NOT NULL UNIQUE,
    model VARCHAR(100) NOT NULL,
    file_url VARCHAR(500) NOT NULL,
    status VARCHAR(20) DEFAULT 'PENDING',
    result_text TEXT,
    result_url VARCHAR(500),
    duration INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_asr_task_user_id ON aigcspace.asr_task(user_id);
CREATE INDEX IF NOT EXISTS idx_asr_task_task_id ON aigcspace.asr_task(task_id);
CREATE INDEX IF NOT EXISTS idx_asr_task_status ON aigcspace.asr_task(status);

016_create_system_voice_table.sql

CREATE TABLE IF NOT EXISTS aigcspace.system_voice (
    id SERIAL PRIMARY KEY,
    voice_id VARCHAR(100) NOT NULL UNIQUE,
    name VARCHAR(50) NOT NULL,
    trait VARCHAR(100),
    age VARCHAR(20),
    category VARCHAR(50),
    languages JSONB DEFAULT '[]',
    models JSONB DEFAULT '[]',
    ssml_support BOOLEAN DEFAULT FALSE,
    instruct_support BOOLEAN DEFAULT FALSE,
    timestamp_support BOOLEAN DEFAULT FALSE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_system_voice_category ON aigcspace.system_voice(category);
CREATE INDEX IF NOT EXISTS idx_system_voice_is_active ON aigcspace.system_voice(is_active);

Key Implementation Details

TTS Service Implementation

import asyncio
import re
from datetime import date
from typing import List
from uuid import uuid4

from dashscope.audio.tts_v2 import SpeechSynthesizer, AudioFormat
from sqlalchemy.orm import Session

# OSSService, TTSRequest, TTSResponse and LongTTSResponse come from the existing project modules

class TTSService:
    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id
        self.oss_service = OSSService()

    async def synthesize(self, request: TTSRequest) -> TTSResponse:
        """Non-streaming speech synthesis"""
        synthesizer = SpeechSynthesizer(
            model=request.model,
            voice=request.voice,
            format=self._get_audio_format(request.format, request.sample_rate),
            volume=request.volume,
            speech_rate=request.speech_rate,
            pitch_rate=request.pitch_rate
        )

        # Synthesize the audio; call() blocks, so run it off the event loop
        audio_data = await asyncio.to_thread(synthesizer.call, request.text)

        # Upload to OSS
        filename = f"audio/tts/{date.today()}/{uuid4()}.{request.format}"
        audio_url = await self.oss_service.upload_bytes(audio_data, filename)

        # Save the record
        # ...

        return TTSResponse(
            audio_url=audio_url,
            duration=self._calculate_duration(audio_data, request.format),
            format=request.format,
            sample_rate=request.sample_rate,
            characters=len(request.text)
        )

    async def synthesize_long(self, request: TTSRequest) -> LongTTSResponse:
        """Long-text speech synthesis"""
        # Split the text
        segments = self._split_text(request.text)

        # Synthesize segment by segment
        audio_parts = []
        for segment in segments:
            synthesizer = SpeechSynthesizer(
                model=request.model,
                voice=request.voice,
                format=AudioFormat.PCM_22050HZ_MONO_16BIT,  # PCM is easy to merge
                volume=request.volume,
                speech_rate=request.speech_rate,
                pitch_rate=request.pitch_rate
            )
            audio_data = await asyncio.to_thread(synthesizer.call, segment)
            audio_parts.append(audio_data)

        # Merge the audio
        merged_audio = self._merge_audio(audio_parts)

        # Convert the format and upload
        final_audio = self._convert_format(merged_audio, request.format)
        filename = f"audio/tts/{date.today()}/{uuid4()}.{request.format}"
        audio_url = await self.oss_service.upload_bytes(final_audio, filename)

        return LongTTSResponse(
            audio_url=audio_url,
            duration=self._calculate_duration(final_audio, request.format),
            format=request.format,
            total_characters=len(request.text),
            segments=len(segments)
        )

    def _split_text(self, text: str, max_length: int = 2000) -> List[str]:
        """Split text at sentence boundaries"""
        if len(text) <= max_length:
            return [text]

        segments = []
        current = ""

        # Split on sentence delimiters (full-width and ASCII), keeping each delimiter
        sentences = re.split(r'([。!?;!?;\n])', text)

        for i in range(0, len(sentences), 2):
            sentence = sentences[i]
            delimiter = sentences[i+1] if i+1 < len(sentences) else ""
            full_sentence = sentence + delimiter

            if len(current) + len(full_sentence) <= max_length:
                current += full_sentence
            else:
                if current:
                    segments.append(current)
                current = full_sentence
                # Hard-cut a single sentence that is longer than max_length
                while len(current) > max_length:
                    segments.append(current[:max_length])
                    current = current[max_length:]

        if current:
            segments.append(current)

        return segments
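
The _merge_audio and _calculate_duration helpers referenced above are not shown. For the PCM_22050HZ_MONO_16BIT format used in synthesize_long, they could look roughly like the sketch below. The function names are hypothetical, the constants simply restate that format, and conversion to MP3 or other compressed formats (for example via pydub) is out of scope here.

```python
import io
import wave
from typing import List

SAMPLE_RATE = 22050   # Hz, from PCM_22050HZ_MONO_16BIT
SAMPLE_WIDTH = 2      # bytes per sample (16-bit)
CHANNELS = 1          # mono


def merge_pcm(parts: List[bytes]) -> bytes:
    """Raw PCM has no header, so segments can simply be concatenated."""
    return b"".join(parts)


def pcm_duration_seconds(pcm: bytes) -> float:
    """Duration is the byte count divided by the bytes produced per second."""
    return len(pcm) / (SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS)


def pcm_to_wav(pcm: bytes) -> bytes:
    """Wrap raw PCM in a WAV container so ordinary players can open it."""
    buffer = io.BytesIO()
    with wave.open(buffer, "wb") as wav:
        wav.setnchannels(CHANNELS)
        wav.setsampwidth(SAMPLE_WIDTH)
        wav.setframerate(SAMPLE_RATE)
        wav.writeframes(pcm)
    return buffer.getvalue()
```

Concatenation is only valid because every segment is requested with the same sample rate, width and channel count.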

ASR Service Implementation

from datetime import datetime

import dashscope
import requests
from dashscope import MultiModalConversation
from fastapi import HTTPException
from sqlalchemy.orm import Session

class ASRService:
    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id

    async def recognize(self, request: ASRRequest) -> ASRResponse:
        """Synchronous speech recognition"""
        # Note: request.context is accepted by ASRRequest but is not forwarded to the model here
        messages = [
            {"role": "user", "content": [{"audio": request.audio_url or request.audio_base64}]}
        ]

        asr_options = {}
        if request.language:
            asr_options["language"] = request.language
        if request.enable_itn:
            asr_options["enable_itn"] = True

        response = MultiModalConversation.call(
            model=request.model,
            messages=messages,
            result_format="message",
            asr_options=asr_options if asr_options else None
        )

        # Parse the response
        choice = response.output.choices[0]
        text = choice.message.content[0]["text"]
        annotations = choice.message.annotations[0] if choice.message.annotations else {}

        return ASRResponse(
            text=text,
            language=annotations.get("language", "unknown"),
            emotion=annotations.get("emotion"),
            duration=response.usage.get("seconds", 0),
            usage={
                "input_tokens": response.usage.input_tokens_details.get("text_tokens", 0),
                "output_tokens": response.usage.output_tokens_details.get("text_tokens", 0),
                "seconds": response.usage.get("seconds", 0)
            }
        )

    async def transcribe(self, request: TranscribeRequest) -> TaskResponse:
        """Submit an asynchronous transcription task"""
        url = "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription"

        headers = {
            "Authorization": f"Bearer {dashscope.api_key}",
            "Content-Type": "application/json",
            "X-DashScope-Async": "enable"
        }

        payload = {
            "model": request.model,
            "input": {"file_url": request.file_url},
            "parameters": {
                "channel_id": request.channel_id,
                "enable_itn": request.enable_itn
            }
        }

        if request.language:
            payload["parameters"]["language"] = request.language
        if request.context:
            payload["parameters"]["corpus"] = {"text": request.context}

        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json()

        # Save the task record
        task = ASRTask(
            user_id=self.user_id,
            task_id=data["output"]["task_id"],
            model=request.model,
            file_url=request.file_url,
            status=data["output"]["task_status"]
        )
        self.db.add(task)
        self.db.commit()

        return TaskResponse(
            task_id=data["output"]["task_id"],
            task_status=data["output"]["task_status"]
        )

    async def get_task_status(self, task_id: str) -> TaskResponse:
        """Query task status"""
        # Only the user who created the task may query it
        task = self.db.query(ASRTask).filter(
            ASRTask.task_id == task_id,
            ASRTask.user_id == self.user_id
        ).first()
        if not task:
            raise HTTPException(status_code=404, detail="Task not found")

        url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"

        headers = {
            "Authorization": f"Bearer {dashscope.api_key}",
            "X-DashScope-Async": "enable"
        }

        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()

        # Update the database record
        task.status = data["output"]["task_status"]
        if data["output"].get("result"):
            task.result_url = data["output"]["result"].get("transcription_url")
        if data.get("usage"):
            task.duration = data["usage"].get("seconds")
        task.updated_at = datetime.utcnow()
        self.db.commit()

        return TaskResponse(
            task_id=data["output"]["task_id"],
            task_status=data["output"]["task_status"],
            submit_time=data["output"].get("submit_time"),
            scheduled_time=data["output"].get("scheduled_time"),
            end_time=data["output"].get("end_time"),
            result=data["output"].get("result"),
            usage=data.get("usage")
        )
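
A caller of the asynchronous flow above has to poll the task until it finishes. The loop below is a minimal sketch: wait_for_task is a hypothetical name, fetch_status stands for any callable that returns a dict with a task_status key (for example a thin wrapper around the task query endpoint), and the interval and timeout defaults are arbitrary.

```python
import time
from typing import Callable, Dict

TERMINAL_STATUSES = {"SUCCEEDED", "FAILED"}


def wait_for_task(fetch_status: Callable[[str], Dict], task_id: str,
                  interval: float = 2.0, timeout: float = 300.0,
                  sleep: Callable[[float], None] = time.sleep) -> Dict:
    """Poll fetch_status(task_id) until the task reaches a terminal status."""
    deadline = time.monotonic() + timeout
    while True:
        task = fetch_status(task_id)
        if task["task_status"] in TERMINAL_STATUSES:
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Task {task_id} did not finish within {timeout}s")
        sleep(interval)  # injectable so tests do not actually wait
```

Injecting fetch_status and sleep keeps the loop independent of the HTTP client and lets it be tested without network access.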

Voice Cloning Service Implementation

from datetime import date, datetime
from pathlib import Path
from uuid import uuid4

from dashscope.audio.tts_v2 import VoiceEnrollmentService
from fastapi import HTTPException, UploadFile
from sqlalchemy.orm import Session

class VoiceCloneService:
    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id
        self.service = VoiceEnrollmentService()
        self.oss_service = OSSService()

    async def create_voice(self, file: UploadFile, request: VoiceCreateRequest) -> VoiceResponse:
        """Create a cloned voice"""
        # Validate the file
        self._validate_audio_file(file)

        # Upload to OSS
        filename = f"audio/voice/{date.today()}/{uuid4()}{Path(file.filename).suffix}"
        audio_url = await self.oss_service.upload_file(file, filename)

        # Call the Bailian API to create the voice
        voice_id = self.service.create_voice(
            target_model=request.target_model,
            prefix=request.prefix,
            url=audio_url,
            language_hints=request.language_hints
        )

        # Save the record
        voice = VoiceClone(
            user_id=self.user_id,
            voice_id=voice_id,
            target_model=request.target_model,
            prefix=request.prefix,
            status="DEPLOYING",
            audio_url=audio_url
        )
        self.db.add(voice)
        self.db.commit()

        return VoiceResponse(
            voice_id=voice_id,
            status="DEPLOYING",
            target_model=request.target_model
        )

    async def list_voices(self, prefix: str = None, page: int = 0, page_size: int = 10) -> VoiceListResponse:
        """List voices"""
        voices = self.service.list_voices(
            prefix=prefix,
            page_index=page,
            page_size=page_size
        )

        # Keep only voices owned by the current user and sync their local status
        own_voices = []
        for v in voices:
            local_voice = self.db.query(VoiceClone).filter(
                VoiceClone.voice_id == v["voice_id"],
                VoiceClone.user_id == self.user_id
            ).first()
            if not local_voice:
                continue
            if local_voice.status != v["status"]:
                local_voice.status = v["status"]
                local_voice.updated_at = datetime.utcnow()
            own_voices.append(v)
        self.db.commit()

        return VoiceListResponse(
            total=len(own_voices),
            voices=[VoiceResponse(
                voice_id=v["voice_id"],
                status=v["status"],
                gmt_create=v.get("gmt_create"),
                gmt_modified=v.get("gmt_modified")
            ) for v in own_voices]
        )

    async def query_voice(self, voice_id: str) -> VoiceResponse:
        """Query a specific voice"""
        # Check ownership
        local_voice = self.db.query(VoiceClone).filter(
            VoiceClone.voice_id == voice_id,
            VoiceClone.user_id == self.user_id
        ).first()
        if not local_voice:
            raise HTTPException(status_code=404, detail="Voice not found")

        voice_info = self.service.query_voice(voice_id=voice_id)

        # Update the local status
        if local_voice.status != voice_info["status"]:
            local_voice.status = voice_info["status"]
            local_voice.updated_at = datetime.utcnow()
            self.db.commit()

        return VoiceResponse(
            voice_id=voice_id,
            status=voice_info["status"],
            target_model=voice_info.get("target_model"),
            resource_link=voice_info.get("resource_link"),
            gmt_create=voice_info.get("gmt_create"),
            gmt_modified=voice_info.get("gmt_modified")
        )

    async def delete_voice(self, voice_id: str) -> None:
        """Delete a voice"""
        # Check ownership
        local_voice = self.db.query(VoiceClone).filter(
            VoiceClone.voice_id == voice_id,
            VoiceClone.user_id == self.user_id
        ).first()
        if not local_voice:
            raise HTTPException(status_code=404, detail="Voice not found")

        # Call the Bailian API to delete the voice
        self.service.delete_voice(voice_id=voice_id)

        # Delete the local record
        self.db.delete(local_voice)
        self.db.commit()

    def _validate_audio_file(self, file: UploadFile) -> None:
        """Validate the audio file"""
        # Check the format
        allowed_types = ["audio/wav", "audio/mpeg", "audio/mp3", "audio/m4a", "audio/x-m4a"]
        if file.content_type not in allowed_types:
            raise HTTPException(status_code=400, detail="Unsupported audio format; only WAV, MP3 and M4A are supported")

        # Check the size (10 MB)
        file.file.seek(0, 2)
        size = file.file.tell()
        file.file.seek(0)
        if size > 10 * 1024 * 1024:
            raise HTTPException(status_code=400, detail="File size exceeds the 10 MB limit")

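Access Control

  • All APIs require user authentication unless noted below
  • A cloned voice can be accessed, modified, or deleted only by its creator
  • An ASR task can be queried only by its creator
  • The system voice list is publicly accessible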
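
The creator-only rules can be centralized in one check that every service reuses. The sketch below is hypothetical (OwnedRecord, NotFoundError and require_owner are illustrative names); it treats a record owned by someone else exactly like a missing record, so that the API does not reveal whether an ID belonging to another user exists.

```python
from dataclasses import dataclass
from typing import Optional


class NotFoundError(Exception):
    """Raised when a record is missing or belongs to another user."""


@dataclass
class OwnedRecord:
    """Stand-in for an ORM row such as VoiceClone or ASRTask."""
    id: int
    user_id: int


def require_owner(record: Optional[OwnedRecord], user_id: int) -> OwnedRecord:
    """Return the record if the user owns it; otherwise raise NotFoundError."""
    if record is None or record.user_id != user_id:
        raise NotFoundError("Resource not found")
    return record
```

In a FastAPI handler, NotFoundError would be mapped to an HTTP 404 response.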

Route Registration

Register the router in main.py:

from app.routers import audio_router
app.include_router(audio_router.router)

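Dependencies

  • Depends on the existing OSSService (file upload)
  • Depends on the existing User model and auth middleware
  • Depends on the existing ApiResponse response format
  • Packages to install: dashscope, requests, pydub (audio processing)

Initializing System Voice Data

Create scripts/init_system_voices.py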

# Seed data for the system voices
SYSTEM_VOICES = [
    {
        "voice_id": "longanyang",
        "name": "龙安洋",
        "trait": "阳光大男孩",
        "age": "20~30岁",
        "category": "社交陪伴",
        "languages": ["中文(普通话)", "英文"],
        "models": ["cosyvoice-v3-flash", "cosyvoice-v3-plus"],
        "ssml_support": True,
        "instruct_support": True,
        "timestamp_support": False
    },
    {
        "voice_id": "longanhuan",
        "name": "龙安欢",
        "trait": "欢脱元气女",
        "age": "20~30岁",
        "category": "社交陪伴",
        "languages": ["中文(普通话)", "英文"],
        "models": ["cosyvoice-v3-flash", "cosyvoice-v3-plus"],
        "ssml_support": True,
        "instruct_support": True,
        "timestamp_support": False
    },
    # ... more voices
]
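
The seed entries store three boolean flags, while SystemVoiceResponse exposes a single features dict. One possible mapping is sketched below. The shape of features (keys ssml, instruct, timestamp) is an assumption made for illustration, as is the function name to_response_dict.

```python
from typing import Dict

FEATURE_FLAGS = ("ssml_support", "instruct_support", "timestamp_support")
_SUFFIX = "_support"


def to_response_dict(voice: Dict) -> Dict:
    """Collapse the *_support flags of a seed entry into a features dict."""
    features = {
        flag[:-len(_SUFFIX)]: bool(voice.get(flag, False))  # e.g. ssml_support -> ssml
        for flag in FEATURE_FLAGS
    }
    # Copy every other field unchanged and attach the combined features.
    response = {key: value for key, value in voice.items() if key not in FEATURE_FLAGS}
    response["features"] = features
    return response
```

The input entry is left unmodified, so the same SYSTEM_VOICES list can still be used to populate the system_voice table.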