"""扣减补偿服务 当租户或超管扣减因异常(网络、DB超时等)失败时,写入 pending_deductions 表, 后台任务定期扫描重试,利用 biz_order_no 幂等机制保证不重复扣减。 """ import logging from datetime import datetime, timezone from decimal import Decimal from sqlalchemy import select, update, func from sqlalchemy.ext.asyncio import AsyncSession from app.models.compensation import PendingDeduction from app.models.monitoring import SuperAdmin logger = logging.getLogger(__name__) async def record_pending( db: AsyncSession, target_type: str, target_id: int, amount: Decimal, biz_order_no: str, error_msg: str = "", ): """写入一条待补偿记录""" record = PendingDeduction( target_type=target_type, target_id=target_id, amount=amount, biz_order_no=biz_order_no, last_error=error_msg[:500] if error_msg else "", ) db.add(record) await db.commit() logger.warning( "写入待补偿记录: type=%s id=%d amount=%s order=%s error=%s", target_type, target_id, amount, biz_order_no, error_msg[:100], ) async def process_pending(db: AsyncSession) -> int: """处理所有待补偿记录,返回成功处理的数量""" # 取最早创建的 50 条(FIFO) result = await db.execute( select(PendingDeduction) .order_by(PendingDeduction.created_at) .limit(50) ) records = result.scalars().all() if not records: return 0 success_count = 0 for rec in records: ok = False error_msg = "" try: if rec.target_type == "tenant": ok, error_msg = await _retry_tenant_deduct(db, rec) elif rec.target_type == "sa": ok, error_msg = await _retry_sa_deduct(db, rec) else: logger.error("未知的 target_type: %s", rec.target_type) error_msg = f"unknown target_type: {rec.target_type}" except Exception as e: logger.exception("补偿重试异常: id=%d", rec.id) error_msg = str(e)[:500] if ok: await db.delete(rec) await db.commit() success_count += 1 logger.info("补偿成功: type=%s id=%d order=%s", rec.target_type, rec.target_id, rec.biz_order_no) else: # 更新重试次数和错误信息 rec.retry_count += 1 rec.last_error = error_msg[:500] rec.updated_at = datetime.now(timezone.utc) await db.commit() if rec.retry_count >= 10: logger.error( "补偿记录重试超过10次: type=%s id=%d order=%s error=%s", rec.target_type, rec.target_id, rec.biz_order_no, error_msg, ) return success_count async def _retry_tenant_deduct(db: AsyncSession, rec: PendingDeduction) -> tuple[bool, str]: """重试租户扣减""" # 本地租户扣减在 AIGC-Space 侧,这里只处理控制面板侧的超管扣减 # 如果未来需要在控制面板侧也管理租户扣减补偿,在这里实现 return False, "租户补偿暂不支持(应在 AIGC-Space 侧处理)" async def _retry_sa_deduct(db: AsyncSession, rec: PendingDeduction) -> tuple[bool, str]: """重试超管扣减""" from app.services.sa_balance import deduct ok, reason = await deduct(db, rec.target_id, rec.amount, rec.biz_order_no) if ok: return True, "" # 余额不足也算"处理完成"(虽然扣不了,但不需要再重试了) if "余额不足" in reason: logger.warning("补偿扣减: 超管 %d 余额不足,跳过不再重试, order=%s", rec.target_id, rec.biz_order_no) return True, "" return False, reason