| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- """
- 数字人合成服务
- 提供数字人图像检测和视频合成功能
- """
- import logging
- import httpx
- from datetime import datetime
- from decimal import Decimal
- from typing import Optional
- from fastapi import HTTPException
- from sqlalchemy.orm import Session
- from app.models.ai_video import AIVideo
- from app.schemas.video_schema import (
- AvatarDetectRequest, AvatarDetectResponse,
- AvatarGenerateRequest, VideoTaskResponse, VideoTaskResult
- )
- from app.services.oss_service import get_oss_service
- logger = logging.getLogger(__name__)
- DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/api/v1"
- class AvatarService:
- """数字人合成服务"""
- def __init__(self, db: Session, user_id: int, api_key: str):
- self.db = db
- self.user_id = user_id
- self.api_key = api_key
- self.oss_service = get_oss_service()
- async def detect_image(self, image_url: str) -> AvatarDetectResponse:
- """图像检测 - 调用wan2.2-s2v-detect"""
- url = f"{DASHSCOPE_BASE_URL}/services/aigc/image2video/face-detect"
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {self.api_key}"
- }
- body = {
- "model": "wan2.2-s2v-detect",
- "input": {"image_url": image_url}
- }
- async with httpx.AsyncClient(timeout=30.0) as client:
- response = await client.post(url, json=body, headers=headers)
- result = response.json()
- if "code" in result:
- return AvatarDetectResponse(
- check_pass=False,
- humanoid=False,
- message=result.get("message", "检测失败")
- )
- output = result.get("output", {})
- return AvatarDetectResponse(
- check_pass=output.get("check_pass", False),
- humanoid=output.get("humanoid", False),
- message=output.get("message")
- )
- async def generate(self, request: AvatarGenerateRequest) -> VideoTaskResponse:
- """数字人视频合成 - 调用wan2.2-s2v"""
- url = f"{DASHSCOPE_BASE_URL}/services/aigc/image2video/video-synthesis"
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {self.api_key}",
- "X-DashScope-Async": "enable"
- }
- body = {
- "model": "wan2.2-s2v",
- "input": {
- "image_url": request.image_url,
- "audio_url": request.audio_url
- },
- "parameters": {
- "resolution": request.resolution
- }
- }
- async with httpx.AsyncClient(timeout=30.0) as client:
- response = await client.post(url, json=body, headers=headers)
- result = response.json()
- if "code" in result:
- raise Exception(f"创建任务失败: {result.get('message')}")
- task_id = result["output"]["task_id"]
- task_status = result["output"]["task_status"]
- # 保存记录
- record = AIVideo(
- user_id=self.user_id,
- task_id=task_id,
- model_name="wan2.2-s2v",
- video_type="s2v",
- input_params={
- "image_url": request.image_url,
- "audio_url": request.audio_url,
- "resolution": request.resolution
- },
- audio_url=request.audio_url,
- resolution=request.resolution,
- status=task_status,
- submit_time=datetime.now()
- )
- self.db.add(record)
- self.db.commit()
- return VideoTaskResponse(task_id=task_id, task_status=task_status)
- async def get_task_status(self, task_id: str) -> VideoTaskResult:
- """查询任务状态"""
- # 验证任务归属
- record = self.db.query(AIVideo).filter(
- AIVideo.task_id == task_id,
- AIVideo.user_id == self.user_id
- ).first()
- if not record:
- raise Exception("任务不存在")
- # 如果已完成,直接返回
- if record.status in ["SUCCEEDED", "FAILED"]:
- return VideoTaskResult(
- task_id=task_id,
- task_status=record.status,
- video_url=record.video_url,
- video_duration=float(record.video_duration) if record.video_duration else None,
- error_message=record.error_message
- )
- # 查询百炼API
- url = f"{DASHSCOPE_BASE_URL}/tasks/{task_id}"
- headers = {"Authorization": f"Bearer {self.api_key}"}
- async with httpx.AsyncClient(timeout=30.0) as client:
- response = await client.get(url, headers=headers)
- result = response.json()
- output = result.get("output", {})
- task_status = output.get("task_status", "UNKNOWN")
- # 更新数据库
- record.status = task_status
- if task_status == "RUNNING" and not record.scheduled_time:
- record.scheduled_time = datetime.now()
- if task_status == "SUCCEEDED":
- record.end_time = datetime.now()
- # 注意: 视频URL在 output.results.video_url
- results = output.get("results", {})
- video_url = results.get("video_url")
- if video_url:
- # 下载到OSS
- oss_url = await self.oss_service.upload_from_url(video_url, "ai-videos")
- record.video_url = oss_url
- usage = result.get("usage", {})
- video_duration = usage.get("duration", 0)
- record.video_duration = Decimal(str(video_duration))
- # 费用(API调用免费)
- record.bill = Decimal("0")
- elif task_status == "FAILED":
- record.end_time = datetime.now()
- record.error_message = output.get("message", "任务失败")
- self.db.commit()
- return VideoTaskResult(
- task_id=task_id,
- task_status=task_status,
- video_url=record.video_url,
- video_duration=float(record.video_duration) if record.video_duration else None,
- error_message=record.error_message
- )
|