monitoring.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. from datetime import date
  2. from sqlalchemy import select, func, cast, Date
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.models.monitoring import (
  5. SuperAdmin,
  6. SuperAdminTenant,
  7. Tenant,
  8. Model,
  9. UserConsumptionDetail,
  10. )
  11. from app.schemas.monitoring import (
  12. DashboardResponse,
  13. Overview,
  14. SuperAdminData,
  15. TenantData,
  16. UserConsumption,
  17. ConsumptionRecord,
  18. ConsumptionDetailResponse,
  19. ConsumptionDetailRecord,
  20. DailyStatsResponse,
  21. DailySAStat,
  22. DailyTenantStat,
  23. )
  24. async def get_dashboard(
  25. db: AsyncSession,
  26. start_date: str | None = None,
  27. end_date: str | None = None,
  28. super_admin_id: int | None = None,
  29. super_admin_name: str | None = None,
  30. ) -> DashboardResponse:
  31. """
  32. 获取监控大屏数据
  33. 层级结构:平台 → 超级管理员 → 租户 → 用户
  34. 每个用户节点包含消费记录流水(扁平列表)。
  35. """
  36. # 1. 查询所有超级管理员(按ID或名称过滤)
  37. sa_stmt = select(SuperAdmin)
  38. if super_admin_id is not None:
  39. sa_stmt = sa_stmt.where(SuperAdmin.id == super_admin_id)
  40. if super_admin_name:
  41. sa_stmt = sa_stmt.where(
  42. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  43. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  44. )
  45. sa_result = await db.execute(sa_stmt)
  46. super_admins = sa_result.scalars().all()
  47. # 2. 查询超级管理员-租户关联关系
  48. sat_stmt = select(SuperAdminTenant)
  49. if super_admin_id is not None:
  50. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id == super_admin_id)
  51. if super_admin_name:
  52. sa_ids = [sa.id for sa in super_admins]
  53. if sa_ids:
  54. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  55. sat_result = await db.execute(sat_stmt)
  56. sa_tenant_map: dict[int, list[int]] = {}
  57. all_tenant_ids: set[int] = set()
  58. for row in sat_result.scalars().all():
  59. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  60. all_tenant_ids.add(row.tenant_id)
  61. # 3. 查询所有租户信息
  62. tenant_result = await db.execute(select(Tenant).where(Tenant.id.in_(all_tenant_ids)))
  63. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  64. # 4. 查询所有模型信息
  65. model_result = await db.execute(select(Model))
  66. model_map: dict[str, Model] = {m.model_code: m for m in model_result.scalars().all()}
  67. # 5. 查询消费明细(按时间范围过滤)
  68. start_dt = date.fromisoformat(start_date) if start_date else None
  69. end_dt = date.fromisoformat(end_date) if end_date else None
  70. base_stmt = select(UserConsumptionDetail)
  71. if start_dt:
  72. base_stmt = base_stmt.where(
  73. cast(UserConsumptionDetail.consumption_date, Date) >= start_dt
  74. )
  75. if end_dt:
  76. base_stmt = base_stmt.where(
  77. cast(UserConsumptionDetail.consumption_date, Date) <= end_dt
  78. )
  79. if super_admin_id is not None:
  80. matched_tenant_ids = sa_tenant_map.get(super_admin_id, [])
  81. if matched_tenant_ids:
  82. base_stmt = base_stmt.where(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  83. consumption_result = await db.execute(base_stmt.order_by(UserConsumptionDetail.created_at))
  84. consumptions = consumption_result.scalars().all()
  85. # 7. 按 租户 → 用户 聚合,每条消费记录作为独立条目
  86. # 结构: {tenant_id: {user_id: [consumption_records]}}
  87. agg: dict[int, dict[str, list]] = {}
  88. for c in consumptions:
  89. agg.setdefault(c.tenant_id, {})
  90. agg[c.tenant_id].setdefault(c.user_id, [])
  91. tenant = tenant_map.get(c.tenant_id)
  92. model_info = model_map.get(c.model_code)
  93. user_discount_val = float(c.user_discount or 1)
  94. sa_discount_val = float(c.super_admin_discount or 1)
  95. agg[c.tenant_id][c.user_id].append(ConsumptionRecord(
  96. user_id=c.user_id,
  97. username=c.user_id,
  98. tenant_name=tenant.company_name if tenant else None,
  99. order_no=c.order_no or "",
  100. model_name=model_info.model_name if model_info else c.model_code,
  101. model_code=c.model_code,
  102. amount=f"{float(c.user_actual_total or 0):.4f}",
  103. created_at=str(c.created_at) if c.created_at else "",
  104. invoiced=bool(c.invoiced),
  105. user_discount=f"{user_discount_val:.4f}",
  106. user_actual_price=f"{float(c.user_actual_price or 0):.4f}",
  107. tenant_discount=f"{float(c.tenant_discount or 1):.4f}",
  108. tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
  109. super_admin_discount=f"{sa_discount_val:.4f}",
  110. super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
  111. ))
  112. # 8. 构建超级管理员数据
  113. sa_data_list: list[SuperAdminData] = []
  114. for sa in super_admins:
  115. tenant_ids = sa_tenant_map.get(sa.id, [])
  116. tenant_data_list: list[TenantData] = []
  117. for tid in tenant_ids:
  118. tenant = tenant_map.get(tid)
  119. if not tenant:
  120. continue
  121. users_map = agg.get(tid, {})
  122. user_list: list[UserConsumption] = []
  123. for uid, records in users_map.items():
  124. user_list.append(UserConsumption(
  125. user_id=uid,
  126. username=uid,
  127. nickname=None,
  128. total_consumption=f"{sum(float(r.amount) for r in records):.4f}",
  129. tenant_actual_total=f"{sum(float(r.tenant_actual_price) for r in records):.4f}",
  130. tenant_name=tenant.company_name if tenant else None,
  131. consumption_records=records,
  132. ))
  133. tenant_total_consumption = sum(float(r.amount) for records in users_map.values() for r in records)
  134. tenant_total_charged = sum(float(r.tenant_actual_price) for records in users_map.values() for r in records)
  135. tenant_data_list.append(TenantData(
  136. tenant_id=tid,
  137. company_name=tenant.company_name,
  138. subdomain=tenant.subdomain,
  139. total_consumption=f"{tenant_total_consumption:.4f}",
  140. total_tenant_charged=f"{tenant_total_charged:.4f}",
  141. balance=f"{float(tenant.balance or 0):.4f}",
  142. user_count=len(users_map),
  143. users=user_list,
  144. ))
  145. sa_total_consumption = sum(float(t.total_consumption) for t in tenant_data_list)
  146. sa_total_charged = sum(float(t.total_tenant_charged) for t in tenant_data_list)
  147. sa_data_list.append(SuperAdminData(
  148. super_admin_id=sa.id,
  149. username=sa.username,
  150. nickname=sa.nickname,
  151. remark=sa.remark,
  152. tenant_count=len(tenant_data_list),
  153. total_consumption=f"{sa_total_consumption:.4f}",
  154. total_tenant_charged=f"{sa_total_charged:.4f}",
  155. tenants=tenant_data_list,
  156. ))
  157. # 9. 构建平台汇总
  158. total_super_admins = len(sa_data_list)
  159. total_tenants = sum(sa.tenant_count for sa in sa_data_list)
  160. total_users = sum(t.user_count for sa in sa_data_list for t in sa.tenants)
  161. total_consumption = sum(float(t.total_consumption) for sa in sa_data_list for t in sa.tenants)
  162. total_tenant_charged = sum(float(t.total_tenant_charged) for sa in sa_data_list for t in sa.tenants)
  163. total_balance = sum(float(t.balance) for sa in sa_data_list for t in sa.tenants)
  164. return DashboardResponse(
  165. overview=Overview(
  166. total_super_admins=total_super_admins,
  167. total_tenants=total_tenants,
  168. total_users=total_users,
  169. total_consumption=f"{total_consumption:.4f}",
  170. total_tenant_charged=f"{total_tenant_charged:.4f}",
  171. total_balance=f"{total_balance:.4f}",
  172. ),
  173. super_admins=sa_data_list,
  174. start_date=start_date,
  175. end_date=end_date,
  176. )
  177. async def get_consumption_details(
  178. db: AsyncSession,
  179. start_date: str | None = None,
  180. end_date: str | None = None,
  181. super_admin_name: str | None = None,
  182. tenant_name: str | None = None,
  183. ) -> ConsumptionDetailResponse:
  184. """
  185. 查询原始消费明细表,每条记录一行,不做任何聚合
  186. 用于对账场景
  187. """
  188. # 1. 查出所有 SA、关联关系、租户
  189. sa_stmt = select(SuperAdmin)
  190. if super_admin_name:
  191. sa_stmt = sa_stmt.where(
  192. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  193. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  194. )
  195. sa_result = await db.execute(sa_stmt)
  196. all_sas = sa_result.scalars().all()
  197. sa_name_map = {sa.id: sa.nickname or sa.username for sa in all_sas}
  198. sat_stmt = select(SuperAdminTenant)
  199. if super_admin_name:
  200. sa_ids = [sa.id for sa in all_sas]
  201. if sa_ids:
  202. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  203. sat_result = await db.execute(sat_stmt)
  204. sa_tenant_map: dict[int, list[int]] = {}
  205. all_tenant_ids: set[int] = set()
  206. for row in sat_result.scalars().all():
  207. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  208. all_tenant_ids.add(row.tenant_id)
  209. # 查询所有租户(tenant_name 筛选时独立过滤,不受 SA 关联限制)
  210. tenant_stmt = select(Tenant)
  211. tenant_conditions = []
  212. if tenant_name:
  213. tenant_conditions.append(
  214. (Tenant.company_name.ilike(f"%{tenant_name}%")) |
  215. (Tenant.subdomain.ilike(f"%{tenant_name}%"))
  216. )
  217. if tenant_conditions:
  218. tenant_stmt = tenant_stmt.where(*tenant_conditions)
  219. tenant_result = await db.execute(tenant_stmt)
  220. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  221. # 2. 查原始消费明细
  222. stmt = select(UserConsumptionDetail)
  223. start_dt = date.fromisoformat(start_date) if start_date else None
  224. end_dt = date.fromisoformat(end_date) if end_date else None
  225. if start_dt:
  226. stmt = stmt.where(cast(UserConsumptionDetail.consumption_date, Date) >= start_dt)
  227. if end_dt:
  228. stmt = stmt.where(cast(UserConsumptionDetail.consumption_date, Date) <= end_dt)
  229. if super_admin_name:
  230. matched_tenant_ids = set()
  231. for sid in [sa.id for sa in all_sas]:
  232. matched_tenant_ids.update(sa_tenant_map.get(sid, []))
  233. if matched_tenant_ids:
  234. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  235. if tenant_name:
  236. filtered_tenant_ids = set(tenant_map.keys())
  237. if filtered_tenant_ids:
  238. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(filtered_tenant_ids))
  239. stmt = stmt.order_by(UserConsumptionDetail.consumption_date.desc())
  240. result = await db.execute(stmt)
  241. records = result.scalars().all()
  242. # 3. 拼装返回
  243. items: list[ConsumptionDetailRecord] = []
  244. for c in records:
  245. tenant = tenant_map.get(c.tenant_id)
  246. t_name = tenant.company_name or tenant.subdomain if tenant else "-"
  247. t_discount = float(c.tenant_discount or 1)
  248. sa_discount = float(c.super_admin_discount or 1)
  249. items.append(ConsumptionDetailRecord(
  250. user_id=c.user_id,
  251. user_name=c.username or c.user_id,
  252. tenant_name=t_name,
  253. order_no=c.order_no or "",
  254. model_code=c.model_code,
  255. consumption_date=str(c.consumption_date) if c.consumption_date else "",
  256. tenant_consumed=f"{float(c.tenant_actual_total or 0):.4f}",
  257. user_discount=f"{float(c.user_discount or 1):.4f}",
  258. user_consumed=f"{float(c.user_actual_total or 0):.4f}",
  259. tenant_discount=f"{t_discount:.4f}",
  260. tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
  261. super_admin_discount=f"{sa_discount:.4f}",
  262. super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
  263. ))
  264. return ConsumptionDetailResponse(total=len(items), records=items)
  265. async def get_daily_stats(
  266. db: AsyncSession,
  267. start_date: str,
  268. end_date: str,
  269. super_admin_name: str | None = None,
  270. tenant_name: str | None = None,
  271. ) -> DailyStatsResponse:
  272. """
  273. 查询按日聚合的消费统计:超级管理员 → 租户 → 每日消费金额
  274. 按 consumption_date 的日期部分分组,汇总每个租户每天的消费和收取金额
  275. """
  276. # 1. 查出所有 SA、关联关系、租户
  277. sa_stmt = select(SuperAdmin)
  278. if super_admin_name:
  279. sa_stmt = sa_stmt.where(
  280. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  281. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  282. )
  283. sa_result = await db.execute(sa_stmt)
  284. all_sas = sa_result.scalars().all()
  285. sa_name_map = {sa.id: sa.nickname or sa.username for sa in all_sas}
  286. sat_stmt = select(SuperAdminTenant)
  287. if super_admin_name:
  288. sa_ids = [sa.id for sa in all_sas]
  289. if sa_ids:
  290. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  291. sat_result = await db.execute(sat_stmt)
  292. sa_tenant_map: dict[int, list[int]] = {}
  293. all_tenant_ids: set[int] = set()
  294. for row in sat_result.scalars().all():
  295. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  296. all_tenant_ids.add(row.tenant_id)
  297. tenant_stmt = select(Tenant).where(Tenant.id.in_(all_tenant_ids))
  298. if tenant_name:
  299. tenant_stmt = tenant_stmt.where(
  300. (Tenant.company_name.ilike(f"%{tenant_name}%")) |
  301. (Tenant.subdomain.ilike(f"%{tenant_name}%"))
  302. )
  303. tenant_result = await db.execute(tenant_stmt)
  304. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  305. filtered_tenant_ids = set(tenant_map.keys())
  306. # 2. 按日 + 租户聚合消费明细
  307. start_dt = date.fromisoformat(start_date)
  308. end_dt = date.fromisoformat(end_date)
  309. stmt = select(
  310. cast(UserConsumptionDetail.consumption_date, Date).label("stat_date"),
  311. UserConsumptionDetail.tenant_id,
  312. func.sum(UserConsumptionDetail.user_actual_total).label("total_consumption"),
  313. func.sum(UserConsumptionDetail.tenant_actual_total).label("total_charged"),
  314. ).where(
  315. cast(UserConsumptionDetail.consumption_date, Date) >= start_dt,
  316. cast(UserConsumptionDetail.consumption_date, Date) <= end_dt,
  317. )
  318. if super_admin_name:
  319. matched_tenant_ids = set()
  320. for sid in [sa.id for sa in all_sas]:
  321. matched_tenant_ids.update(sa_tenant_map.get(sid, []))
  322. if matched_tenant_ids:
  323. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  324. if tenant_name and filtered_tenant_ids:
  325. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(filtered_tenant_ids))
  326. stmt = stmt.group_by("stat_date", UserConsumptionDetail.tenant_id).order_by("stat_date")
  327. result = await db.execute(stmt)
  328. rows = result.fetchall()
  329. # 3. 组装数据:按 SA → 租户 → 日期分组
  330. # daily[sa_id][tenant_id] = {date: {consumption, charged}}
  331. daily: dict[int, dict[int, dict[str, dict]]] = {}
  332. for row in rows:
  333. stat_date = str(row.stat_date)
  334. tid = row.tenant_id
  335. # 找到该租户所属的 SA
  336. sa_id = None
  337. for sid, tids in sa_tenant_map.items():
  338. if tid in tids:
  339. sa_id = sid
  340. break
  341. if sa_id is None:
  342. continue
  343. daily.setdefault(sa_id, {})
  344. daily[sa_id].setdefault(tid, {})
  345. daily[sa_id][tid][stat_date] = {
  346. "consumption": f"{float(row.total_consumption):.4f}",
  347. "charged": f"{float(row.total_charged):.4f}",
  348. }
  349. # 4. 构建返回
  350. sa_stats: list[DailySAStat] = []
  351. for sa in all_sas:
  352. sa_id = sa.id
  353. tenant_ids = sa_tenant_map.get(sa_id, [])
  354. # 收集所有日期
  355. all_dates: set[str] = set()
  356. for tid in tenant_ids:
  357. if tid in daily.get(sa_id, {}):
  358. all_dates.update(daily[sa_id][tid].keys())
  359. sorted_dates = sorted(all_dates)
  360. tenant_stats: list[DailyTenantStat] = []
  361. for tid in tenant_ids:
  362. tenant = tenant_map.get(tid)
  363. if not tenant:
  364. continue
  365. tname = tenant.company_name or tenant.subdomain
  366. for d in sorted_dates:
  367. stat = daily.get(sa_id, {}).get(tid, {}).get(d)
  368. tenant_stats.append(DailyTenantStat(
  369. tenant_name=tname,
  370. date=d,
  371. consumption=stat["consumption"] if stat else "0.0000",
  372. charged=stat["charged"] if stat else "0.0000",
  373. ))
  374. sa_stats.append(DailySAStat(
  375. sa_name=sa.remark or sa.username,
  376. date="",
  377. consumption="0",
  378. charged="0",
  379. tenants=tenant_stats,
  380. ))
  381. return DailyStatsResponse(sa_stats=sa_stats)