| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- """超级管理员余额管理服务"""
- import logging
- from decimal import Decimal
- from sqlalchemy import select, func
- from sqlalchemy.ext.asyncio import AsyncSession
- from sqlalchemy.exc import IntegrityError
- from app.models.monitoring import SuperAdmin, SuperAdminBalanceLog
- from app.config import settings
- logger = logging.getLogger(__name__)
async def get_balance(db: AsyncSession, sa_id: int) -> Decimal:
    """Return the current balance of a super admin.

    Args:
        db: Async session used to run the query.
        sa_id: Primary key of the super admin.

    Returns:
        The stored balance as a ``Decimal``; a NULL/empty balance is
        reported as ``Decimal("0")``.

    Raises:
        ValueError: If no super admin with ``sa_id`` exists.
    """
    result = await db.execute(
        select(SuperAdmin.balance).where(SuperAdmin.id == sa_id)
    )
    # Fetch the Row rather than the scalar: a scalar of None cannot tell
    # "no such admin" apart from "admin exists but balance is NULL".
    row = result.one_or_none()
    if row is None:
        raise ValueError(f"超管 {sa_id} 不存在")
    # str() round-trip keeps the conversion exact whatever numeric type
    # the driver hands back.
    return Decimal(str(row[0] or 0))
async def check_balance(db: AsyncSession, sa_id: int) -> bool:
    """Tell whether a super admin's balance is strictly greater than zero.

    The balance is read through ``get_balance``; any exception raised
    there propagates unchanged.
    """
    return (await get_balance(db, sa_id)) > 0
async def recharge(
    db: AsyncSession,
    sa_id: int,
    amount: Decimal,
    remark: str = "",
) -> dict:
    """Add ``amount`` to a super admin's balance and commit the change.

    The admin row is selected with ``FOR UPDATE`` before the balance is
    modified. A ``SuperAdminBalanceLog`` entry with ``biz_type="recharge"``
    is written in the same transaction.

    Args:
        db: Async session; this function commits it on success.
        sa_id: Primary key of the super admin to credit.
        amount: Amount to add. Must be a finite number greater than zero.
            Non-``Decimal`` numerics are converted via ``Decimal(str(...))``.
        remark: Free-text note stored on the log entry.

    Returns:
        ``{"balance_after": <str>, "message": "充值成功"}``.

    Raises:
        ValueError: If ``amount`` is not a finite positive number, or the
            super admin does not exist.
    """
    if not isinstance(amount, Decimal):
        # Going through str() avoids importing binary float artefacts.
        amount = Decimal(str(amount))
    # Check finiteness first: ordering comparisons on a NaN Decimal raise
    # InvalidOperation, and Infinity would otherwise pass the "> 0" test.
    if not amount.is_finite() or amount <= 0:
        raise ValueError("充值金额必须大于 0")

    result = await db.execute(
        select(SuperAdmin).where(SuperAdmin.id == sa_id).with_for_update()
    )
    sa = result.scalar_one_or_none()
    if not sa:
        raise ValueError(f"超管 {sa_id} 不存在")

    current = Decimal(str(sa.balance or 0))
    balance_after = current + amount
    sa.balance = balance_after
    # Reset the warning flags after a top-up so that the alerts can fire
    # again the next time the balance drops.
    sa.balance_warning_sent = False
    sa.balance_depleted_sent = False

    log = SuperAdminBalanceLog(
        super_admin_id=sa_id,
        change_amount=amount,
        balance_after=balance_after,
        biz_type="recharge",
        remark=remark,
    )
    db.add(log)
    await db.commit()

    logger.info("超管 %d 充值 %s 元,余额 %s", sa_id, amount, balance_after)
    return {"balance_after": str(balance_after), "message": "充值成功"}
async def deduct(
    db: AsyncSession,
    sa_id: int,
    amount: Decimal,
    biz_order_no: str,
) -> tuple[bool, str]:
    """Subtract ``amount`` from a super admin's balance.

    The call is idempotent per ``biz_order_no``: if a ``consume`` log with
    the same order number already exists for this admin, nothing is
    deducted and success is reported.

    Args:
        db: Async session; committed on success, rolled back if the commit
            raises ``IntegrityError``.
        sa_id: Primary key of the super admin to debit.
        amount: Amount to subtract. Must be a finite number greater than
            zero. Non-``Decimal`` numerics are converted via
            ``Decimal(str(...))``.
        biz_order_no: Business order number used as the idempotency key.
            When empty, the idempotency lookup is skipped.

    Returns:
        ``(success, reason)``. Failures (invalid amount, unknown admin,
        insufficient balance) are reported through the tuple rather than
        raised.
    """
    if not isinstance(amount, Decimal):
        # Going through str() avoids importing binary float artefacts.
        amount = Decimal(str(amount))
    # Check finiteness first: ordering comparisons on a NaN Decimal raise
    # InvalidOperation, which would break the "never raises" contract.
    if not amount.is_finite() or amount <= 0:
        return False, "扣减金额必须大于 0"

    # Idempotency probe: has this order number already been charged?
    if biz_order_no:
        existing = await db.execute(
            select(SuperAdminBalanceLog.id)
            .where(
                SuperAdminBalanceLog.super_admin_id == sa_id,
                SuperAdminBalanceLog.biz_type == "consume",
                SuperAdminBalanceLog.biz_order_no == biz_order_no,
            )
            # One hit is enough; limiting also keeps the probe from
            # failing should more than one matching log ever exist.
            .limit(1)
        )
        if existing.first() is not None:
            return True, "已扣减(幂等)"

    result = await db.execute(
        select(SuperAdmin).where(SuperAdmin.id == sa_id).with_for_update()
    )
    sa = result.scalar_one_or_none()
    if not sa:
        return False, f"超管 {sa_id} 不存在"

    current = Decimal(str(sa.balance or 0))
    if current < amount:
        logger.warning("超管 %d 余额不足: 当前 %s, 需扣 %s", sa_id, current, amount)
        return False, "余额不足"

    balance_after = current - amount
    sa.balance = balance_after
    log = SuperAdminBalanceLog(
        super_admin_id=sa_id,
        change_amount=-amount,
        balance_after=balance_after,
        biz_type="consume",
        biz_order_no=biz_order_no,
    )
    db.add(log)

    try:
        await db.commit()
    except IntegrityError:
        # NOTE(review): presumably a unique constraint on the order number
        # rejected a concurrent duplicate - confirm against the model.
        await db.rollback()
        return True, "已扣减(幂等)"

    logger.info("超管 %d 扣减 %s 元(订单 %s),余额 %s", sa_id, amount, biz_order_no, balance_after)
    return True, "扣减成功"
async def get_balance_logs(
    db: AsyncSession,
    sa_id: int,
    page: int = 1,
    size: int = 20,
) -> dict:
    """Return one page of balance-change logs for a super admin.

    Logs are ordered newest first.

    Args:
        db: Async session used to run the queries.
        sa_id: Primary key of the super admin whose logs are listed.
        page: 1-based page number; values below 1 are treated as 1.
        size: Page size; negative values are treated as 0.

    Returns:
        ``{"total": <int>, "items": [...]}`` where each item holds ``id``,
        ``change_amount`` and ``balance_after`` (as strings), ``biz_type``,
        ``biz_order_no``, ``remark`` and ``created_at`` (ISO-8601 string or
        ``None``).
    """
    # Clamp paging input so a bad caller value cannot turn into a negative
    # OFFSET/LIMIT, which databases reject.
    page = max(page, 1)
    size = max(size, 0)

    count_stmt = (
        select(func.count())
        .select_from(SuperAdminBalanceLog)
        .where(SuperAdminBalanceLog.super_admin_id == sa_id)
    )
    total = (await db.execute(count_stmt)).scalar() or 0

    stmt = (
        select(SuperAdminBalanceLog)
        .where(SuperAdminBalanceLog.super_admin_id == sa_id)
        # id breaks ties between rows sharing a timestamp so that rows are
        # neither repeated nor skipped across pages.
        .order_by(
            SuperAdminBalanceLog.created_at.desc(),
            SuperAdminBalanceLog.id.desc(),
        )
        .offset((page - 1) * size)
        .limit(size)
    )
    logs = (await db.execute(stmt)).scalars().all()

    items = [
        {
            "id": log.id,
            "change_amount": str(log.change_amount),
            "balance_after": str(log.balance_after),
            "biz_type": log.biz_type,
            "biz_order_no": log.biz_order_no,
            "remark": log.remark,
            "created_at": log.created_at.isoformat() if log.created_at else None,
        }
        for log in logs
    ]
    return {"total": total, "items": items}
async def get_all_sa_balance(db: AsyncSession) -> list[dict]:
    """List balance information for every super admin, ordered by id.

    Intended for the balance-warning check. Each entry carries ``id``,
    ``username``, ``remark``, ``phone``, ``balance`` (a ``Decimal``, with an
    empty balance reported as 0) and the two warning-sent flags.
    """
    stmt = select(SuperAdmin).order_by(SuperAdmin.id)
    admins = (await db.execute(stmt)).scalars().all()

    summaries = []
    for admin in admins:
        summaries.append(
            {
                "id": admin.id,
                "username": admin.username,
                "remark": admin.remark,
                "phone": admin.phone,
                "balance": Decimal(str(admin.balance or 0)),
                "balance_warning_sent": admin.balance_warning_sent,
                "balance_depleted_sent": admin.balance_depleted_sent,
            }
        )
    return summaries
async def get_sa_balance_info(db: AsyncSession, sa_id: int) -> dict:
    """Fetch one super admin's balance together with basic profile fields.

    Returns:
        A dict with ``id``, ``username``, ``nickname``, ``remark``,
        ``phone``, ``balance`` (as a string, empty balance shown as "0")
        and the two warning-sent flags.

    Raises:
        ValueError: If no super admin with ``sa_id`` exists.
    """
    stmt = select(SuperAdmin).where(SuperAdmin.id == sa_id)
    admin = (await db.execute(stmt)).scalar_one_or_none()
    if not admin:
        raise ValueError(f"超管 {sa_id} 不存在")

    info = {}
    info["id"] = admin.id
    info["username"] = admin.username
    info["nickname"] = admin.nickname
    info["remark"] = admin.remark
    info["phone"] = admin.phone
    info["balance"] = str(admin.balance or 0)
    info["balance_warning_sent"] = admin.balance_warning_sent
    info["balance_depleted_sent"] = admin.balance_depleted_sent
    return info
|