from datetime import date

from sqlalchemy import select, func, cast, Date
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.monitoring import (
    SuperAdmin,
    SuperAdminTenant,
    Tenant,
    Model,
    UserConsumptionDetail,
)
from app.schemas.monitoring import (
    DashboardResponse,
    Overview,
    SuperAdminData,
    TenantData,
    UserConsumption,
    ConsumptionRecord,
    ConsumptionDetailResponse,
    ConsumptionDetailRecord,
    DailyStatsResponse,
    DailySAStat,
    DailyTenantStat,
)


def _fmt4(value, default: float = 0) -> str:
    """Format a numeric column value with four decimals.

    Falsy values (``None`` and ``0``) are replaced by *default* before
    formatting, mirroring the ``float(x or default)`` convention used
    throughout this module.
    """
    return f"{float(value or default):.4f}"


def _date_conditions(start_date: str | None, end_date: str | None) -> list:
    """Build inclusive date-range conditions on ``consumption_date``.

    Raises:
        ValueError: if a non-empty bound is not an ISO ``YYYY-MM-DD`` date.
    """
    start_dt = date.fromisoformat(start_date) if start_date else None
    end_dt = date.fromisoformat(end_date) if end_date else None
    day = cast(UserConsumptionDetail.consumption_date, Date)
    conditions = []
    if start_dt:
        conditions.append(day >= start_dt)
    if end_dt:
        conditions.append(day <= end_dt)
    return conditions


async def _load_super_admins(
    db: AsyncSession,
    *,
    super_admin_id: int | None = None,
    super_admin_name: str | None = None,
) -> list:
    """Load super admins, optionally filtered by id and/or name substring."""
    stmt = select(SuperAdmin)
    if super_admin_id is not None:
        stmt = stmt.where(SuperAdmin.id == super_admin_id)
    if super_admin_name:
        pattern = f"%{super_admin_name}%"
        stmt = stmt.where(
            SuperAdmin.nickname.ilike(pattern) | SuperAdmin.username.ilike(pattern)
        )
    result = await db.execute(stmt)
    return list(result.scalars().all())


async def _load_sa_tenant_map(
    db: AsyncSession,
    super_admins: list,
    *,
    restrict: bool,
) -> dict[int, list[int]]:
    """Load super-admin → tenant-id associations.

    When *restrict* is true only associations of the given *super_admins*
    are returned (an empty list yields an empty map without querying);
    otherwise every association row is loaded.
    """
    stmt = select(SuperAdminTenant)
    if restrict:
        sa_ids = [sa.id for sa in super_admins]
        if not sa_ids:
            return {}
        stmt = stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
    result = await db.execute(stmt)
    sa_tenant_map: dict[int, list[int]] = {}
    for row in result.scalars().all():
        sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
    return sa_tenant_map


async def _load_tenants(
    db: AsyncSession,
    *,
    tenant_ids=None,
    tenant_name: str | None = None,
) -> dict:
    """Load tenants keyed by id, optionally limited by ids and/or name.

    *tenant_ids* of ``None`` means "no id restriction"; an empty collection
    means "nothing can match" and short-circuits to an empty dict.
    """
    stmt = select(Tenant)
    if tenant_ids is not None:
        ids = list(tenant_ids)
        if not ids:
            return {}
        stmt = stmt.where(Tenant.id.in_(ids))
    if tenant_name:
        pattern = f"%{tenant_name}%"
        stmt = stmt.where(
            Tenant.company_name.ilike(pattern) | Tenant.subdomain.ilike(pattern)
        )
    result = await db.execute(stmt)
    return {t.id: t for t in result.scalars().all()}


async def get_dashboard(
    db: AsyncSession,
    start_date: str | None = None,
    end_date: str | None = None,
    super_admin_id: int | None = None,
    super_admin_name: str | None = None,
) -> DashboardResponse:
    """Build the monitoring dashboard payload.

    Hierarchy: platform → super admin → tenant → user. Every user node
    carries its consumption records as a flat list.

    Args:
        db: Async database session.
        start_date: Inclusive lower bound (ISO date) on ``consumption_date``.
        end_date: Inclusive upper bound (ISO date) on ``consumption_date``.
        super_admin_id: Restrict the dashboard to this super admin.
        super_admin_name: Case-insensitive substring matched against the
            super admin's nickname or username.

    Raises:
        ValueError: if a supplied date is not a valid ISO date.
    """
    # Validate the date range before touching the database.
    conditions = _date_conditions(start_date, end_date)
    sa_filtered = super_admin_id is not None or bool(super_admin_name)

    # 1. Super admins (filtered by id and/or name).
    super_admins = await _load_super_admins(
        db, super_admin_id=super_admin_id, super_admin_name=super_admin_name
    )

    # 2. Super-admin ↔ tenant associations.
    sa_tenant_map = await _load_sa_tenant_map(db, super_admins, restrict=sa_filtered)
    all_tenant_ids = {tid for tids in sa_tenant_map.values() for tid in tids}

    # 3. Tenants referenced by those associations.
    tenant_map = await _load_tenants(db, tenant_ids=all_tenant_ids)

    # 4. Model catalogue, keyed by model code.
    model_result = await db.execute(select(Model))
    model_map = {m.model_code: m for m in model_result.scalars().all()}

    # 5. Consumption details. When a super-admin filter is active only the
    #    linked tenants can appear in the output, so restrict the query to
    #    them instead of loading the whole table.
    if sa_filtered and not all_tenant_ids:
        consumptions = []
    else:
        if sa_filtered:
            conditions.append(
                UserConsumptionDetail.tenant_id.in_(list(all_tenant_ids))
            )
        stmt = select(UserConsumptionDetail)
        if conditions:
            stmt = stmt.where(*conditions)
        consumption_result = await db.execute(
            stmt.order_by(UserConsumptionDetail.created_at)
        )
        consumptions = consumption_result.scalars().all()

    # 6. Group records as {tenant_id: {user_id: [ConsumptionRecord, ...]}};
    #    every consumption row stays an individual entry.
    agg: dict[int, dict[str, list]] = {}
    for c in consumptions:
        tenant = tenant_map.get(c.tenant_id)
        model_info = model_map.get(c.model_code)
        agg.setdefault(c.tenant_id, {}).setdefault(c.user_id, []).append(
            ConsumptionRecord(
                user_id=c.user_id,
                username=c.user_id,
                tenant_name=tenant.company_name if tenant else None,
                order_no=c.order_no or "",
                model_name=model_info.model_name if model_info else c.model_code,
                model_code=c.model_code,
                amount=_fmt4(c.user_actual_total),
                created_at=str(c.created_at) if c.created_at else "",
                invoiced=bool(c.invoiced),
                user_discount=_fmt4(c.user_discount, 1),
                user_actual_price=_fmt4(c.user_actual_price),
                tenant_discount=_fmt4(c.tenant_discount, 1),
                tenant_actual_price=_fmt4(c.tenant_actual_price),
                super_admin_discount=_fmt4(c.super_admin_discount, 1),
                super_admin_actual_price=_fmt4(c.super_admin_actual_price),
            )
        )

    # 7. Assemble the super admin → tenant → user tree.
    sa_data_list: list[SuperAdminData] = []
    for sa in super_admins:
        tenant_data_list: list[TenantData] = []
        for tid in sa_tenant_map.get(sa.id, []):
            tenant = tenant_map.get(tid)
            if not tenant:
                # Association points at a tenant row that no longer exists.
                continue
            users_map = agg.get(tid, {})
            user_list = [
                UserConsumption(
                    user_id=uid,
                    username=uid,
                    nickname=None,
                    total_consumption=f"{sum(float(r.amount) for r in records):.4f}",
                    tenant_actual_total=(
                        f"{sum(float(r.tenant_actual_price) for r in records):.4f}"
                    ),
                    tenant_name=tenant.company_name,
                    consumption_records=records,
                )
                for uid, records in users_map.items()
            ]
            tenant_total_consumption = sum(
                float(r.amount) for records in users_map.values() for r in records
            )
            tenant_total_charged = sum(
                float(r.tenant_actual_price)
                for records in users_map.values()
                for r in records
            )
            tenant_data_list.append(
                TenantData(
                    tenant_id=tid,
                    company_name=tenant.company_name,
                    subdomain=tenant.subdomain,
                    total_consumption=f"{tenant_total_consumption:.4f}",
                    total_tenant_charged=f"{tenant_total_charged:.4f}",
                    balance=_fmt4(tenant.balance),
                    user_count=len(users_map),
                    users=user_list,
                )
            )

        sa_total_consumption = sum(float(t.total_consumption) for t in tenant_data_list)
        sa_total_charged = sum(float(t.total_tenant_charged) for t in tenant_data_list)
        sa_data_list.append(
            SuperAdminData(
                super_admin_id=sa.id,
                username=sa.username,
                nickname=sa.nickname,
                remark=sa.remark,
                tenant_count=len(tenant_data_list),
                total_consumption=f"{sa_total_consumption:.4f}",
                total_tenant_charged=f"{sa_total_charged:.4f}",
                tenants=tenant_data_list,
            )
        )

    # 8. Platform-level overview. A tenant linked to several super admins is
    #    counted once per super admin, matching the tree above.
    all_tenants = [t for sa in sa_data_list for t in sa.tenants]
    total_consumption = sum(float(t.total_consumption) for t in all_tenants)
    total_tenant_charged = sum(float(t.total_tenant_charged) for t in all_tenants)
    total_balance = sum(float(t.balance) for t in all_tenants)

    return DashboardResponse(
        overview=Overview(
            total_super_admins=len(sa_data_list),
            total_tenants=sum(sa.tenant_count for sa in sa_data_list),
            total_users=sum(t.user_count for t in all_tenants),
            total_consumption=f"{total_consumption:.4f}",
            total_tenant_charged=f"{total_tenant_charged:.4f}",
            total_balance=f"{total_balance:.4f}",
        ),
        super_admins=sa_data_list,
        start_date=start_date,
        end_date=end_date,
    )


async def get_consumption_details(
    db: AsyncSession,
    start_date: str | None = None,
    end_date: str | None = None,
    super_admin_name: str | None = None,
    tenant_name: str | None = None,
    page: int = 1,
    page_size: int = 20,
) -> ConsumptionDetailResponse:
    """Return raw consumption detail rows, one per record, without aggregation.

    Intended for reconciliation.

    Args:
        db: Async database session.
        start_date: Inclusive lower bound (ISO date) on ``consumption_date``.
        end_date: Inclusive upper bound (ISO date) on ``consumption_date``.
        super_admin_name: Keep only records of tenants linked to a super
            admin whose nickname or username contains this text.
        tenant_name: Keep only records of tenants whose company name or
            subdomain contains this text (independent of the super-admin
            association).
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.

    Returns:
        The total number of matching rows and the requested page. A name
        filter that matches nothing yields an empty result rather than the
        unfiltered table.

    Raises:
        ValueError: if a supplied date is not a valid ISO date.
    """
    conditions = _date_conditions(start_date, end_date)

    # 1. Super-admin name filter → tenant ids linked to the matched admins.
    if super_admin_name:
        matched_sas = await _load_super_admins(db, super_admin_name=super_admin_name)
        sa_tenant_map = await _load_sa_tenant_map(db, matched_sas, restrict=True)
        sa_tenant_ids = {tid for tids in sa_tenant_map.values() for tid in tids}
        if not sa_tenant_ids:
            # No admin matched, or the matched admins own no tenants.
            return ConsumptionDetailResponse(total=0, records=[])
        conditions.append(UserConsumptionDetail.tenant_id.in_(list(sa_tenant_ids)))

    # 2. Tenant name filter.
    tenant_map = None
    if tenant_name:
        tenant_map = await _load_tenants(db, tenant_name=tenant_name)
        if not tenant_map:
            return ConsumptionDetailResponse(total=0, records=[])
        conditions.append(UserConsumptionDetail.tenant_id.in_(list(tenant_map)))

    # 3. Total row count for the pager.
    count_stmt = select(func.count(UserConsumptionDetail.id)).select_from(
        UserConsumptionDetail
    )
    if conditions:
        count_stmt = count_stmt.where(*conditions)
    total = await db.scalar(count_stmt) or 0

    # 4. Page query. The id tie-breaker keeps paging stable when several
    #    rows share the same consumption_date.
    page = max(page, 1)
    page_size = max(page_size, 0)
    stmt = select(UserConsumptionDetail)
    if conditions:
        stmt = stmt.where(*conditions)
    stmt = stmt.order_by(
        UserConsumptionDetail.consumption_date.desc(),
        UserConsumptionDetail.id.desc(),
    )
    stmt = stmt.offset((page - 1) * page_size).limit(page_size)
    result = await db.execute(stmt)
    records = result.scalars().all()

    # Without a tenant-name filter, only the tenants on this page are needed
    # for display names.
    if tenant_map is None:
        tenant_map = await _load_tenants(
            db,
            tenant_ids={c.tenant_id for c in records if c.tenant_id is not None},
        )

    # 5. Assemble the response rows.
    items: list[ConsumptionDetailRecord] = []
    for c in records:
        tenant = tenant_map.get(c.tenant_id)
        t_name = (tenant.company_name or tenant.subdomain) if tenant else "-"
        items.append(
            ConsumptionDetailRecord(
                user_id=c.user_id,
                user_name=c.username or c.user_id,
                tenant_name=t_name,
                order_no=c.order_no or "",
                model_code=c.model_code,
                consumption_date=str(c.consumption_date) if c.consumption_date else "",
                tenant_consumed=_fmt4(c.tenant_actual_total),
                user_discount=_fmt4(c.user_discount, 1),
                user_consumed=_fmt4(c.user_actual_total),
                tenant_discount=_fmt4(c.tenant_discount, 1),
                tenant_actual_price=_fmt4(c.tenant_actual_price),
                super_admin_discount=_fmt4(c.super_admin_discount, 1),
                super_admin_actual_price=_fmt4(c.super_admin_actual_price),
            )
        )

    return ConsumptionDetailResponse(total=total, records=items)


async def get_daily_stats(
    db: AsyncSession,
    start_date: str,
    end_date: str,
    super_admin_name: str | None = None,
    tenant_name: str | None = None,
) -> DailyStatsResponse:
    """Return per-day consumption statistics: super admin → tenant → day.

    Rows are grouped by the date part of ``consumption_date`` and tenant,
    summing the user-side and tenant-side totals.

    Args:
        db: Async database session.
        start_date: Inclusive lower bound (ISO date), required.
        end_date: Inclusive upper bound (ISO date), required.
        super_admin_name: Case-insensitive substring matched against the
            super admin's nickname or username.
        tenant_name: Case-insensitive substring matched against the
            tenant's company name or subdomain.

    Raises:
        ValueError: if either date is not a valid ISO date.
    """
    start_dt = date.fromisoformat(start_date)
    end_dt = date.fromisoformat(end_date)

    # 1. Super admins, their tenant associations, and the tenants themselves.
    all_sas = await _load_super_admins(db, super_admin_name=super_admin_name)
    if not all_sas:
        return DailyStatsResponse(sa_stats=[])
    sa_tenant_map = await _load_sa_tenant_map(
        db, all_sas, restrict=bool(super_admin_name)
    )
    all_tenant_ids = {tid for tids in sa_tenant_map.values() for tid in tids}
    tenant_map = await _load_tenants(
        db, tenant_ids=all_tenant_ids, tenant_name=tenant_name
    )

    # 2. Aggregate consumption per day and tenant, scoped to the tenants the
    #    active filters allow (None = no tenant restriction).
    if tenant_name:
        scope = set(tenant_map)
    elif super_admin_name:
        scope = all_tenant_ids
    else:
        scope = None

    if scope is not None and not scope:
        rows = []
    else:
        day = cast(UserConsumptionDetail.consumption_date, Date)
        stmt = select(
            day.label("stat_date"),
            UserConsumptionDetail.tenant_id,
            func.sum(UserConsumptionDetail.user_actual_total).label("total_consumption"),
            func.sum(UserConsumptionDetail.tenant_actual_total).label("total_charged"),
        ).where(day >= start_dt, day <= end_dt)
        if scope is not None:
            stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(list(scope)))
        stmt = stmt.group_by("stat_date", UserConsumptionDetail.tenant_id).order_by(
            "stat_date"
        )
        result = await db.execute(stmt)
        rows = result.fetchall()

    # 3. Index rows as daily[sa_id][tenant_id][date] = {consumption, charged}.
    #    A reverse tenant → owners map replaces the per-row scan and credits
    #    every super admin the tenant is linked to.
    owners: dict[int, list[int]] = {}
    for sa_id, tids in sa_tenant_map.items():
        for tid in tids:
            owners.setdefault(tid, []).append(sa_id)

    daily: dict[int, dict[int, dict[str, dict]]] = {}
    for row in rows:
        day_key = str(row.stat_date)
        # SUM() is NULL when every summed value is NULL; treat that as zero.
        cell = {
            "consumption": _fmt4(row.total_consumption),
            "charged": _fmt4(row.total_charged),
        }
        for sa_id in owners.get(row.tenant_id, ()):
            daily.setdefault(sa_id, {}).setdefault(row.tenant_id, {})[day_key] = cell

    # 4. Build the response: each tenant gets one row per date seen for its
    #    super admin, zero-filled where the tenant had no consumption.
    sa_stats: list[DailySAStat] = []
    for sa in all_sas:
        tenant_ids = sa_tenant_map.get(sa.id, [])
        per_tenant = daily.get(sa.id, {})
        sorted_dates = sorted(
            {d for tid in tenant_ids for d in per_tenant.get(tid, {})}
        )

        tenant_stats: list[DailyTenantStat] = []
        for tid in tenant_ids:
            tenant = tenant_map.get(tid)
            if not tenant:
                continue
            tname = tenant.company_name or tenant.subdomain
            by_day = per_tenant.get(tid, {})
            for d in sorted_dates:
                stat = by_day.get(d)
                tenant_stats.append(
                    DailyTenantStat(
                        tenant_name=tname,
                        date=d,
                        consumption=stat["consumption"] if stat else "0.0000",
                        charged=stat["charged"] if stat else "0.0000",
                    )
                )

        # NOTE(review): the SA-level date/consumption/charged fields are
        # emitted as fixed placeholders; only the tenant rows carry figures.
        sa_stats.append(
            DailySAStat(
                sa_name=sa.remark or sa.username,
                date="",
                consumption="0",
                charged="0",
                tenants=tenant_stats,
            )
        )

    return DailyStatsResponse(sa_stats=sa_stats)