请实现一个完整的AI语音模块,集成阿里云百炼平台(DashScope),支持语音合成(TTS)、语音识别(ASR)、声音复刻和音色管理功能。
创建 app/services/tts_service.py:
音频文件存储路径:audio/tts/{日期}/{uuid}.{format}
def split_text(text: str, max_length: int = 2000) -> List[str]:
    """按句子边界切割文本,每段不超过max_length"""
    # 句子分隔符:。!?;(含半角 !?;)以及换行符 \n
    # 优先在分隔符处切割
    # 如果单句超过max_length,在逗号处切割;若仍超长则按max_length截断
    # 返回切割后的文本列表
创建 app/services/asr_service.py:
创建 app/services/voice_clone_service.py:
def validate_audio_file(file: UploadFile) -> None:
    """验证音频文件是否符合复刻要求"""
    # 格式:WAV、MP3、M4A
    # 大小:≤10MB
    # 时长:10-60秒(可选验证)
创建 app/services/system_voice_service.py:
创建 app/models/audio.py:
AudioSynthesis表(audio_synthesis):
VoiceClone表(voice_clone):
ASRTask表(asr_task):
SystemVoice表(system_voice):
创建 app/routers/audio_router.py:
# 语音合成
GET /api/audio/tts/models - 获取TTS模型列表
POST /api/audio/tts/synthesize - 语音合成(短文本)
POST /api/audio/tts/synthesize-long - 长文本语音合成
# 语音识别
GET /api/audio/asr/models - 获取ASR模型列表
POST /api/audio/asr/recognize - 同步语音识别
POST /api/audio/asr/transcribe - 提交异步转写任务
GET /api/audio/asr/task/{task_id} - 查询转写任务状态
# 声音复刻
POST /api/audio/voice/create - 创建复刻音色
GET /api/audio/voice/list - 查询用户音色列表
GET /api/audio/voice/{voice_id} - 查询指定音色
PUT /api/audio/voice/{voice_id} - 更新音色
DELETE /api/audio/voice/{voice_id} - 删除音色
# 系统音色
GET /api/audio/voice/system - 获取系统音色列表(注意:该路由必须在 /api/audio/voice/{voice_id} 之前注册,否则 "system" 会被当作 voice_id 匹配)
创建 app/schemas/audio_schema.py:
# TTS相关
# Request body for a text-to-speech call.
# Only `model`, `voice` and `text` are required; every other field has a default.
class TTSRequest(BaseModel):
    model: str
    voice: str
    text: str
    stream: bool = False
    format: str = "mp3"  # TTSService also uses this value as the stored file's extension
    sample_rate: int = 22050
    volume: int = 50
    speech_rate: float = 1.0
    pitch_rate: float = 1.0
    instruction: Optional[str] = None
# Result of a short-text synthesis: where the audio was stored plus basic metadata.
class TTSResponse(BaseModel):
    audio_url: str
    duration: float  # presumably seconds — TODO confirm against _calculate_duration
    format: str
    sample_rate: int
    characters: int  # TTSService.synthesize fills this with len(request.text)
# Result of a long-text synthesis, where the text was split and synthesized piecewise.
class LongTTSResponse(BaseModel):
    audio_url: str
    duration: float  # presumably seconds — TODO confirm against _calculate_duration
    format: str
    total_characters: int  # length of the full input text, before splitting
    segments: int  # number of text segments that were synthesized
# ASR相关
# Request body for synchronous speech recognition.
# Both audio fields are optional at schema level; ASRService.recognize uses
# `audio_url` when it is set and falls back to `audio_base64` otherwise.
class ASRRequest(BaseModel):
    model: str
    audio_url: Optional[str] = None
    audio_base64: Optional[str] = None
    language: Optional[str] = None
    enable_itn: bool = False
    context: Optional[str] = None
# Result of a synchronous recognition call.
class ASRResponse(BaseModel):
    text: str
    language: str  # ASRService.recognize reports "unknown" when no annotation is returned
    emotion: Optional[str] = None
    duration: int  # taken from the upstream usage "seconds" value
    usage: dict  # keys written by ASRService.recognize: input_tokens, output_tokens, seconds
# Request body for submitting an asynchronous file-transcription task.
class TranscribeRequest(BaseModel):
    model: str
    file_url: str
    language: Optional[str] = None
    enable_itn: bool = False
    context: Optional[str] = None  # ASRService.transcribe forwards this as parameters.corpus.text
    # NOTE(review): list literal used as a default; pydantic is expected to copy
    # field defaults per instance — confirm for the pydantic version in use.
    channel_id: List[int] = [0]
# Status of an asynchronous transcription task.
# Only the id and status are always present; the remaining fields are filled
# from the upstream task payload when it provides them.
class TaskResponse(BaseModel):
    task_id: str
    task_status: str
    submit_time: Optional[str] = None
    scheduled_time: Optional[str] = None
    end_time: Optional[str] = None
    result: Optional[dict] = None
    usage: Optional[dict] = None
# 声音复刻相关
# Parameters for creating a cloned voice.
class VoiceCreateRequest(BaseModel):
    target_model: str
    prefix: str
    # NOTE(review): VoiceCloneService.create_voice uploads the file itself and
    # does not read this field — confirm whether it is still needed.
    audio_url: Optional[str] = None
    language_hints: Optional[List[str]] = None
# Description of one cloned voice; only the id and status are mandatory.
class VoiceResponse(BaseModel):
    voice_id: str
    status: str  # new voices are reported as "DEPLOYING" by VoiceCloneService.create_voice
    target_model: Optional[str] = None
    resource_link: Optional[str] = None
    gmt_create: Optional[str] = None
    gmt_modified: Optional[str] = None
# One page of cloned voices together with the number of entries it contains.
class VoiceListResponse(BaseModel):
    total: int
    voices: List[VoiceResponse]
# 系统音色相关
# Description of one built-in (system) voice.
class SystemVoiceResponse(BaseModel):
    voice_id: str
    name: str
    trait: str
    age: str
    category: str
    languages: List[str]
    models: List[str]
    # presumably carries the ssml/instruct/timestamp support flags stored on
    # the system_voice table — TODO confirm against the service that builds it
    features: dict
# 模型相关
# Catalogue entry describing one selectable TTS model.
class TTSModelResponse(BaseModel):
    id: int
    title: str
    name: str
    description: str
    price: str
    features: List[str]
# Catalogue entry describing one selectable ASR model.
class ASRModelResponse(BaseModel):
    id: int
    title: str
    name: str
    description: str
    call_type: str  # sync/async
    features: List[str]
创建迁移文件:
013_create_audio_synthesis_table.sql:
-- Migration 013: one row per speech-synthesis result produced for a user.
CREATE TABLE IF NOT EXISTS aigcspace.audio_synthesis (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    model VARCHAR(100) NOT NULL,
    voice VARCHAR(100) NOT NULL,
    text TEXT NOT NULL,
    audio_url VARCHAR(500) NOT NULL,
    duration NUMERIC(10,2),
    format VARCHAR(20),
    characters INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- IF NOT EXISTS keeps the whole migration re-runnable, matching the guard on
-- CREATE TABLE above; without it a second run aborts on the existing index.
CREATE INDEX IF NOT EXISTS idx_audio_synthesis_user_id ON aigcspace.audio_synthesis(user_id);
CREATE INDEX IF NOT EXISTS idx_audio_synthesis_created_at ON aigcspace.audio_synthesis(created_at);
014_create_voice_clone_table.sql:
-- Migration 014: cloned voices owned by a user; voice_id is globally unique.
CREATE TABLE IF NOT EXISTS aigcspace.voice_clone (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    voice_id VARCHAR(200) NOT NULL UNIQUE,
    target_model VARCHAR(100) NOT NULL,
    prefix VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'DEPLOYING',
    audio_url VARCHAR(500),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- IF NOT EXISTS keeps the whole migration re-runnable, matching the guard on
-- CREATE TABLE above; without it a second run aborts on the existing index.
CREATE INDEX IF NOT EXISTS idx_voice_clone_user_id ON aigcspace.voice_clone(user_id);
CREATE INDEX IF NOT EXISTS idx_voice_clone_status ON aigcspace.voice_clone(status);
015_create_asr_task_table.sql:
-- Migration 015: asynchronous transcription tasks submitted by a user.
CREATE TABLE IF NOT EXISTS aigcspace.asr_task (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES aigcspace.users(id),
    task_id VARCHAR(100) NOT NULL UNIQUE,
    model VARCHAR(100) NOT NULL,
    file_url VARCHAR(500) NOT NULL,
    status VARCHAR(20) DEFAULT 'PENDING',
    result_text TEXT,
    result_url VARCHAR(500),
    duration INTEGER,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- IF NOT EXISTS keeps the whole migration re-runnable, matching the guard on
-- CREATE TABLE above; without it a second run aborts on the existing index.
CREATE INDEX IF NOT EXISTS idx_asr_task_user_id ON aigcspace.asr_task(user_id);
-- NOTE(review): the UNIQUE constraint on task_id already creates an index on
-- that column, so this one is redundant; kept only so the index name still
-- exists for anything that references it.
CREATE INDEX IF NOT EXISTS idx_asr_task_task_id ON aigcspace.asr_task(task_id);
CREATE INDEX IF NOT EXISTS idx_asr_task_status ON aigcspace.asr_task(status);
016_create_system_voice_table.sql:
-- Migration 016: catalogue of built-in voices and their capability flags.
CREATE TABLE IF NOT EXISTS aigcspace.system_voice (
    id SERIAL PRIMARY KEY,
    voice_id VARCHAR(100) NOT NULL UNIQUE,
    name VARCHAR(50) NOT NULL,
    trait VARCHAR(100),
    age VARCHAR(20),
    category VARCHAR(50),
    languages JSONB DEFAULT '[]',
    models JSONB DEFAULT '[]',
    ssml_support BOOLEAN DEFAULT FALSE,
    instruct_support BOOLEAN DEFAULT FALSE,
    timestamp_support BOOLEAN DEFAULT FALSE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- IF NOT EXISTS keeps the whole migration re-runnable, matching the guard on
-- CREATE TABLE above; without it a second run aborts on the existing index.
CREATE INDEX IF NOT EXISTS idx_system_voice_category ON aigcspace.system_voice(category);
CREATE INDEX IF NOT EXISTS idx_system_voice_is_active ON aigcspace.system_voice(is_active);
from dashscope.audio.tts_v2 import SpeechSynthesizer, AudioFormat
class TTSService:
    """Speech synthesis through DashScope ``SpeechSynthesizer``.

    Synthesized audio is uploaded through ``OSSService`` and the resulting
    URL is returned to the caller. One instance serves one user/request.
    """

    # A "piece" is a run of text up to and including one delimiter, or the
    # trailing run that has no delimiter. Both full-width and half-width
    # sentence marks are recognised so Chinese and mixed text split alike.
    _SENTENCE_PIECE = re.compile(r"[^。!?;!?;\n]*[。!?;!?;\n]|[^。!?;!?;\n]+")
    _CLAUSE_PIECE = re.compile(r"[^,,、]*[,,、]|[^,,、]+")

    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id
        self.oss_service = OSSService()

    async def synthesize(self, request: TTSRequest) -> TTSResponse:
        """Synthesize ``request.text`` in one call and store the audio.

        Raises:
            RuntimeError: if the synthesizer returns no audio data.
        """
        synthesizer = self._build_synthesizer(
            request,
            self._get_audio_format(request.format, request.sample_rate),
        )
        audio_data = await self._call_synthesizer(synthesizer, request.text)

        # Upload to OSS under a per-day prefix with a random object name.
        filename = f"audio/tts/{date.today()}/{uuid4()}.{request.format}"
        audio_url = await self.oss_service.upload_bytes(audio_data, filename)

        # TODO: persist the synthesis record (left unimplemented in the original).

        return TTSResponse(
            audio_url=audio_url,
            duration=self._calculate_duration(audio_data, request.format),
            format=request.format,
            sample_rate=request.sample_rate,
            characters=len(request.text),
        )

    async def synthesize_long(self, request: TTSRequest) -> LongTTSResponse:
        """Synthesize long text segment by segment and store the merged audio.

        Segments are synthesized sequentially so the merged audio keeps the
        order of the text.

        Raises:
            RuntimeError: if the synthesizer returns no audio for a segment.
        """
        segments = self._split_text(request.text)

        audio_parts = []
        for segment in segments:
            # A fresh synthesizer per segment, as in the original flow.
            # PCM output is used because raw samples are simple to merge.
            synthesizer = self._build_synthesizer(
                request, AudioFormat.PCM_22050HZ_MONO_16BIT
            )
            audio_parts.append(await self._call_synthesizer(synthesizer, segment))

        merged_audio = self._merge_audio(audio_parts)

        # Convert to the requested container/codec, then upload.
        final_audio = self._convert_format(merged_audio, request.format)
        filename = f"audio/tts/{date.today()}/{uuid4()}.{request.format}"
        audio_url = await self.oss_service.upload_bytes(final_audio, filename)

        return LongTTSResponse(
            audio_url=audio_url,
            duration=self._calculate_duration(final_audio, request.format),
            format=request.format,
            total_characters=len(request.text),
            segments=len(segments),
        )

    @staticmethod
    def _build_synthesizer(request: TTSRequest, audio_format):
        """Create a ``SpeechSynthesizer`` configured from ``request``."""
        options = {
            "model": request.model,
            "voice": request.voice,
            "format": audio_format,
            "volume": request.volume,
            "speech_rate": request.speech_rate,
            "pitch_rate": request.pitch_rate,
        }
        # Forward the instruction only when one was supplied, so requests
        # without it behave exactly as before.
        # NOTE(review): requires a dashscope SDK whose SpeechSynthesizer
        # accepts `instruction` — confirm the pinned SDK version.
        if request.instruction:
            options["instruction"] = request.instruction
        return SpeechSynthesizer(**options)

    @staticmethod
    async def _call_synthesizer(synthesizer, text: str):
        """Run the blocking ``synthesizer.call`` off the event loop."""
        import asyncio  # local import: keeps this block self-contained

        loop = asyncio.get_running_loop()
        audio_data = await loop.run_in_executor(None, synthesizer.call, text)
        if not audio_data:
            # Fail here rather than uploading an empty object.
            raise RuntimeError("DashScope speech synthesis returned no audio data")
        return audio_data

    def _split_text(self, text: str, max_length: int = 2000) -> List[str]:
        """Split ``text`` into segments of at most ``max_length`` characters.

        Cuts are made at sentence delimiters first. A sentence that is still
        too long is cut at commas, and a clause that is still too long is cut
        at ``max_length``. Concatenating the result reproduces ``text``.

        Raises:
            ValueError: if ``text`` needs splitting and ``max_length`` < 1.
        """
        if len(text) <= max_length:
            return [text]
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        segments: List[str] = []
        current = ""
        # Greedily pack consecutive pieces into the current segment.
        for piece in self._split_pieces(text, max_length):
            if len(current) + len(piece) <= max_length:
                current += piece
                continue
            if current:
                segments.append(current)
            current = piece
        if current:
            segments.append(current)
        return segments

    @classmethod
    def _split_pieces(cls, text: str, max_length: int) -> List[str]:
        """Break ``text`` into ordered pieces, none longer than ``max_length``."""
        pieces: List[str] = []
        for sentence in cls._SENTENCE_PIECE.findall(text):
            if len(sentence) <= max_length:
                pieces.append(sentence)
                continue
            for clause in cls._CLAUSE_PIECE.findall(sentence):
                if len(clause) <= max_length:
                    pieces.append(clause)
                    continue
                # No usable boundary left: fall back to fixed-size slices.
                pieces.extend(
                    clause[start:start + max_length]
                    for start in range(0, len(clause), max_length)
                )
        return pieces
import dashscope
from dashscope import MultiModalConversation
import requests
class ASRService:
    """Speech recognition through DashScope.

    ``recognize`` performs a synchronous call via ``MultiModalConversation``.
    ``transcribe`` and ``get_task_status`` drive the asynchronous
    file-transcription REST API and mirror task state into ``ASRTask`` rows.
    """

    _TRANSCRIPTION_URL = "https://dashscope.aliyuncs.com/api/v1/services/audio/asr/transcription"
    _TASK_URL = "https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
    # (connect, read) seconds: without a timeout `requests` can wait forever.
    _HTTP_TIMEOUT = (5, 30)

    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id

    async def recognize(self, request: ASRRequest) -> ASRResponse:
        """Recognize one audio clip synchronously.

        Raises:
            HTTPException: 400 when neither ``audio_url`` nor ``audio_base64``
                is given; 502 when DashScope reports a failure.
        """
        from fastapi import HTTPException  # local import: keeps this block self-contained

        audio = request.audio_url or request.audio_base64
        if not audio:
            raise HTTPException(status_code=400, detail="必须提供 audio_url 或 audio_base64")

        messages = []
        if request.context:
            # Context text goes in a system message ahead of the audio.
            # NOTE(review): confirm against the docs for the ASR model in use.
            messages.append({"role": "system", "content": [{"text": request.context}]})
        messages.append({"role": "user", "content": [{"audio": audio}]})

        asr_options = {}
        if request.language:
            asr_options["language"] = request.language
        if request.enable_itn:
            asr_options["enable_itn"] = True

        response = await self._run_blocking(
            MultiModalConversation.call,
            model=request.model,
            messages=messages,
            result_format="message",
            asr_options=asr_options or None,
        )
        if response.status_code != 200:
            # On failure `output` is empty; report the upstream message
            # instead of failing on attribute access below.
            raise HTTPException(status_code=502, detail=f"语音识别失败: {response.message}")

        choice = response.output.choices[0]
        text = choice.message.content[0]["text"]
        annotations = choice.message.annotations[0] if choice.message.annotations else {}
        seconds = response.usage.get("seconds", 0)

        return ASRResponse(
            text=text,
            language=annotations.get("language", "unknown"),
            emotion=annotations.get("emotion"),
            duration=seconds,
            usage={
                "input_tokens": response.usage.input_tokens_details.get("text_tokens", 0),
                "output_tokens": response.usage.output_tokens_details.get("text_tokens", 0),
                "seconds": seconds,
            },
        )

    async def transcribe(self, request: TranscribeRequest) -> TaskResponse:
        """Submit an asynchronous transcription task and record it locally.

        Raises:
            HTTPException: 502 when DashScope rejects the submission.
        """
        headers = self._auth_headers()
        headers["Content-Type"] = "application/json"

        parameters = {
            "channel_id": request.channel_id,
            "enable_itn": request.enable_itn,
        }
        if request.language:
            parameters["language"] = request.language
        if request.context:
            parameters["corpus"] = {"text": request.context}
        payload = {
            "model": request.model,
            "input": {"file_url": request.file_url},
            "parameters": parameters,
        }

        response = await self._run_blocking(
            requests.post,
            self._TRANSCRIPTION_URL,
            headers=headers,
            json=payload,
            timeout=self._HTTP_TIMEOUT,
        )
        output = self._parse_task_response(response)["output"]

        task = ASRTask(
            user_id=self.user_id,
            task_id=output["task_id"],
            model=request.model,
            file_url=request.file_url,
            status=output["task_status"],
        )
        self.db.add(task)
        self.db.commit()

        return TaskResponse(
            task_id=output["task_id"],
            task_status=output["task_status"],
        )

    async def get_task_status(self, task_id: str) -> TaskResponse:
        """Fetch the current state of one of this user's transcription tasks.

        Raises:
            HTTPException: 404 when the task does not belong to this user;
                502 when DashScope reports a failure.
        """
        from fastapi import HTTPException  # local import: keeps this block self-contained

        # Check ownership before contacting DashScope: the API key is shared,
        # so an unscoped lookup would expose other users' tasks.
        task = self.db.query(ASRTask).filter(
            ASRTask.task_id == task_id,
            ASRTask.user_id == self.user_id,
        ).first()
        if not task:
            raise HTTPException(status_code=404, detail="任务不存在")

        response = await self._run_blocking(
            requests.get,
            self._TASK_URL.format(task_id=task_id),
            headers=self._auth_headers(),
            timeout=self._HTTP_TIMEOUT,
        )
        data = self._parse_task_response(response)
        output = data["output"]

        # Mirror the upstream state into the local record.
        task.status = output["task_status"]
        if output.get("result"):
            task.result_url = output["result"].get("transcription_url")
        if data.get("usage"):
            task.duration = data["usage"].get("seconds")
        task.updated_at = datetime.utcnow()
        self.db.commit()

        return TaskResponse(
            task_id=output["task_id"],
            task_status=output["task_status"],
            submit_time=output.get("submit_time"),
            scheduled_time=output.get("scheduled_time"),
            end_time=output.get("end_time"),
            result=output.get("result"),
            usage=data.get("usage"),
        )

    @staticmethod
    def _auth_headers() -> dict:
        """Headers shared by both task endpoints."""
        return {
            "Authorization": f"Bearer {dashscope.api_key}",
            "X-DashScope-Async": "enable",
        }

    @staticmethod
    def _parse_task_response(response) -> dict:
        """Return the decoded task payload, or raise 502 if it is an error."""
        from fastapi import HTTPException  # local import: keeps this block self-contained

        try:
            data = response.json()
        except ValueError:
            data = None
        output = data.get("output") if isinstance(data, dict) else None
        if response.status_code >= 400 or not output or "task_id" not in output:
            message = data.get("message") if isinstance(data, dict) else None
            raise HTTPException(
                status_code=502,
                detail=f"DashScope 请求失败: {message or response.status_code}",
            )
        return data

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking call in the default executor, off the event loop."""
        import asyncio  # local imports: keep this block self-contained
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
from dashscope.audio.tts_v2 import VoiceEnrollmentService
class VoiceCloneService:
    """Create, list, query and delete a user's cloned voices.

    Remote operations go through DashScope ``VoiceEnrollmentService``. Each
    voice is also recorded in a ``VoiceClone`` row, which is the source of
    truth for which user owns which voice.
    """

    # MIME aliases commonly sent for WAV / MP3 / M4A uploads.
    _ALLOWED_CONTENT_TYPES = frozenset({
        "audio/wav", "audio/x-wav", "audio/wave", "audio/vnd.wave",
        "audio/mpeg", "audio/mp3",
        "audio/mp4", "audio/m4a", "audio/x-m4a",
    })
    _MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB

    def __init__(self, db: Session, user_id: int):
        self.db = db
        self.user_id = user_id
        self.service = VoiceEnrollmentService()
        self.oss_service = OSSService()

    async def create_voice(self, file: UploadFile, request: VoiceCreateRequest) -> VoiceResponse:
        """Validate and upload the sample, enroll the voice, record ownership.

        Raises:
            HTTPException: 400 when the file fails validation.
        """
        self._validate_audio_file(file)

        # `filename` may be missing on an upload; fall back to no suffix.
        suffix = Path(file.filename or "").suffix
        filename = f"audio/voice/{date.today()}/{uuid4()}{suffix}"
        audio_url = await self.oss_service.upload_file(file, filename)

        voice_id = await self._run_blocking(
            self.service.create_voice,
            target_model=request.target_model,
            prefix=request.prefix,
            url=audio_url,
            language_hints=request.language_hints,
        )

        voice = VoiceClone(
            user_id=self.user_id,
            voice_id=voice_id,
            target_model=request.target_model,
            prefix=request.prefix,
            status="DEPLOYING",
            audio_url=audio_url,
        )
        self.db.add(voice)
        self.db.commit()

        return VoiceResponse(
            voice_id=voice_id,
            status="DEPLOYING",
            target_model=request.target_model,
        )

    async def list_voices(self, prefix: str = None, page: int = 0, page_size: int = 10) -> VoiceListResponse:
        """List the voices on the requested remote page that this user owns.

        The remote listing covers the whole account, so it is filtered by
        local ownership records. Because paging happens upstream, a page may
        hold fewer than ``page_size`` entries after filtering.
        """
        remote_voices = await self._run_blocking(
            self.service.list_voices,
            prefix=prefix,
            page_index=page,
            page_size=page_size,
        ) or []

        # One query for the whole page instead of one per voice.
        voice_ids = [v["voice_id"] for v in remote_voices]
        owned = {}
        if voice_ids:
            rows = self.db.query(VoiceClone).filter(
                VoiceClone.user_id == self.user_id,
                VoiceClone.voice_id.in_(voice_ids),
            ).all()
            owned = {row.voice_id: row for row in rows}

        visible = [v for v in remote_voices if v["voice_id"] in owned]

        # Sync the local status with what the platform reports.
        for v in visible:
            local_voice = owned[v["voice_id"]]
            if local_voice.status != v["status"]:
                local_voice.status = v["status"]
                local_voice.updated_at = datetime.utcnow()
        self.db.commit()

        return VoiceListResponse(
            total=len(visible),
            voices=[
                VoiceResponse(
                    voice_id=v["voice_id"],
                    status=v["status"],
                    gmt_create=v.get("gmt_create"),
                    gmt_modified=v.get("gmt_modified"),
                )
                for v in visible
            ],
        )

    async def query_voice(self, voice_id: str) -> VoiceResponse:
        """Return the remote details of one voice owned by this user.

        Raises:
            HTTPException: 404 when the voice is not owned by this user.
        """
        local_voice = self._get_owned_voice(voice_id)
        voice_info = await self._run_blocking(self.service.query_voice, voice_id=voice_id)

        if local_voice.status != voice_info["status"]:
            local_voice.status = voice_info["status"]
            local_voice.updated_at = datetime.utcnow()
            self.db.commit()

        return VoiceResponse(
            voice_id=voice_id,
            status=voice_info["status"],
            target_model=voice_info.get("target_model"),
            resource_link=voice_info.get("resource_link"),
            gmt_create=voice_info.get("gmt_create"),
            gmt_modified=voice_info.get("gmt_modified"),
        )

    async def delete_voice(self, voice_id: str) -> None:
        """Delete one of this user's voices remotely, then locally.

        Raises:
            HTTPException: 404 when the voice is not owned by this user.
        """
        local_voice = self._get_owned_voice(voice_id)
        # Remote first: if it fails, the local record is kept for a retry.
        await self._run_blocking(self.service.delete_voice, voice_id=voice_id)
        self.db.delete(local_voice)
        self.db.commit()

    def _get_owned_voice(self, voice_id: str):
        """Return this user's ``VoiceClone`` row for ``voice_id`` or raise 404."""
        local_voice = self.db.query(VoiceClone).filter(
            VoiceClone.voice_id == voice_id,
            VoiceClone.user_id == self.user_id,
        ).first()
        if not local_voice:
            raise HTTPException(status_code=404, detail="音色不存在")
        return local_voice

    def _validate_audio_file(self, file: UploadFile) -> None:
        """Check the upload's declared type and size; rewind it afterwards.

        Raises:
            HTTPException: 400 for an unsupported type, an empty file, or a
                file larger than 10MB.
        """
        # The header is client-supplied: it may be absent, use any letter
        # case, or carry parameters after ";".
        content_type = (file.content_type or "").split(";")[0].strip().lower()
        if content_type not in self._ALLOWED_CONTENT_TYPES:
            raise HTTPException(status_code=400, detail="不支持的音频格式,仅支持WAV、MP3、M4A")

        # Measure by seeking to the end, then rewind so the later upload
        # reads from the start.
        file.file.seek(0, 2)
        size = file.file.tell()
        file.file.seek(0)
        if size == 0:
            raise HTTPException(status_code=400, detail="音频文件为空")
        if size > self._MAX_FILE_SIZE:
            raise HTTPException(status_code=400, detail="文件大小超过10MB限制")

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking SDK call in the default executor, off the event loop."""
        import asyncio  # local imports: keep this block self-contained
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
在 main.py 中注册:
from app.routers import audio_router

# Mount the routes declared on audio_router.router onto the application.
app.include_router(audio_router.router)
创建 scripts/init_system_voices.py:
# Seed data for the built-in (system) voices.
# Each entry's keys match columns of the aigcspace.system_voice table.
SYSTEM_VOICES = [
    {
        "voice_id": "longanyang",
        "name": "龙安洋",
        "trait": "阳光大男孩",
        "age": "20~30岁",
        "category": "社交陪伴",
        "languages": ["中文(普通话)", "英文"],
        "models": ["cosyvoice-v3-flash", "cosyvoice-v3-plus"],
        "ssml_support": True,
        "instruct_support": True,
        "timestamp_support": False
    },
    {
        "voice_id": "longanhuan",
        "name": "龙安欢",
        "trait": "欢脱元气女",
        "age": "20~30岁",
        "category": "社交陪伴",
        "languages": ["中文(普通话)", "英文"],
        "models": ["cosyvoice-v3-flash", "cosyvoice-v3-plus"],
        "ssml_support": True,
        "instruct_support": True,
        "timestamp_support": False
    },
    # ... more voices
]