"""
Speech recognition (ASR) service V2.

Business logic for asynchronous speech recognition backed by the DashScope
async task API: tasks are submitted remotely, persisted locally, and their
status is refreshed from DashScope when a single task is read.
"""
import logging
import requests
from datetime import datetime
from typing import List, Optional

from sqlalchemy.orm import Session
from sqlalchemy import desc
from fastapi import HTTPException
from fastapi.concurrency import run_in_threadpool

from app.models.audio import ASRRecognitionV2
from app.schemas.audio_v2 import (
    ASRRecognitionV2CreateRequest,
    ASRRecognitionV2Response,
    ASRRecognitionV2ListResponse,
    TaskListQueryParams
)
from .base_service import BaseV2Service

logger = logging.getLogger(__name__)


class ASRRecognitionV2Service(BaseV2Service):
    """Speech recognition service V2 (asynchronous mode).

    Uses ``self.db``, ``self.user_id`` and ``self.api_key``, which are
    presumably provided by ``BaseV2Service`` (not visible here).
    """

    # DashScope API base URL
    DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"

    # ASR models accepted by create_task
    VALID_MODELS = ["qwen3-asr-flash-filetrans", "qwen-audio-asr"]

    # Non-final local statuses that get refreshed from DashScope on read.
    # DashScope reports in-flight async tasks as "PENDING" or "RUNNING"; since
    # the remote status is stored verbatim, "RUNNING" must be polled too or the
    # task would stay stuck after the first refresh. "PROCESSING" is kept for
    # backward compatibility with the previous check.
    POLLABLE_STATUSES = ("PENDING", "RUNNING", "PROCESSING")

    # Timeout in seconds for every HTTP call to DashScope
    REQUEST_TIMEOUT = 30

    @staticmethod
    def _extract_error_message(response: requests.Response) -> str:
        """Best-effort extraction of the error message from a failed response.

        Falls back to ``"HTTP <status>"`` when the body is empty, is not valid
        JSON, is not a JSON object, or has no non-empty ``message`` field.
        """
        fallback = f"HTTP {response.status_code}"
        if not response.text:
            return fallback
        try:
            error_data = response.json()
        except ValueError:
            # e.g. an HTML/plain-text error page from a gateway or proxy
            return fallback
        if not isinstance(error_data, dict):
            return fallback
        return error_data.get("message") or fallback

    async def create_task(
        self,
        request: ASRRecognitionV2CreateRequest
    ) -> ASRRecognitionV2Response:
        """
        Create a speech recognition task.

        Submits ``request.file_url`` to DashScope as an async transcription
        task, then stores a local record with status ``PENDING`` keyed by the
        ``task_id`` DashScope returned.

        Args:
            request: creation request (``model`` and ``file_url``)

        Returns:
            The persisted task as a response schema.

        Raises:
            HTTPException: 400 for an unsupported model; 504 if the submission
                times out; 502 if DashScope rejects the submission, returns no
                task_id, or any other error occurs.
        """
        # Validate the model before calling out
        if request.model not in self.VALID_MODELS:
            raise HTTPException(
                status_code=400,
                detail=f"无效的模型,支持的模型: {self.VALID_MODELS}"
            )

        try:
            # Submit the async task to DashScope
            url = f"{self.DASHSCOPE_BASE_URL}/services/audio/asr/transcription"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
                # Ask DashScope to run the job asynchronously and return a task_id
                "X-DashScope-Async": "enable"
            }
            payload = {
                "model": request.model,
                "input": {"file_url": request.file_url}
            }
            # `requests` is blocking; run it in the threadpool so this
            # coroutine does not stall the event loop while waiting.
            response = await run_in_threadpool(
                requests.post,
                url,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code != 200:
                error_msg = self._extract_error_message(response)
                logger.error(f"提交ASR任务失败: {error_msg}")
                raise HTTPException(
                    status_code=502,
                    detail=f"提交识别任务失败: {error_msg}"
                )

            data = response.json()
            output = data.get("output") or {}
            task_id = output.get("task_id")

            if not task_id:
                raise HTTPException(
                    status_code=502,
                    detail="提交识别任务失败,未返回task_id"
                )

            # Persist the task locally
            asr_task = ASRRecognitionV2(
                user_id=self.user_id,
                task_id=task_id,
                model=request.model,
                file_url=request.file_url,
                status="PENDING"
            )
            self.db.add(asr_task)
            self.db.commit()
            self.db.refresh(asr_task)

            return ASRRecognitionV2Response.from_orm(asr_task)

        except HTTPException:
            raise
        except requests.exceptions.Timeout:
            raise HTTPException(status_code=504, detail="提交识别任务超时")
        except Exception as e:
            # Discard any half-finished DB work so the session stays usable.
            # NOTE(review): if the commit failed, the remote DashScope task was
            # already created and is now orphaned.
            self.db.rollback()
            logger.error(f"创建ASR任务失败: {type(e).__name__}: {str(e)}")
            raise HTTPException(
                status_code=502,
                detail=f"创建识别任务失败: {str(e)}"
            ) from e

    async def get_task(self, task_id: str) -> ASRRecognitionV2Response:
        """
        Get a task owned by the current user.

        If the task is still in a non-final status, its status is first
        refreshed from DashScope (best effort).

        Args:
            task_id: DashScope task ID

        Returns:
            The task as a response schema.

        Raises:
            HTTPException: 404 if the task does not exist for this user.
        """
        task = self.db.query(ASRRecognitionV2).filter(
            ASRRecognitionV2.task_id == task_id,
            ASRRecognitionV2.user_id == self.user_id
        ).first()

        if not task:
            raise HTTPException(status_code=404, detail="任务不存在")

        # Not finished yet: pull the latest status from DashScope
        if task.status in self.POLLABLE_STATUSES:
            await self._update_task_status(task)

        return ASRRecognitionV2Response.from_orm(task)

    async def list_tasks(
        self,
        params: TaskListQueryParams
    ) -> ASRRecognitionV2ListResponse:
        """
        List the current user's tasks with optional status filter,
        sorting and pagination.

        Statuses are returned as stored; they are not refreshed from DashScope.

        Args:
            params: query parameters (status, order_by, order, page, page_size)

        Returns:
            The total count of matching tasks and the requested page of items.
        """
        query = self.db.query(ASRRecognitionV2).filter(
            ASRRecognitionV2.user_id == self.user_id
        )

        # Optional status filter
        if params.status:
            query = query.filter(ASRRecognitionV2.status == params.status)

        # Total count (before pagination)
        total = query.count()

        # Sorting; unknown order_by values fall back to created_at
        sortable_columns = {
            "created_at": ASRRecognitionV2.created_at,
            "updated_at": ASRRecognitionV2.updated_at,
        }
        order_column = sortable_columns.get(
            params.order_by, ASRRecognitionV2.created_at
        )
        if params.order == "desc":
            query = query.order_by(desc(order_column))
        else:
            query = query.order_by(order_column)

        # Pagination (page is 1-based)
        offset = (params.page - 1) * params.page_size
        tasks = query.offset(offset).limit(params.page_size).all()

        items = [ASRRecognitionV2Response.from_orm(task) for task in tasks]

        return ASRRecognitionV2ListResponse(total=total, items=items)

    async def _update_task_status(self, task: ASRRecognitionV2) -> None:
        """
        Refresh a task's status and result from DashScope (best effort).

        Non-200 responses are logged and leave the task unchanged. Any
        exception is logged and swallowed, so reading a task never fails
        because DashScope is unreachable. On an exception the session is
        rolled back so it stays usable for the caller.

        Args:
            task: the local task row; mutated in place and committed
        """
        try:
            url = f"{self.DASHSCOPE_BASE_URL}/tasks/{task.task_id}"
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "X-DashScope-Async": "enable"
            }
            # Blocking HTTP call moved off the event loop
            response = await run_in_threadpool(
                requests.get,
                url,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )

            if response.status_code != 200:
                logger.warning(f"查询任务状态失败: {response.status_code}")
                return

            data = response.json()
            # `or {}` also guards against explicit nulls in the payload
            output = data.get("output") or {}

            # Update status, keeping the current one if DashScope omits it
            new_status = output.get("task_status") or task.status
            now = datetime.now()
            task.status = new_status
            task.updated_at = now

            if new_status == "SUCCEEDED":
                result = output.get("result") or {}

                # Recognized text (first transcript only)
                transcripts = result.get("transcripts") or []
                if transcripts:
                    task.result_text = transcripts[0].get("text", "")

                # URL of the full transcription result
                task.result_url = result.get("transcription_url")

                # Audio duration as reported in usage
                usage = data.get("usage") or {}
                task.duration = usage.get("seconds", 0)

                task.completed_at = now

            elif new_status == "FAILED":
                task.error_message = output.get("message", "识别失败")
                task.completed_at = now

            self.db.commit()

        except Exception as e:
            # Keep the session usable after e.g. a failed commit
            self.db.rollback()
            logger.error(f"更新任务状态失败: {type(e).__name__}: {str(e)}")