monitoring.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. from datetime import date
  2. from sqlalchemy import select, func, cast, Date
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.models.monitoring import (
  5. SuperAdmin,
  6. SuperAdminTenant,
  7. Tenant,
  8. Model,
  9. UserConsumptionDetail,
  10. )
  11. from app.schemas.monitoring import (
  12. DashboardResponse,
  13. Overview,
  14. SuperAdminData,
  15. TenantData,
  16. UserConsumption,
  17. ConsumptionRecord,
  18. ConsumptionDetailResponse,
  19. ConsumptionDetailRecord,
  20. DailyStatsResponse,
  21. DailySAStat,
  22. DailyTenantStat,
  23. )
  24. async def get_dashboard(
  25. db: AsyncSession,
  26. start_date: str | None = None,
  27. end_date: str | None = None,
  28. super_admin_id: int | None = None,
  29. super_admin_name: str | None = None,
  30. ) -> DashboardResponse:
  31. """
  32. 获取监控大屏数据
  33. 层级结构:平台 → 超级管理员 → 租户 → 用户
  34. 每个用户节点包含消费记录流水(扁平列表)。
  35. """
  36. # 1. 查询所有超级管理员(按ID或名称过滤)
  37. sa_stmt = select(SuperAdmin)
  38. if super_admin_id is not None:
  39. sa_stmt = sa_stmt.where(SuperAdmin.id == super_admin_id)
  40. if super_admin_name:
  41. sa_stmt = sa_stmt.where(
  42. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  43. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  44. )
  45. sa_result = await db.execute(sa_stmt)
  46. super_admins = sa_result.scalars().all()
  47. # 2. 查询超级管理员-租户关联关系
  48. sat_stmt = select(SuperAdminTenant)
  49. if super_admin_id is not None:
  50. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id == super_admin_id)
  51. if super_admin_name:
  52. sa_ids = [sa.id for sa in super_admins]
  53. if sa_ids:
  54. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  55. sat_result = await db.execute(sat_stmt)
  56. sa_tenant_map: dict[int, list[int]] = {}
  57. all_tenant_ids: set[int] = set()
  58. for row in sat_result.scalars().all():
  59. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  60. all_tenant_ids.add(row.tenant_id)
  61. # 3. 查询所有租户信息
  62. tenant_result = await db.execute(select(Tenant).where(Tenant.id.in_(all_tenant_ids)))
  63. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  64. # 4. 查询所有模型信息
  65. model_result = await db.execute(select(Model))
  66. model_map: dict[str, Model] = {m.model_code: m for m in model_result.scalars().all()}
  67. # 5. 查询消费明细(按时间范围过滤)
  68. start_dt = date.fromisoformat(start_date) if start_date else None
  69. end_dt = date.fromisoformat(end_date) if end_date else None
  70. base_stmt = select(UserConsumptionDetail)
  71. if start_dt:
  72. base_stmt = base_stmt.where(
  73. cast(UserConsumptionDetail.consumption_date, Date) >= start_dt
  74. )
  75. if end_dt:
  76. base_stmt = base_stmt.where(
  77. cast(UserConsumptionDetail.consumption_date, Date) <= end_dt
  78. )
  79. if super_admin_id is not None:
  80. matched_tenant_ids = sa_tenant_map.get(super_admin_id, [])
  81. if matched_tenant_ids:
  82. base_stmt = base_stmt.where(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  83. consumption_result = await db.execute(base_stmt.order_by(UserConsumptionDetail.created_at))
  84. consumptions = consumption_result.scalars().all()
  85. # 7. 按 租户 → 用户 聚合,每条消费记录作为独立条目
  86. # 结构: {tenant_id: {user_id: [consumption_records]}}
  87. agg: dict[int, dict[str, list]] = {}
  88. for c in consumptions:
  89. agg.setdefault(c.tenant_id, {})
  90. agg[c.tenant_id].setdefault(c.user_id, [])
  91. tenant = tenant_map.get(c.tenant_id)
  92. model_info = model_map.get(c.model_code)
  93. user_discount_val = float(c.user_discount or 1)
  94. sa_discount_val = float(c.super_admin_discount or 1)
  95. agg[c.tenant_id][c.user_id].append(ConsumptionRecord(
  96. user_id=c.user_id,
  97. username=c.user_id,
  98. tenant_name=tenant.company_name if tenant else None,
  99. order_no=c.order_no or "",
  100. model_name=model_info.model_name if model_info else c.model_code,
  101. model_code=c.model_code,
  102. amount=f"{float(c.user_actual_total or 0):.4f}",
  103. created_at=str(c.created_at) if c.created_at else "",
  104. invoiced=bool(c.invoiced),
  105. user_discount=f"{user_discount_val:.4f}",
  106. user_actual_price=f"{float(c.user_actual_price or 0):.4f}",
  107. tenant_discount=f"{float(c.tenant_discount or 1):.4f}",
  108. tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
  109. super_admin_discount=f"{sa_discount_val:.4f}",
  110. super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
  111. ))
  112. # 8. 构建超级管理员数据
  113. sa_data_list: list[SuperAdminData] = []
  114. for sa in super_admins:
  115. tenant_ids = sa_tenant_map.get(sa.id, [])
  116. tenant_data_list: list[TenantData] = []
  117. for tid in tenant_ids:
  118. tenant = tenant_map.get(tid)
  119. if not tenant:
  120. continue
  121. users_map = agg.get(tid, {})
  122. user_list: list[UserConsumption] = []
  123. for uid, records in users_map.items():
  124. user_list.append(UserConsumption(
  125. user_id=uid,
  126. username=uid,
  127. nickname=None,
  128. total_consumption=f"{sum(float(r.amount) for r in records):.4f}",
  129. tenant_actual_total=f"{sum(float(r.tenant_actual_price) for r in records):.4f}",
  130. tenant_name=tenant.company_name if tenant else None,
  131. consumption_records=records,
  132. ))
  133. tenant_total_consumption = sum(float(r.amount) for records in users_map.values() for r in records)
  134. tenant_total_charged = sum(float(r.tenant_actual_price) for records in users_map.values() for r in records)
  135. tenant_data_list.append(TenantData(
  136. tenant_id=tid,
  137. company_name=tenant.company_name,
  138. subdomain=tenant.subdomain,
  139. total_consumption=f"{tenant_total_consumption:.4f}",
  140. total_tenant_charged=f"{tenant_total_charged:.4f}",
  141. balance=f"{float(tenant.balance or 0):.4f}",
  142. user_count=len(users_map),
  143. users=user_list,
  144. ))
  145. sa_total_consumption = sum(float(t.total_consumption) for t in tenant_data_list)
  146. sa_total_charged = sum(float(t.total_tenant_charged) for t in tenant_data_list)
  147. sa_data_list.append(SuperAdminData(
  148. super_admin_id=sa.id,
  149. username=sa.username,
  150. nickname=sa.nickname,
  151. remark=sa.remark,
  152. tenant_count=len(tenant_data_list),
  153. total_consumption=f"{sa_total_consumption:.4f}",
  154. total_tenant_charged=f"{sa_total_charged:.4f}",
  155. tenants=tenant_data_list,
  156. ))
  157. # 9. 构建平台汇总
  158. total_super_admins = len(sa_data_list)
  159. total_tenants = sum(sa.tenant_count for sa in sa_data_list)
  160. total_users = sum(t.user_count for sa in sa_data_list for t in sa.tenants)
  161. total_consumption = sum(float(t.total_consumption) for sa in sa_data_list for t in sa.tenants)
  162. total_tenant_charged = sum(float(t.total_tenant_charged) for sa in sa_data_list for t in sa.tenants)
  163. total_balance = sum(float(t.balance) for sa in sa_data_list for t in sa.tenants)
  164. return DashboardResponse(
  165. overview=Overview(
  166. total_super_admins=total_super_admins,
  167. total_tenants=total_tenants,
  168. total_users=total_users,
  169. total_consumption=f"{total_consumption:.4f}",
  170. total_tenant_charged=f"{total_tenant_charged:.4f}",
  171. total_balance=f"{total_balance:.4f}",
  172. ),
  173. super_admins=sa_data_list,
  174. start_date=start_date,
  175. end_date=end_date,
  176. )
  177. async def get_consumption_details(
  178. db: AsyncSession,
  179. start_date: str | None = None,
  180. end_date: str | None = None,
  181. super_admin_name: str | None = None,
  182. tenant_name: str | None = None,
  183. page: int = 1,
  184. page_size: int = 20,
  185. ) -> ConsumptionDetailResponse:
  186. """
  187. 查询原始消费明细表,每条记录一行,不做任何聚合
  188. 用于对账场景
  189. """
  190. # 1. 查出所有 SA、关联关系、租户
  191. sa_stmt = select(SuperAdmin)
  192. if super_admin_name:
  193. sa_stmt = sa_stmt.where(
  194. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  195. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  196. )
  197. sa_result = await db.execute(sa_stmt)
  198. all_sas = sa_result.scalars().all()
  199. sa_name_map = {sa.id: sa.nickname or sa.username for sa in all_sas}
  200. sat_stmt = select(SuperAdminTenant)
  201. if super_admin_name:
  202. sa_ids = [sa.id for sa in all_sas]
  203. if sa_ids:
  204. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  205. sat_result = await db.execute(sat_stmt)
  206. sa_tenant_map: dict[int, list[int]] = {}
  207. all_tenant_ids: set[int] = set()
  208. for row in sat_result.scalars().all():
  209. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  210. all_tenant_ids.add(row.tenant_id)
  211. # 查询所有租户(tenant_name 筛选时独立过滤,不受 SA 关联限制)
  212. tenant_stmt = select(Tenant)
  213. tenant_conditions = []
  214. if tenant_name:
  215. tenant_conditions.append(
  216. (Tenant.company_name.ilike(f"%{tenant_name}%")) |
  217. (Tenant.subdomain.ilike(f"%{tenant_name}%"))
  218. )
  219. if tenant_conditions:
  220. tenant_stmt = tenant_stmt.where(*tenant_conditions)
  221. tenant_result = await db.execute(tenant_stmt)
  222. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  223. # 2. 构建筛选条件(复用)
  224. conditions = []
  225. start_dt = date.fromisoformat(start_date) if start_date else None
  226. end_dt = date.fromisoformat(end_date) if end_date else None
  227. if start_dt:
  228. conditions.append(cast(UserConsumptionDetail.consumption_date, Date) >= start_dt)
  229. if end_dt:
  230. conditions.append(cast(UserConsumptionDetail.consumption_date, Date) <= end_dt)
  231. if super_admin_name:
  232. matched_tenant_ids = set()
  233. for sid in [sa.id for sa in all_sas]:
  234. matched_tenant_ids.update(sa_tenant_map.get(sid, []))
  235. if matched_tenant_ids:
  236. conditions.append(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  237. if tenant_name:
  238. filtered_tenant_ids = set(tenant_map.keys())
  239. if filtered_tenant_ids:
  240. conditions.append(UserConsumptionDetail.tenant_id.in_(filtered_tenant_ids))
  241. # 3. 查总数
  242. count_stmt = select(func.count(UserConsumptionDetail.id)).select_from(UserConsumptionDetail)
  243. if conditions:
  244. count_stmt = count_stmt.where(*conditions)
  245. total = await db.scalar(count_stmt) or 0
  246. # 4. 分页查询
  247. stmt = select(UserConsumptionDetail)
  248. if conditions:
  249. stmt = stmt.where(*conditions)
  250. stmt = stmt.order_by(UserConsumptionDetail.consumption_date.desc())
  251. stmt = stmt.offset((page - 1) * page_size).limit(page_size)
  252. result = await db.execute(stmt)
  253. records = result.scalars().all()
  254. # 3. 拼装返回
  255. items: list[ConsumptionDetailRecord] = []
  256. for c in records:
  257. tenant = tenant_map.get(c.tenant_id)
  258. t_name = tenant.company_name or tenant.subdomain if tenant else "-"
  259. t_discount = float(c.tenant_discount or 1)
  260. sa_discount = float(c.super_admin_discount or 1)
  261. items.append(ConsumptionDetailRecord(
  262. user_id=c.user_id,
  263. user_name=c.username or c.user_id,
  264. tenant_name=t_name,
  265. order_no=c.order_no or "",
  266. model_code=c.model_code,
  267. consumption_date=str(c.consumption_date) if c.consumption_date else "",
  268. tenant_consumed=f"{float(c.tenant_actual_total or 0):.4f}",
  269. user_discount=f"{float(c.user_discount or 1):.4f}",
  270. user_consumed=f"{float(c.user_actual_total or 0):.4f}",
  271. tenant_discount=f"{t_discount:.4f}",
  272. tenant_actual_price=f"{float(c.tenant_actual_price or 0):.4f}",
  273. super_admin_discount=f"{sa_discount:.4f}",
  274. super_admin_actual_price=f"{float(c.super_admin_actual_price or 0):.4f}",
  275. ))
  276. return ConsumptionDetailResponse(total=total, records=items)
  277. async def get_daily_stats(
  278. db: AsyncSession,
  279. start_date: str,
  280. end_date: str,
  281. super_admin_name: str | None = None,
  282. tenant_name: str | None = None,
  283. ) -> DailyStatsResponse:
  284. """
  285. 查询按日聚合的消费统计:超级管理员 → 租户 → 每日消费金额
  286. 按 consumption_date 的日期部分分组,汇总每个租户每天的消费和收取金额
  287. """
  288. # 1. 查出所有 SA、关联关系、租户
  289. sa_stmt = select(SuperAdmin)
  290. if super_admin_name:
  291. sa_stmt = sa_stmt.where(
  292. (SuperAdmin.nickname.ilike(f"%{super_admin_name}%")) |
  293. (SuperAdmin.username.ilike(f"%{super_admin_name}%"))
  294. )
  295. sa_result = await db.execute(sa_stmt)
  296. all_sas = sa_result.scalars().all()
  297. sa_name_map = {sa.id: sa.nickname or sa.username for sa in all_sas}
  298. sat_stmt = select(SuperAdminTenant)
  299. if super_admin_name:
  300. sa_ids = [sa.id for sa in all_sas]
  301. if sa_ids:
  302. sat_stmt = sat_stmt.where(SuperAdminTenant.super_admin_id.in_(sa_ids))
  303. sat_result = await db.execute(sat_stmt)
  304. sa_tenant_map: dict[int, list[int]] = {}
  305. all_tenant_ids: set[int] = set()
  306. for row in sat_result.scalars().all():
  307. sa_tenant_map.setdefault(row.super_admin_id, []).append(row.tenant_id)
  308. all_tenant_ids.add(row.tenant_id)
  309. tenant_stmt = select(Tenant).where(Tenant.id.in_(all_tenant_ids))
  310. if tenant_name:
  311. tenant_stmt = tenant_stmt.where(
  312. (Tenant.company_name.ilike(f"%{tenant_name}%")) |
  313. (Tenant.subdomain.ilike(f"%{tenant_name}%"))
  314. )
  315. tenant_result = await db.execute(tenant_stmt)
  316. tenant_map: dict[int, Tenant] = {t.id: t for t in tenant_result.scalars().all()}
  317. filtered_tenant_ids = set(tenant_map.keys())
  318. # 2. 按日 + 租户聚合消费明细
  319. start_dt = date.fromisoformat(start_date)
  320. end_dt = date.fromisoformat(end_date)
  321. stmt = select(
  322. cast(UserConsumptionDetail.consumption_date, Date).label("stat_date"),
  323. UserConsumptionDetail.tenant_id,
  324. func.sum(UserConsumptionDetail.user_actual_total).label("total_consumption"),
  325. func.sum(UserConsumptionDetail.tenant_actual_total).label("total_charged"),
  326. ).where(
  327. cast(UserConsumptionDetail.consumption_date, Date) >= start_dt,
  328. cast(UserConsumptionDetail.consumption_date, Date) <= end_dt,
  329. )
  330. if super_admin_name:
  331. matched_tenant_ids = set()
  332. for sid in [sa.id for sa in all_sas]:
  333. matched_tenant_ids.update(sa_tenant_map.get(sid, []))
  334. if matched_tenant_ids:
  335. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(matched_tenant_ids))
  336. if tenant_name and filtered_tenant_ids:
  337. stmt = stmt.where(UserConsumptionDetail.tenant_id.in_(filtered_tenant_ids))
  338. stmt = stmt.group_by("stat_date", UserConsumptionDetail.tenant_id).order_by("stat_date")
  339. result = await db.execute(stmt)
  340. rows = result.fetchall()
  341. # 3. 组装数据:按 SA → 租户 → 日期分组
  342. # daily[sa_id][tenant_id] = {date: {consumption, charged}}
  343. daily: dict[int, dict[int, dict[str, dict]]] = {}
  344. for row in rows:
  345. stat_date = str(row.stat_date)
  346. tid = row.tenant_id
  347. # 找到该租户所属的 SA
  348. sa_id = None
  349. for sid, tids in sa_tenant_map.items():
  350. if tid in tids:
  351. sa_id = sid
  352. break
  353. if sa_id is None:
  354. continue
  355. daily.setdefault(sa_id, {})
  356. daily[sa_id].setdefault(tid, {})
  357. daily[sa_id][tid][stat_date] = {
  358. "consumption": f"{float(row.total_consumption):.4f}",
  359. "charged": f"{float(row.total_charged):.4f}",
  360. }
  361. # 4. 构建返回
  362. sa_stats: list[DailySAStat] = []
  363. for sa in all_sas:
  364. sa_id = sa.id
  365. tenant_ids = sa_tenant_map.get(sa_id, [])
  366. # 收集所有日期
  367. all_dates: set[str] = set()
  368. for tid in tenant_ids:
  369. if tid in daily.get(sa_id, {}):
  370. all_dates.update(daily[sa_id][tid].keys())
  371. sorted_dates = sorted(all_dates)
  372. tenant_stats: list[DailyTenantStat] = []
  373. for tid in tenant_ids:
  374. tenant = tenant_map.get(tid)
  375. if not tenant:
  376. continue
  377. tname = tenant.company_name or tenant.subdomain
  378. for d in sorted_dates:
  379. stat = daily.get(sa_id, {}).get(tid, {}).get(d)
  380. tenant_stats.append(DailyTenantStat(
  381. tenant_name=tname,
  382. date=d,
  383. consumption=stat["consumption"] if stat else "0.0000",
  384. charged=stat["charged"] if stat else "0.0000",
  385. ))
  386. sa_stats.append(DailySAStat(
  387. sa_name=sa.remark or sa.username,
  388. date="",
  389. consumption="0",
  390. charged="0",
  391. tenants=tenant_stats,
  392. ))
  393. return DailyStatsResponse(sa_stats=sa_stats)