""" AI语音API路由 提供语音合成(TTS)、语音识别(ASR)、声音复刻和音色管理的RESTful API端点 需求: 6.1-6.13, 7.1, 8.1-8.7 """ from datetime import datetime from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from app.database import get_db, SessionLocal from app.models.user import User from app.middleware import get_current_user_from_request from app.schemas.model_schema import ApiResponse from app.schemas.audio_schema import ( # TTS相关 TTSRequest, TTSResponse, LongTTSResponse, TTSModelResponse, # ASR相关 ASRRequest, ASRResponse, TranscribeRequest, TaskResponse, ASRModelResponse, # 声音复刻相关 VoiceCreateRequest, VoiceUpdateRequest, VoiceResponse, VoiceListResponse, # 系统音色相关 SystemVoiceResponse, # 创作历史相关 AudioHistoryItem, AudioHistoryListResponse, UpdateAudioNameRequest, ) from app.models.audio import AudioSynthesis, ASRTask, ASRRecognition from app.services.tts_service import TTSService from app.services.asr_service import ASRService from app.services.voice_clone_service import VoiceCloneService from app.services.system_voice_service import SystemVoiceService from app.services.oss_service import get_oss_service from app.services.system_config_manager import get_config_int router = APIRouter(prefix="/api/audio", tags=["AI语音"]) # ==================== TTS端点 ==================== @router.get("/tts/models", response_model=ApiResponse[List[TTSModelResponse]]) def get_tts_models( db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 获取TTS模型列表 需求: 6.1 """ service = TTSService(db, current_user.id, current_user.apikey) models = service.get_tts_models() return ApiResponse( code=200, message="success", data=models ) @router.post("/tts/synthesize", response_model=ApiResponse[TTSResponse]) async def synthesize_speech( request: TTSRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 语音合成(非流式),需要余额检查 将文本转换为语音,返回OSS上的音频文件URL 文本长度不超过2000字符 需求: 6.2 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 检查文本长度限制 max_chars = get_config_int("max_audio_chars", 5000) if len(request.text) > max_chars: raise HTTPException(status_code=400, detail=f"文本长度超过限制(最大{max_chars}字符)") # 如果请求流式输出,返回流式响应 if request.stream: from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, request.model, current_user.apikey) stream_db = SessionLocal() async def audio_stream_and_close(): try: service = TTSService(stream_db, current_user.id, effective_key) async for chunk in service.synthesize_stream(request): yield chunk finally: stream_db.close() return StreamingResponse( audio_stream_and_close(), media_type=f"audio/{request.format}", headers={ "Content-Disposition": f"attachment; filename=audio.{request.format}" } ) from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, request.model, current_user.apikey) service = TTSService(db, current_user.id, effective_key) result = await service.synthesize(request) return ApiResponse( code=200, message="success", data=result ) @router.post("/tts/synthesize-long", response_model=ApiResponse[LongTTSResponse]) async def synthesize_long_speech( request: TTSRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 长文本语音合成,需要余额检查 支持超过2000字符的长文本,自动按句子边界切割并合并 需求: 6.3 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 检查文本长度限制(长文本允许更大,最低放宽到20万字符) max_chars = max(get_config_int("max_audio_chars", 5000) * 10, 200000) if len(request.text) > max_chars: raise HTTPException(status_code=400, detail=f"文本长度超过限制(最大{max_chars}字符)") from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, request.model, current_user.apikey) service = TTSService(db, current_user.id, effective_key) result = await service.synthesize_long(request) return ApiResponse( code=200, message="success", data=result ) # ==================== ASR端点 ==================== @router.get("/asr/models", response_model=ApiResponse[List[ASRModelResponse]]) def get_asr_models( db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 获取ASR模型列表 需求: 6.4 """ service = ASRService(db, current_user.id, current_user.apikey) models = service.get_asr_models() return ApiResponse( code=200, message="success", data=models ) @router.post("/asr/recognize", response_model=ApiResponse[ASRResponse]) async def recognize_speech( request: ASRRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 同步语音识别(JSON方式),需要余额检查 适用于短音频识别,低延迟,实时返回识别结果 限制:音频文件大小不超过10MB,且时长不超过5分钟 支持 audio_url 或 audio_base64 方式 需求: 6.5 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, request.model, current_user.apikey) service = ASRService(db, current_user.id, effective_key) result = await service.recognize(request) return ApiResponse( code=200, message="success", data=result ) @router.post("/asr/recognize/file", response_model=ApiResponse[ASRResponse]) async def recognize_speech_with_file( file: UploadFile = File(..., description="音频文件(MP3/WAV/M4A,最大10MB,时长不超过5分钟)"), model: str = Form(..., description="识别模型:qwen3-asr-flash、qwen-audio-asr"), language: Optional[str] = Form(default=None, description="指定语种:zh、en、ja、ko等,不指定则自动检测"), enable_itn: bool = Form(default=False, description="是否启用逆文本标准化(仅中英文)"), context: Optional[str] = Form(default=None, description="上下文提示,提升特定场景识别准确率"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request), oss_service = Depends(get_oss_service) ): """ 同步语音识别(文件上传方式),需要余额检查 适用于短音频识别,实时返回识别结果 限制:音频文件大小不超过10MB,且时长不超过5分钟 流程:上传文件到OSS -> 获取URL -> 调用识别服务 需求: 6.5 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 验证文件大小(最大10MB) MAX_FILE_SIZE = 10 * 1024 * 1024 file_content = await file.read() if len(file_content) > MAX_FILE_SIZE: raise HTTPException( status_code=400, detail=f"文件大小超过限制(最大10MB,且时长不超过5分钟),当前文件大小:{len(file_content) / 1024 / 1024:.2f}MB" ) # 验证文件格式 allowed_extensions = ['.mp3', '.wav', '.m4a'] file_extension = None if file.filename: file_extension = '.' + file.filename.split('.')[-1].lower() if file_extension and file_extension not in allowed_extensions: raise HTTPException( status_code=400, detail=f"不支持的音频格式。支持的格式:{', '.join(allowed_extensions)}" ) # 上传文件到OSS获取URL try: audio_url = oss_service.upload_file( file_content, prefix="audio/asr", original_filename=file.filename ) except RuntimeError as e: raise HTTPException( status_code=500, detail=f"文件上传失败:{str(e)}" ) # 构建 ASRRequest 对象 request = ASRRequest( model=model, audio_url=audio_url, language=language, enable_itn=enable_itn, context=context ) # 调用识别服务 from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, model, current_user.apikey) service = ASRService(db, current_user.id, effective_key) result = await service.recognize(request) return ApiResponse( code=200, message="success", data=result ) @router.post("/asr/transcribe", response_model=ApiResponse[TaskResponse]) async def transcribe_audio( request: TranscribeRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 提交异步转写任务(JSON方式),需要余额检查 适用于长音频转写,支持最长12小时录音,具备情感识别与句粒度时间戳功能 限制:音频文件大小不超过2GB,且时长不超过12小时 需要提供公网可访问的音频文件URL 需求: 6.6 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, request.model, current_user.apikey) service = ASRService(db, current_user.id, effective_key) result = await service.transcribe(request) return ApiResponse( code=200, message="success", data=result ) @router.get("/asr/upload-url", response_model=ApiResponse[dict]) def get_asr_upload_url( filename: str = Query(..., description="原始文件名,用于保留扩展名"), content_type: Optional[str] = Query(default=None, description="文件 Content-Type"), current_user: User = Depends(get_current_user_from_request) ): """ 获取音频文件直传 OSS 的预签名 PUT URL(用于异步转写) 前端拿到 upload_url 后,直接 PUT 文件到 OSS, 上传完成后将 public_url 作为 file_url 传给 /asr/transcribe 接口。 预签名 URL 有效期 15 分钟(大文件上传留足时间)。 """ oss = get_oss_service() result = oss.generate_presigned_put_url( prefix="audio/asr/transcribe", original_filename=filename, expires=900, content_type=content_type or None ) return ApiResponse(code=200, message="success", data=result) async def transcribe_audio_with_file( file: UploadFile = File(..., description="音频文件(MP3/WAV/M4A等,最大2GB,时长不超过12小时)"), model: str = Form(..., description="识别模型:qwen3-asr-flash-filetrans"), language: Optional[str] = Form(default=None, description="指定语种:zh、en、ja、ko等,不指定则自动检测"), enable_itn: bool = Form(default=False, description="是否启用逆文本标准化(仅中英文)"), context: Optional[str] = Form(default=None, description="上下文提示,提升特定场景识别准确率"), channel_id: Optional[str] = Form(default="0", description="多音轨文件的音轨索引,逗号分隔,如:0,1"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request), oss_service = Depends(get_oss_service) ): """ 提交异步转写任务(文件上传方式),需要余额检查 适用于长音频转写,支持最长12小时录音,具备情感识别与句粒度时间戳功能 限制:音频文件大小不超过2GB,且时长不超过12小时 流程:上传文件到OSS -> 获取URL -> 提交异步转写任务 需求: 6.6 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 验证文件大小(长音频最大2GB) MAX_FILE_SIZE = 2 * 1024 * 1024 * 1024 file_content = await file.read() if len(file_content) > MAX_FILE_SIZE: raise HTTPException( status_code=400, detail=f"文件大小超过限制(最大2GB,且时长不超过12小时),当前文件大小:{len(file_content) / 1024 / 1024 / 1024:.2f}GB" ) # 验证文件格式(支持常见音频格式) allowed_extensions = ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.opus', '.amr'] file_extension = None if file.filename: file_extension = '.' + file.filename.split('.')[-1].lower() if file_extension and file_extension not in allowed_extensions: raise HTTPException( status_code=400, detail=f"不支持的音频格式。支持的格式:{', '.join(allowed_extensions)}" ) # 验证模型:查数据库,call_type 为 async 的 STT 模型都允许 from app.services.asr_service import ASRService asr_svc = ASRService(db, current_user.id, current_user.apikey) async_models = [m.title for m in asr_svc.get_asr_models() if m.call_type == "async"] if model not in async_models: raise HTTPException( status_code=400, detail=f"异步转写仅支持模型:{', '.join(async_models) or 'paraformer-v1, paraformer-v2'}" ) # 解析channel_id channel_ids = [0] # 默认值 if channel_id: try: channel_ids = [int(cid.strip()) for cid in channel_id.split(",") if cid.strip()] if not channel_ids: channel_ids = [0] except ValueError: raise HTTPException( status_code=400, detail="channel_id格式错误,应为逗号分隔的整数,如:0,1" ) # 上传文件到OSS获取URL try: audio_url = oss_service.upload_file( file_content, prefix="audio/asr/transcribe", original_filename=file.filename ) except RuntimeError as e: raise HTTPException( status_code=500, detail=f"文件上传失败:{str(e)}" ) # 构建 TranscribeRequest 对象 request = TranscribeRequest( model=model, file_url=audio_url, language=language, enable_itn=enable_itn, context=context, channel_id=channel_ids ) # 调用转写服务 from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, model, current_user.apikey) service = ASRService(db, current_user.id, effective_key) result = await service.transcribe(request) return ApiResponse( code=200, message="success", data=result ) @router.get("/asr/task/{task_id}", response_model=ApiResponse[TaskResponse]) async def get_task_status( task_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 查询转写任务状态 根据任务ID查询异步转写任务的状态和结果 需求: 6.7 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 先查询任务获取 model,以便使用正确的 API Key from app.models.audio import ASRTask local_task = db.query(ASRTask).filter( ASRTask.task_id == task_id, ASRTask.user_id == current_user.id ).first() if not local_task: raise HTTPException(status_code=404, detail="任务不存在") # 使用和提交任务时相同的 API Key 获取逻辑 from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, local_task.model, current_user.apikey) service = ASRService(db, current_user.id, effective_key) result = await service.get_task_status(task_id) return ApiResponse( code=200, message="success", data=result ) # ==================== 声音复刻端点 ==================== @router.get("/voice/upload-url", response_model=ApiResponse[dict]) def get_voice_upload_url( filename: str = Query(..., description="原始文件名,用于保留扩展名"), content_type: Optional[str] = Query(default=None, description="文件 Content-Type"), current_user: User = Depends(get_current_user_from_request) ): """ 获取声音文件直传 OSS 的预签名 PUT URL 前端拿到 upload_url 后,直接用 PUT 请求上传文件到 OSS, 上传完成后将 public_url 作为 audio_url 传给 /voice/create 接口。 预签名 URL 有效期 5 分钟。 """ oss = get_oss_service() result = oss.generate_presigned_put_url( prefix="audio/voice", original_filename=filename, expires=300, content_type=content_type or None ) return ApiResponse(code=200, message="success", data=result) @router.post("/voice/create", response_model=ApiResponse[VoiceResponse]) async def create_voice( file: Optional[UploadFile] = File(default=None, description="音频文件(WAV/MP3/M4A,最大10MB)"), target_model: str = Form(..., description="目标模型:cosyvoice-v3-plus、cosyvoice-v3-flash、cosyvoice-v2"), prefix: str = Form(..., description="音色名称前缀,仅允许数字、字母和下划线,不超过10字符"), voice_name: Optional[str] = Form(default=None, description="音色名称(用户输入的中文名称)"), audio_url: Optional[str] = Form(default=None, description="音频文件URL(与file二选一)"), language_hints: Optional[str] = Form(default=None, description="语言提示,逗号分隔:en,fr,de,ja,ko,ru"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 创建复刻音色 上传音频文件创建专属音色,支持文件上传或URL方式 需求: 6.8 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") # 解析language_hints lang_hints = None if language_hints: lang_hints = [h.strip() for h in language_hints.split(",") if h.strip()] # 构建请求对象 request = VoiceCreateRequest( target_model=target_model, prefix=prefix, voice_name=voice_name, audio_url=audio_url, language_hints=lang_hints ) from app.services.crypto_utils import get_effective_api_key effective_key = get_effective_api_key(db, target_model, current_user.apikey) service = VoiceCloneService(db, current_user.id, effective_key) result = await service.create_voice(request, file) return ApiResponse( code=200, message="success", data=result ) @router.get("/voice/list", response_model=ApiResponse[VoiceListResponse]) async def list_voices( prefix: Optional[str] = Query(default=None, description="按前缀筛选"), page: int = Query(default=0, ge=0, description="页码(从0开始)"), page_size: int = Query(default=10, ge=1, le=100, description="每页数量"), model: Optional[str] = Query(default=None, description="按目标模型筛选(如:cosyvoice-v3-flash、cosyvoice-v3-plus)"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 查询用户音色列表 返回当前用户创建的所有复刻音色,支持按模型筛选 需求: 6.9 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") service = VoiceCloneService(db, current_user.id, current_user.apikey) result = await service.list_voices(prefix, page, page_size, model) return ApiResponse( code=200, message="success", data=result ) @router.get("/voice/system", response_model=ApiResponse[List[SystemVoiceResponse]]) def get_system_voices( model: Optional[str] = Query(default=None, description="按模型筛选"), category: Optional[str] = Query(default=None, description="按场景分类筛选"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 获取系统音色列表 返回系统预置的音色列表,支持按模型和场景分类筛选 需求: 6.13 """ service = SystemVoiceService(db) voices = service.get_system_voices(model, category) return ApiResponse( code=200, message="success", data=voices ) @router.get("/voice/{voice_id}", response_model=ApiResponse[VoiceResponse]) async def get_voice( voice_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 查询指定音色详情 根据音色ID查询音色的详细信息和状态 需求: 6.10 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") service = VoiceCloneService(db, current_user.id, current_user.apikey) result = await service.query_voice(voice_id) return ApiResponse( code=200, message="success", data=result ) @router.put("/voice/{voice_id}", response_model=ApiResponse[VoiceResponse]) async def update_voice( voice_id: str, file: Optional[UploadFile] = File(default=None, description="新的音频文件"), audio_url: Optional[str] = Form(default=None, description="新的音频URL(与file二选一)"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 更新音色 使用新的音频文件更新已有音色 需求: 6.11 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") service = VoiceCloneService(db, current_user.id, current_user.apikey) result = await service.update_voice(voice_id, file, audio_url) return ApiResponse( code=200, message="success", data=result ) @router.delete("/voice/{voice_id}", response_model=ApiResponse[None]) async def delete_voice( voice_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 删除音色 删除指定的复刻音色 需求: 6.12 """ if not current_user.apikey: raise HTTPException(status_code=403, detail="未配置API密钥,请在用户设置中配置apikey") service = VoiceCloneService(db, current_user.id, current_user.apikey) await service.delete_voice(voice_id) return ApiResponse( code=200, message="success", data=None ) # ==================== 创作历史端点 ==================== @router.get("/history/synthesis", response_model=ApiResponse[AudioHistoryListResponse]) def get_synthesis_history( page: int = Query(default=0, ge=0, description="页码(从0开始)"), page_size: int = Query(default=10, ge=1, le=100, description="每页数量"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 获取当前用户的语音合成历史记录 目前仅返回TTS合成记录,后续可扩展为统一的创作历史。 """ # 查询当前用户的合成记录,按时间倒序,排除被拒绝的内容 query = db.query(AudioSynthesis).filter( AudioSynthesis.user_id == current_user.id ).filter( (AudioSynthesis.review_status != 'rejected') | (AudioSynthesis.review_status.is_(None)) ).order_by(AudioSynthesis.created_at.desc()) total = query.count() records = query.offset(page * page_size).limit(page_size).all() items: List[AudioHistoryItem] = [] for record in records: # 名称优先使用自定义名称,否则使用文件名或文本截断 name = record.custom_name if not name: if record.audio_url: name = record.audio_url.split("/")[-1] or "语音合成音频" elif record.text: text_preview = record.text.strip() name = (text_preview[:20] + "...") if len(text_preview) > 20 else text_preview else: name = f"TTS合成-{record.id}" items.append( AudioHistoryItem( id=record.id, name=name, custom_name=record.custom_name, mode="声音合成", duration=float(record.duration) if record.duration is not None else None, characters=record.characters, status="已完成", audio_url=record.audio_url, created_at=record.created_at.isoformat() if record.created_at else "", completed_at=record.completed_at.isoformat() if record.completed_at else None ) ) return ApiResponse( code=200, message="success", data=AudioHistoryListResponse( total=total, items=items ) ) @router.get("/history/recognition", response_model=ApiResponse[AudioHistoryListResponse]) def get_recognition_history( page: int = Query(default=0, ge=0, description="页码(从0开始)"), page_size: int = Query(default=10, ge=1, le=100, description="每页数量"), db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 获取当前用户的语音识别历史记录 返回同步识别和异步转写任务的历史记录,按时间倒序合并 """ from sqlalchemy import union_all, select, literal # 查询同步识别记录 sync_query = select( ASRRecognition.id, ASRRecognition.user_id, ASRRecognition.model, ASRRecognition.audio_url, ASRRecognition.result_text, ASRRecognition.detected_language, ASRRecognition.duration, ASRRecognition.created_at, literal("同步识别").label("mode"), literal("已完成").label("status") ).filter( ASRRecognition.user_id == current_user.id ) # 查询异步转写任务记录 async_query = select( ASRTask.id, ASRTask.user_id, ASRTask.model, ASRTask.file_url.label("audio_url"), ASRTask.result_text, literal(None).label("detected_language"), ASRTask.duration, ASRTask.created_at, literal("异步转写").label("mode"), ASRTask.status ).filter( ASRTask.user_id == current_user.id ) # 分别查询同步识别和异步转写记录(数据库层分页,避免全量加载) # 先各自 COUNT,再按比例分页 sync_total = db.query(ASRRecognition).filter( ASRRecognition.user_id == current_user.id ).count() async_total = db.query(ASRTask).filter( ASRTask.user_id == current_user.id ).count() total = sync_total + async_total # 计算分页偏移 offset = page * page_size # 从两张表各取足够的数据,合并后再切片 # 取 offset + page_size 条,保证合并后能切出正确的一页 fetch_limit = offset + page_size sync_records = db.query(ASRRecognition).filter( ASRRecognition.user_id == current_user.id ).order_by(ASRRecognition.created_at.desc()).limit(fetch_limit).all() async_records = db.query(ASRTask).filter( ASRTask.user_id == current_user.id ).order_by(ASRTask.created_at.desc()).limit(fetch_limit).all() # 合并记录并按时间排序 all_records = [] for record in sync_records: all_records.append(('sync', record)) for record in async_records: all_records.append(('async', record)) # 按创建时间倒序排序 all_records.sort(key=lambda x: x[1].created_at if x[1].created_at else datetime.min, reverse=True) # 分页切片 paginated_records = all_records[offset:offset + page_size] items: List[AudioHistoryItem] = [] for record_type, record in paginated_records: if record_type == 'sync': # 同步识别记录 rec = record # type: ASRRecognition name = "" if rec.audio_url: from urllib.parse import unquote decoded = unquote(rec.audio_url) path_part = decoded.split("?")[0] name = path_part.split("/")[-1] or "语音识别音频" elif rec.audio_base64: name = "Base64音频" if not name: name = f"同步识别-{rec.id}" characters = len(rec.result_text) if rec.result_text else None recognition_text = None if rec.result_text: recognition_text = rec.result_text[:100] + "..." if len(rec.result_text) > 100 else rec.result_text items.append( AudioHistoryItem( id=f"sync-{rec.id}", # 添加前缀避免与异步任务ID冲突 name=name, mode="同步识别", duration=float(rec.duration) if rec.duration is not None else None, characters=characters, status="已完成", audio_url=rec.audio_url, created_at=rec.created_at.isoformat() if rec.created_at else "", recognition_text=recognition_text, language=rec.detected_language ) ) else: # 异步转写任务记录 task = record # type: ASRTask name = "" if task.file_url: from urllib.parse import unquote, urlparse # file_url 可能是带签名的完整 OSS URL,路径部分可能是 URL 编码 # 先解码,再取路径最后一段,再去掉查询参数 decoded = unquote(task.file_url) path_part = decoded.split("?")[0] # 去掉签名参数 name = path_part.split("/")[-1] or "语音识别音频" if not name: name = f"识别任务-{task.task_id[:8]}" status_map = { 'SUCCEEDED': '已完成', 'PENDING': '排队中', 'RUNNING': '处理中', 'FAILED': '失败' } display_status = status_map.get(task.status, task.status) characters = len(task.result_text) if task.result_text else None recognition_text = task.result_text # 完整文本,供下载使用 items.append( AudioHistoryItem( id=f"async-{task.id}", name=name, mode="异步转写", duration=float(task.duration) if task.duration is not None else None, characters=characters, status=display_status, audio_url=None, # 异步转写结果是文本,不提供音频下载 created_at=task.created_at.isoformat() if task.created_at else "", recognition_text=recognition_text, language=None ) ) return ApiResponse( code=200, message="success", data=AudioHistoryListResponse( total=total, items=items ) ) @router.put("/history/synthesis/{record_id}/name", response_model=ApiResponse[None]) def update_synthesis_name( record_id: int, request: UpdateAudioNameRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user_from_request) ): """ 更新语音合成记录的自定义名称 """ record = db.query(AudioSynthesis).filter( AudioSynthesis.id == record_id, AudioSynthesis.user_id == current_user.id ).first() if not record: raise HTTPException(status_code=404, detail="记录不存在") record.custom_name = request.custom_name.strip() db.commit() return ApiResponse( code=200, message="success", data=None )