| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- """扣减补偿服务
- 当租户或超管扣减因异常(网络、DB超时等)失败时,写入 pending_deductions 表,
- 后台任务定期扫描重试,利用 biz_order_no 幂等机制保证不重复扣减。
- """
- import logging
- from datetime import datetime, timezone
- from decimal import Decimal
- from sqlalchemy import select, update, func
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.models.compensation import PendingDeduction
- from app.models.monitoring import SuperAdmin
- logger = logging.getLogger(__name__)
- async def record_pending(
- db: AsyncSession,
- target_type: str,
- target_id: int,
- amount: Decimal,
- biz_order_no: str,
- error_msg: str = "",
- ):
- """写入一条待补偿记录"""
- record = PendingDeduction(
- target_type=target_type,
- target_id=target_id,
- amount=amount,
- biz_order_no=biz_order_no,
- last_error=error_msg[:500] if error_msg else "",
- )
- db.add(record)
- await db.commit()
- logger.warning(
- "写入待补偿记录: type=%s id=%d amount=%s order=%s error=%s",
- target_type, target_id, amount, biz_order_no, error_msg[:100],
- )
- async def process_pending(db: AsyncSession) -> int:
- """处理所有待补偿记录,返回成功处理的数量"""
- # 取最早创建的 50 条(FIFO)
- result = await db.execute(
- select(PendingDeduction)
- .order_by(PendingDeduction.created_at)
- .limit(50)
- )
- records = result.scalars().all()
- if not records:
- return 0
- success_count = 0
- for rec in records:
- ok = False
- error_msg = ""
- try:
- if rec.target_type == "tenant":
- ok, error_msg = await _retry_tenant_deduct(db, rec)
- elif rec.target_type == "sa":
- ok, error_msg = await _retry_sa_deduct(db, rec)
- else:
- logger.error("未知的 target_type: %s", rec.target_type)
- error_msg = f"unknown target_type: {rec.target_type}"
- except Exception as e:
- logger.exception("补偿重试异常: id=%d", rec.id)
- error_msg = str(e)[:500]
- if ok:
- await db.delete(rec)
- await db.commit()
- success_count += 1
- logger.info("补偿成功: type=%s id=%d order=%s", rec.target_type, rec.target_id, rec.biz_order_no)
- else:
- # 更新重试次数和错误信息
- rec.retry_count += 1
- rec.last_error = error_msg[:500]
- rec.updated_at = datetime.now(timezone.utc)
- await db.commit()
- if rec.retry_count >= 10:
- logger.error(
- "补偿记录重试超过10次: type=%s id=%d order=%s error=%s",
- rec.target_type, rec.target_id, rec.biz_order_no, error_msg,
- )
- return success_count
- async def _retry_tenant_deduct(db: AsyncSession, rec: PendingDeduction) -> tuple[bool, str]:
- """重试租户扣减"""
- # 本地租户扣减在 AIGC-Space 侧,这里只处理控制面板侧的超管扣减
- # 如果未来需要在控制面板侧也管理租户扣减补偿,在这里实现
- return False, "租户补偿暂不支持(应在 AIGC-Space 侧处理)"
- async def _retry_sa_deduct(db: AsyncSession, rec: PendingDeduction) -> tuple[bool, str]:
- """重试超管扣减"""
- from app.services.sa_balance import deduct
- ok, reason = await deduct(db, rec.target_id, rec.amount, rec.biz_order_no)
- if ok:
- return True, ""
- # 余额不足也算"处理完成"(虽然扣不了,但不需要再重试了)
- if "余额不足" in reason:
- logger.warning("补偿扣减: 超管 %d 余额不足,跳过不再重试, order=%s", rec.target_id, rec.biz_order_no)
- return True, ""
- return False, reason
|