asr_recognition_service.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. """
  2. 语音识别服务V2
  3. 提供异步语音识别的业务逻辑处理
  4. """
  5. import logging
  6. import requests
  7. from datetime import datetime
  8. from typing import List, Optional
  9. from sqlalchemy.orm import Session
  10. from sqlalchemy import desc
  11. from fastapi import HTTPException
  12. from app.models.audio import ASRRecognitionV2
  13. from app.schemas.audio_v2 import (
  14. ASRRecognitionV2CreateRequest,
  15. ASRRecognitionV2Response,
  16. ASRRecognitionV2ListResponse,
  17. TaskListQueryParams
  18. )
  19. from .base_service import BaseV2Service
  20. logger = logging.getLogger(__name__)
  21. class ASRRecognitionV2Service(BaseV2Service):
  22. """语音识别服务V2(异步模式)"""
  23. # DashScope API基础URL
  24. DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"
  25. # 有效的ASR模型
  26. VALID_MODELS = ["qwen3-asr-flash-filetrans", "qwen-audio-asr"]
  27. async def create_task(
  28. self,
  29. request: ASRRecognitionV2CreateRequest
  30. ) -> ASRRecognitionV2Response:
  31. """
  32. 创建语音识别任务
  33. Args:
  34. request: 创建请求
  35. Returns:
  36. 任务响应
  37. Raises:
  38. HTTPException: 创建失败
  39. """
  40. # 验证模型
  41. if request.model not in self.VALID_MODELS:
  42. raise HTTPException(
  43. status_code=400,
  44. detail=f"无效的模型,支持的模型: {self.VALID_MODELS}"
  45. )
  46. try:
  47. # 调用DashScope API提交异步任务
  48. url = f"{self.DASHSCOPE_BASE_URL}/services/audio/asr/transcription"
  49. headers = {
  50. "Authorization": f"Bearer {self.api_key}",
  51. "Content-Type": "application/json",
  52. "X-DashScope-Async": "enable"
  53. }
  54. payload = {
  55. "model": request.model,
  56. "input": {"file_url": request.file_url}
  57. }
  58. response = requests.post(url, headers=headers, json=payload, timeout=30)
  59. if response.status_code != 200:
  60. error_data = response.json() if response.text else {}
  61. error_msg = error_data.get("message", f"HTTP {response.status_code}")
  62. logger.error(f"提交ASR任务失败: {error_msg}")
  63. raise HTTPException(
  64. status_code=502,
  65. detail=f"提交识别任务失败: {error_msg}"
  66. )
  67. data = response.json()
  68. output = data.get("output", {})
  69. task_id = output.get("task_id")
  70. if not task_id:
  71. raise HTTPException(
  72. status_code=502,
  73. detail="提交识别任务失败,未返回task_id"
  74. )
  75. # 保存到数据库
  76. asr_task = ASRRecognitionV2(
  77. user_id=self.user_id,
  78. task_id=task_id,
  79. model=request.model,
  80. file_url=request.file_url,
  81. status="PENDING"
  82. )
  83. self.db.add(asr_task)
  84. self.db.commit()
  85. self.db.refresh(asr_task)
  86. return ASRRecognitionV2Response.from_orm(asr_task)
  87. except HTTPException:
  88. raise
  89. except requests.exceptions.Timeout:
  90. raise HTTPException(status_code=504, detail="提交识别任务超时")
  91. except Exception as e:
  92. logger.error(f"创建ASR任务失败: {type(e).__name__}: {str(e)}")
  93. raise HTTPException(
  94. status_code=502,
  95. detail=f"创建识别任务失败: {str(e)}"
  96. )
  97. async def get_task(self, task_id: str) -> ASRRecognitionV2Response:
  98. """
  99. 查询任务详情
  100. Args:
  101. task_id: 任务ID
  102. Returns:
  103. 任务响应
  104. Raises:
  105. HTTPException: 任务不存在
  106. """
  107. task = self.db.query(ASRRecognitionV2).filter(
  108. ASRRecognitionV2.task_id == task_id,
  109. ASRRecognitionV2.user_id == self.user_id
  110. ).first()
  111. if not task:
  112. raise HTTPException(status_code=404, detail="任务不存在")
  113. # 如果任务未完成,查询最新状态
  114. if task.status in ["PENDING", "PROCESSING"]:
  115. await self._update_task_status(task)
  116. return ASRRecognitionV2Response.from_orm(task)
  117. async def list_tasks(
  118. self,
  119. params: TaskListQueryParams
  120. ) -> ASRRecognitionV2ListResponse:
  121. """
  122. 查询任务列表
  123. Args:
  124. params: 查询参数
  125. Returns:
  126. 任务列表响应
  127. """
  128. query = self.db.query(ASRRecognitionV2).filter(
  129. ASRRecognitionV2.user_id == self.user_id
  130. )
  131. # 状态筛选
  132. if params.status:
  133. query = query.filter(ASRRecognitionV2.status == params.status)
  134. # 总数
  135. total = query.count()
  136. # 排序
  137. if params.order_by == "created_at":
  138. order_column = ASRRecognitionV2.created_at
  139. elif params.order_by == "updated_at":
  140. order_column = ASRRecognitionV2.updated_at
  141. else:
  142. order_column = ASRRecognitionV2.created_at
  143. if params.order == "desc":
  144. query = query.order_by(desc(order_column))
  145. else:
  146. query = query.order_by(order_column)
  147. # 分页
  148. offset = (params.page - 1) * params.page_size
  149. tasks = query.offset(offset).limit(params.page_size).all()
  150. items = [ASRRecognitionV2Response.from_orm(task) for task in tasks]
  151. return ASRRecognitionV2ListResponse(total=total, items=items)
  152. async def _update_task_status(self, task: ASRRecognitionV2) -> None:
  153. """
  154. 更新任务状态(从DashScope查询)
  155. Args:
  156. task: 任务对象
  157. """
  158. try:
  159. url = f"{self.DASHSCOPE_BASE_URL}/tasks/{task.task_id}"
  160. headers = {
  161. "Authorization": f"Bearer {self.api_key}",
  162. "X-DashScope-Async": "enable"
  163. }
  164. response = requests.get(url, headers=headers, timeout=30)
  165. if response.status_code != 200:
  166. logger.warning(f"查询任务状态失败: {response.status_code}")
  167. return
  168. data = response.json()
  169. output = data.get("output", {})
  170. # 更新状态
  171. new_status = output.get("task_status", task.status)
  172. task.status = new_status
  173. task.updated_at = datetime.now()
  174. # 如果任务完成,提取结果
  175. if new_status == "SUCCEEDED":
  176. result = output.get("result", {})
  177. # 提取识别文本
  178. transcripts = result.get("transcripts", [])
  179. if transcripts:
  180. task.result_text = transcripts[0].get("text", "")
  181. # 提取结果URL
  182. task.result_url = result.get("transcription_url")
  183. # 提取时长
  184. usage = data.get("usage", {})
  185. duration = usage.get("seconds", 0)
  186. task.duration = duration
  187. task.completed_at = datetime.now()
  188. elif new_status == "FAILED":
  189. # 提取错误信息
  190. task.error_message = output.get("message", "识别失败")
  191. task.completed_at = datetime.now()
  192. self.db.commit()
  193. except Exception as e:
  194. logger.error(f"更新任务状态失败: {type(e).__name__}: {str(e)}")