""" 数字人合成服务 提供数字人图像检测和视频合成功能 """ import logging import httpx from datetime import datetime from decimal import Decimal from typing import Optional from fastapi import HTTPException from sqlalchemy.orm import Session from app.models.ai_video import AIVideo from app.schemas.video_schema import ( AvatarDetectRequest, AvatarDetectResponse, AvatarGenerateRequest, VideoTaskResponse, VideoTaskResult ) from app.services.oss_service import get_oss_service logger = logging.getLogger(__name__) DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1" class AvatarService: """数字人合成服务""" def __init__(self, db: Session, user_id: int, api_key: str): self.db = db self.user_id = user_id self.api_key = api_key self.oss_service = get_oss_service() async def detect_image(self, image_url: str) -> AvatarDetectResponse: """图像检测 - 调用wan2.2-s2v-detect""" url = f"{DASHSCOPE_BASE_URL}/services/aigc/image2video/face-detect" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } body = { "model": "wan2.2-s2v-detect", "input": {"image_url": image_url} } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(url, json=body, headers=headers) result = response.json() if "code" in result: return AvatarDetectResponse( check_pass=False, humanoid=False, message=result.get("message", "检测失败") ) output = result.get("output", {}) return AvatarDetectResponse( check_pass=output.get("check_pass", False), humanoid=output.get("humanoid", False), message=output.get("message") ) async def generate(self, request: AvatarGenerateRequest) -> VideoTaskResponse: """数字人视频合成 - 调用wan2.2-s2v""" url = f"{DASHSCOPE_BASE_URL}/services/aigc/image2video/video-synthesis" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}", "X-DashScope-Async": "enable" } body = { "model": "wan2.2-s2v", "input": { "image_url": request.image_url, "audio_url": request.audio_url }, "parameters": { "resolution": request.resolution } } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(url, json=body, headers=headers) result = response.json() if "code" in result: raise Exception(f"创建任务失败: {result.get('message')}") task_id = result["output"]["task_id"] task_status = result["output"]["task_status"] # 保存记录 record = AIVideo( user_id=self.user_id, task_id=task_id, model_name="wan2.2-s2v", video_type="s2v", input_params={ "image_url": request.image_url, "audio_url": request.audio_url, "resolution": request.resolution }, audio_url=request.audio_url, resolution=request.resolution, status=task_status, submit_time=datetime.now() ) self.db.add(record) self.db.commit() return VideoTaskResponse(task_id=task_id, task_status=task_status) async def get_task_status(self, task_id: str) -> VideoTaskResult: """查询任务状态""" # 验证任务归属 record = self.db.query(AIVideo).filter( AIVideo.task_id == task_id, AIVideo.user_id == self.user_id ).first() if not record: raise Exception("任务不存在") # 如果已完成,直接返回 if record.status in ["SUCCEEDED", "FAILED"]: return VideoTaskResult( task_id=task_id, task_status=record.status, video_url=record.video_url, video_duration=float(record.video_duration) if record.video_duration else None, error_message=record.error_message ) # 查询百炼API url = f"{DASHSCOPE_BASE_URL}/tasks/{task_id}" headers = {"Authorization": f"Bearer {self.api_key}"} async with httpx.AsyncClient(timeout=30.0) as client: response = await client.get(url, headers=headers) result = response.json() output = result.get("output", {}) task_status = output.get("task_status", "UNKNOWN") # 更新数据库 record.status = task_status if task_status == "RUNNING" and not record.scheduled_time: record.scheduled_time = datetime.now() if task_status == "SUCCEEDED": record.end_time = datetime.now() # 注意: 视频URL在 output.results.video_url results = output.get("results", {}) video_url = results.get("video_url") if video_url: # 下载到OSS oss_url = await self.oss_service.upload_from_url(video_url, "ai-videos") record.video_url = oss_url usage = result.get("usage", {}) video_duration = usage.get("duration", 0) record.video_duration = Decimal(str(video_duration)) # 费用(API调用免费) record.bill = Decimal("0") elif task_status == "FAILED": record.end_time = datetime.now() record.error_message = output.get("message", "任务失败") self.db.commit() return VideoTaskResult( task_id=task_id, task_status=task_status, video_url=record.video_url, video_duration=float(record.video_duration) if record.video_duration else None, error_message=record.error_message )