| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
- from datetime import date
- from sqlalchemy import select, func, cast, Date
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.models.monitoring import (
- SuperAdmin,
- SuperAdminTenant,
- Tenant,
- Model,
- UserConsumptionDetail,
- )
- from app.schemas.monitoring import (
- DashboardResponse,
- Overview,
- SuperAdminData,
- TenantData,
- UserConsumption,
- ConsumptionRecord,
- ConsumptionDetailResponse,
- ConsumptionDetailRecord,
- DailyStatsResponse,
- DailySAStat,
- DailyTenantStat,
- )
async def get_dashboard(
    db: AsyncSession,
    start_date: str | None = None,
    end_date: str | None = None,
    super_admin_id: int | None = None,
    super_admin_name: str | None = None,
) -> DashboardResponse:
    """Build the monitoring dashboard payload.

    Hierarchy: platform -> super admin -> tenant -> user. Every user node
    carries its consumption records as a flat list.

    Args:
        db: Async session used for every query.
        start_date: Optional inclusive lower bound (ISO ``YYYY-MM-DD``) applied
            to the date part of ``consumption_date``.
        end_date: Optional inclusive upper bound (ISO ``YYYY-MM-DD``) applied
            to the date part of ``consumption_date``.
        super_admin_id: Restrict the result to the super admin with this id.
        super_admin_name: Restrict the result to super admins whose nickname
            or username contains this text (``ILIKE`` match).

    Returns:
        A ``DashboardResponse`` holding the platform overview and the
        per-super-admin tree. Monetary values are strings with four decimals.

    Raises:
        ValueError: If ``start_date`` or ``end_date`` is not a valid ISO date.
    """
    # Parse the bounds first so malformed input fails before any query runs.
    start_dt = date.fromisoformat(start_date) if start_date else None
    end_dt = date.fromisoformat(end_date) if end_date else None
    sa_filter_active = super_admin_id is not None or bool(super_admin_name)

    # 1. Super admins, optionally filtered by id and/or name.
    sa_stmt = select(SuperAdmin)
    if super_admin_id is not None:
        sa_stmt = sa_stmt.where(SuperAdmin.id == super_admin_id)
    if super_admin_name:
        name_pattern = f"%{super_admin_name}%"
        sa_stmt = sa_stmt.where(
            SuperAdmin.nickname.ilike(name_pattern)
            | SuperAdmin.username.ilike(name_pattern)
        )
    super_admins = (await db.execute(sa_stmt)).scalars().all()

    # 2. Super-admin -> tenant relations. Nothing is emitted without a matched
    #    super admin, so the query is skipped in that case; with an active
    #    filter only the relations of the matched super admins are loaded.
    sa_tenant_map: dict[int, list[int]] = {}
    all_tenant_ids: set[int] = set()
    if super_admins:
        sat_stmt = select(SuperAdminTenant)
        if sa_filter_active:
            sat_stmt = sat_stmt.where(
                SuperAdminTenant.super_admin_id.in_([sa.id for sa in super_admins])
            )
        sat_result = await db.execute(sat_stmt)
        for link in sat_result.scalars().all():
            sa_tenant_map.setdefault(link.super_admin_id, []).append(link.tenant_id)
            all_tenant_ids.add(link.tenant_id)

    # 3. Tenants referenced by those relations.
    tenant_map: dict[int, Tenant] = {}
    if all_tenant_ids:
        tenant_result = await db.execute(
            select(Tenant).where(Tenant.id.in_(all_tenant_ids))
        )
        tenant_map = {t.id: t for t in tenant_result.scalars().all()}

    # 4-6. Consumption details grouped as {tenant_id: {user_id: [records]}}.
    #      Only tenants present in tenant_map are ever emitted below, so the
    #      model and consumption queries are skipped when there are none.
    agg: dict[int, dict[str, list]] = {}
    if tenant_map:
        model_result = await db.execute(select(Model))
        model_map: dict[str, Model] = {
            m.model_code: m for m in model_result.scalars().all()
        }

        base_stmt = select(UserConsumptionDetail)
        if start_dt:
            base_stmt = base_stmt.where(
                cast(UserConsumptionDetail.consumption_date, Date) >= start_dt
            )
        if end_dt:
            base_stmt = base_stmt.where(
                cast(UserConsumptionDetail.consumption_date, Date) <= end_dt
            )
        if sa_filter_active:
            # Applies to the name filter as well as the id filter, so rows of
            # unrelated tenants are not fetched only to be thrown away.
            base_stmt = base_stmt.where(
                UserConsumptionDetail.tenant_id.in_(list(tenant_map))
            )
        consumption_result = await db.execute(
            base_stmt.order_by(UserConsumptionDetail.created_at)
        )

        for c in consumption_result.scalars().all():
            tenant = tenant_map.get(c.tenant_id)
            model_info = model_map.get(c.model_code)
            # NOTE(review): username mirrors user_id here; confirm whether the
            # detail row's own username column should be used instead.
            agg.setdefault(c.tenant_id, {}).setdefault(c.user_id, []).append(
                ConsumptionRecord(
                    user_id=c.user_id,
                    username=c.user_id,
                    tenant_name=tenant.company_name if tenant else None,
                    order_no=c.order_no or "",
                    model_name=model_info.model_name if model_info else c.model_code,
                    model_code=c.model_code,
                    amount=f"{float(c.user_actual_total or 0):.4f}",
                    created_at=str(c.created_at) if c.created_at else "",
                    invoiced=bool(c.invoiced),
                    # A missing (or zero) discount is reported as 1.
                    user_discount=f"{float(c.user_discount or 1):.4f}",
                    user_actual_price=f"{float(c.user_actual_price or 0):.4f}",
                    tenant_discount=f"{float(c.tenant_discount or 1):.4f}",
                    tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
                    super_admin_discount=f"{float(c.super_admin_discount or 1):.4f}",
                    super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
                )
            )

    # 7. Per-super-admin tree.
    sa_data_list: list[SuperAdminData] = []
    for sa in super_admins:
        tenant_data_list: list[TenantData] = []
        for tid in sa_tenant_map.get(sa.id, []):
            tenant = tenant_map.get(tid)
            if not tenant:
                # Relation points at a tenant row that was not found.
                continue
            users_map = agg.get(tid, {})
            user_list: list[UserConsumption] = [
                UserConsumption(
                    user_id=uid,
                    username=uid,
                    nickname=None,
                    total_consumption=f"{sum(float(r.amount) for r in records):.4f}",
                    tenant_actual_total=f"{sum(float(r.tenant_actual_price) for r in records):.4f}",
                    tenant_name=tenant.company_name,
                    consumption_records=records,
                )
                for uid, records in users_map.items()
            ]
            tenant_total_consumption = sum(
                float(r.amount) for records in users_map.values() for r in records
            )
            tenant_total_charged = sum(
                float(r.tenant_actual_price)
                for records in users_map.values()
                for r in records
            )
            tenant_data_list.append(TenantData(
                tenant_id=tid,
                company_name=tenant.company_name,
                subdomain=tenant.subdomain,
                total_consumption=f"{tenant_total_consumption:.4f}",
                total_tenant_charged=f"{tenant_total_charged:.4f}",
                balance=f"{float(tenant.balance or 0):.4f}",
                user_count=len(users_map),
                users=user_list,
            ))
        sa_total_consumption = sum(float(t.total_consumption) for t in tenant_data_list)
        sa_total_charged = sum(float(t.total_tenant_charged) for t in tenant_data_list)
        sa_data_list.append(SuperAdminData(
            super_admin_id=sa.id,
            username=sa.username,
            nickname=sa.nickname,
            remark=sa.remark,
            tenant_count=len(tenant_data_list),
            total_consumption=f"{sa_total_consumption:.4f}",
            total_tenant_charged=f"{sa_total_charged:.4f}",
            tenants=tenant_data_list,
        ))

    # 8. Platform overview, summed over every emitted tenant node.
    all_tenant_nodes = [t for sa in sa_data_list for t in sa.tenants]
    total_consumption = sum(float(t.total_consumption) for t in all_tenant_nodes)
    total_tenant_charged = sum(float(t.total_tenant_charged) for t in all_tenant_nodes)
    total_balance = sum(float(t.balance) for t in all_tenant_nodes)
    return DashboardResponse(
        overview=Overview(
            total_super_admins=len(sa_data_list),
            total_tenants=sum(sa.tenant_count for sa in sa_data_list),
            total_users=sum(t.user_count for t in all_tenant_nodes),
            total_consumption=f"{total_consumption:.4f}",
            total_tenant_charged=f"{total_tenant_charged:.4f}",
            total_balance=f"{total_balance:.4f}",
        ),
        super_admins=sa_data_list,
        start_date=start_date,
        end_date=end_date,
    )
async def get_consumption_details(
    db: AsyncSession,
    start_date: str | None = None,
    end_date: str | None = None,
    super_admin_name: str | None = None,
    tenant_name: str | None = None,
) -> ConsumptionDetailResponse:
    """Return raw consumption detail rows, one per record, without aggregation.

    Intended for reconciliation.

    Args:
        db: Async session used for every query.
        start_date: Optional inclusive lower bound (ISO ``YYYY-MM-DD``) applied
            to the date part of ``consumption_date``.
        end_date: Optional inclusive upper bound (ISO ``YYYY-MM-DD``) applied
            to the date part of ``consumption_date``.
        super_admin_name: Keep only records of tenants linked to a super admin
            whose nickname or username contains this text (``ILIKE`` match).
        tenant_name: Keep only records of tenants whose company name or
            subdomain contains this text (``ILIKE`` match). Applied
            independently of the super-admin relation.

    Returns:
        A ``ConsumptionDetailResponse`` with the matching records ordered by
        ``consumption_date`` descending. When an active filter matches no
        tenant the response is empty.

    Raises:
        ValueError: If ``start_date`` or ``end_date`` is not a valid ISO date.
    """
    # Parse the bounds first so malformed input fails before any query runs.
    start_dt = date.fromisoformat(start_date) if start_date else None
    end_dt = date.fromisoformat(end_date) if end_date else None

    # Tenant ids the result is restricted to; None means "no restriction".
    allowed_tenant_ids: set[int] | None = None

    # 1. Resolve the super-admin name filter to a set of tenant ids.
    if super_admin_name:
        name_pattern = f"%{super_admin_name}%"
        sa_id_result = await db.execute(
            select(SuperAdmin.id).where(
                SuperAdmin.nickname.ilike(name_pattern)
                | SuperAdmin.username.ilike(name_pattern)
            )
        )
        sa_ids = sa_id_result.scalars().all()
        if not sa_ids:
            # No super admin matches: the filter must yield nothing, not
            # silently fall back to "all records".
            return ConsumptionDetailResponse(total=0, records=[])
        sat_result = await db.execute(
            select(SuperAdminTenant.tenant_id).where(
                SuperAdminTenant.super_admin_id.in_(sa_ids)
            )
        )
        allowed_tenant_ids = set(sat_result.scalars().all())

    # 2. Tenants. Without tenant_name all tenants are loaded, because the map
    #    is also used to resolve display names below.
    tenant_stmt = select(Tenant)
    if tenant_name:
        tenant_pattern = f"%{tenant_name}%"
        tenant_stmt = tenant_stmt.where(
            Tenant.company_name.ilike(tenant_pattern)
            | Tenant.subdomain.ilike(tenant_pattern)
        )
    tenant_result = await db.execute(tenant_stmt)
    tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
    if tenant_name:
        name_matched_ids = set(tenant_map)
        allowed_tenant_ids = (
            name_matched_ids
            if allowed_tenant_ids is None
            else allowed_tenant_ids & name_matched_ids
        )

    # 3. Raw consumption details.
    stmt = select(UserConsumptionDetail)
    if start_dt:
        stmt = stmt.where(cast(UserConsumptionDetail.consumption_date, Date) >= start_dt)
    if end_dt:
        stmt = stmt.where(cast(UserConsumptionDetail.consumption_date, Date) <= end_dt)
    if allowed_tenant_ids is not None:
        if not allowed_tenant_ids:
            # An active filter resolved to no tenant at all.
            return ConsumptionDetailResponse(total=0, records=[])
        stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(list(allowed_tenant_ids)))
    stmt = stmt.order_by(UserConsumptionDetail.consumption_date.desc())
    result = await db.execute(stmt)

    # 4. Assemble the response.
    items: list[ConsumptionDetailRecord] = []
    for c in result.scalars().all():
        tenant = tenant_map.get(c.tenant_id)
        # "-" marks a record whose tenant is not in the loaded tenant map.
        t_name = (tenant.company_name or tenant.subdomain) if tenant else "-"
        items.append(ConsumptionDetailRecord(
            user_id=c.user_id,
            user_name=c.username or c.user_id,
            tenant_name=t_name,
            order_no=c.order_no or "",
            model_code=c.model_code,
            consumption_date=str(c.consumption_date) if c.consumption_date else "",
            tenant_consumed=f"{float(c.tenant_actual_total or 0):.4f}",
            # A missing (or zero) discount is reported as 1.
            user_discount=f"{float(c.user_discount or 1):.4f}",
            user_consumed=f"{float(c.user_actual_total or 0):.4f}",
            tenant_discount=f"{float(c.tenant_discount or 1):.4f}",
            tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
            super_admin_discount=f"{float(c.super_admin_discount or 1):.4f}",
            super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
        ))
    return ConsumptionDetailResponse(total=len(items), records=items)
async def get_daily_stats(
    db: AsyncSession,
    start_date: str,
    end_date: str,
    super_admin_name: str | None = None,
    tenant_name: str | None = None,
) -> DailyStatsResponse:
    """Return per-day consumption statistics: super admin -> tenant -> day.

    Detail rows are grouped by the date part of ``consumption_date`` and by
    tenant; the consumed and charged amounts are summed per group.

    Args:
        db: Async session used for every query.
        start_date: Inclusive lower bound, ISO ``YYYY-MM-DD``.
        end_date: Inclusive upper bound, ISO ``YYYY-MM-DD``.
        super_admin_name: Keep only super admins whose nickname or username
            contains this text (``ILIKE`` match).
        tenant_name: Keep only tenants whose company name or subdomain
            contains this text (``ILIKE`` match).

    Returns:
        A ``DailyStatsResponse`` with one ``DailySAStat`` per matched super
        admin. Each of its tenants gets one entry per date on which any tenant
        of that super admin has data; days without data for a tenant are
        reported as ``"0.0000"``.

    Raises:
        ValueError: If ``start_date`` or ``end_date`` is not a valid ISO date.
    """
    # Parse the bounds first so malformed input fails before any query runs.
    start_dt = date.fromisoformat(start_date)
    end_dt = date.fromisoformat(end_date)

    # 1. Super admins, their tenant relations, and the tenants themselves.
    sa_stmt = select(SuperAdmin)
    if super_admin_name:
        name_pattern = f"%{super_admin_name}%"
        sa_stmt = sa_stmt.where(
            SuperAdmin.nickname.ilike(name_pattern)
            | SuperAdmin.username.ilike(name_pattern)
        )
    all_sas = (await db.execute(sa_stmt)).scalars().all()
    if not all_sas:
        # The response has one entry per super admin, so nothing can be emitted.
        return DailyStatsResponse(sa_stats=[])

    sat_stmt = select(SuperAdminTenant)
    if super_admin_name:
        sat_stmt = sat_stmt.where(
            SuperAdminTenant.super_admin_id.in_([sa.id for sa in all_sas])
        )
    sat_result = await db.execute(sat_stmt)
    sa_tenant_map: dict[int, list[int]] = {}
    all_tenant_ids: set[int] = set()
    for link in sat_result.scalars().all():
        sa_tenant_map.setdefault(link.super_admin_id, []).append(link.tenant_id)
        all_tenant_ids.add(link.tenant_id)

    # Reverse lookup tenant -> owning super admin, built once. A tenant linked
    # to several super admins is attributed to the first one in map order.
    tenant_owner: dict[int, int] = {}
    for sid, tids in sa_tenant_map.items():
        for tid in tids:
            tenant_owner.setdefault(tid, sid)

    tenant_map: dict[int, Tenant] = {}
    if all_tenant_ids:
        tenant_stmt = select(Tenant).where(Tenant.id.in_(all_tenant_ids))
        if tenant_name:
            tenant_pattern = f"%{tenant_name}%"
            tenant_stmt = tenant_stmt.where(
                Tenant.company_name.ilike(tenant_pattern)
                | Tenant.subdomain.ilike(tenant_pattern)
            )
        tenant_result = await db.execute(tenant_stmt)
        tenant_map = {t.id: t for t in tenant_result.scalars().all()}

    # 2. Aggregate details by day and tenant.
    #    daily[sa_id][tenant_id][date] = {"consumption": ..., "charged": ...}
    daily: dict[int, dict[int, dict[str, dict[str, str]]]] = {}
    if tenant_map:
        # Only tenants in tenant_map are emitted, so the aggregate query is
        # skipped entirely when that map is empty.
        stmt = select(
            cast(UserConsumptionDetail.consumption_date, Date).label("stat_date"),
            UserConsumptionDetail.tenant_id,
            func.sum(UserConsumptionDetail.user_actual_total).label("total_consumption"),
            func.sum(UserConsumptionDetail.tenant_actual_total).label("total_charged"),
        ).where(
            cast(UserConsumptionDetail.consumption_date, Date) >= start_dt,
            cast(UserConsumptionDetail.consumption_date, Date) <= end_dt,
        )
        if super_admin_name:
            stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(list(all_tenant_ids)))
        if tenant_name:
            stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(list(tenant_map)))
        stmt = stmt.group_by("stat_date", UserConsumptionDetail.tenant_id).order_by("stat_date")
        result = await db.execute(stmt)

        for row in result.fetchall():
            owner_id = tenant_owner.get(row.tenant_id)
            if owner_id is None:
                # Tenant is not linked to any loaded super admin.
                continue
            # SUM() yields NULL when every summed value is NULL; report 0.
            daily.setdefault(owner_id, {}).setdefault(row.tenant_id, {})[str(row.stat_date)] = {
                "consumption": f"{float(row.total_consumption or 0):.4f}",
                "charged": f"{float(row.total_charged or 0):.4f}",
            }

    # 3. Build the response: super admin -> tenant -> date.
    sa_stats: list[DailySAStat] = []
    for sa in all_sas:
        tenant_ids = sa_tenant_map.get(sa.id, [])
        per_tenant = daily.get(sa.id, {})

        # Every date on which any tenant of this super admin has data.
        all_dates: set[str] = set()
        for tid in tenant_ids:
            all_dates.update(per_tenant.get(tid, {}))
        sorted_dates = sorted(all_dates)

        tenant_stats: list[DailyTenantStat] = []
        for tid in tenant_ids:
            tenant = tenant_map.get(tid)
            if not tenant:
                continue
            tname = tenant.company_name or tenant.subdomain
            tenant_days = per_tenant.get(tid, {})
            for d in sorted_dates:
                stat = tenant_days.get(d)
                tenant_stats.append(DailyTenantStat(
                    tenant_name=tname,
                    date=d,
                    consumption=stat["consumption"] if stat else "0.0000",
                    charged=stat["charged"] if stat else "0.0000",
                ))
        # The super-admin level date/consumption/charged are fixed placeholders;
        # no super-admin level totals are computed here.
        sa_stats.append(DailySAStat(
            sa_name=sa.remark or sa.username,
            date="",
            consumption="0",
            charged="0",
            tenants=tenant_stats,
        ))
    return DailyStatsResponse(sa_stats=sa_stats)
|