domain_fetch.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import httpx
  2. from sqlalchemy import select
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.models.domain import MonitoredDomain
  5. from app.models.monitoring import (
  6. SuperAdmin,
  7. SuperAdminTenant,
  8. Tenant,
  9. Model,
  10. UserConsumptionDetail,
  11. )
  12. from datetime import datetime
  13. async def fetch_domain_transactions(domain: str, db: AsyncSession, fetch_date: str | None = None) -> dict:
  14. """
  15. 爬取指定域名的监控大屏数据
  16. 请求地址: {domain}/api/public/monitoring/dashboard
  17. 先尝试 HTTPS,失败后自动降级到 HTTP
  18. fetch_date: 指定爬取日期(YYYY-MM-DD),不传则查全部
  19. 爬取后将数据存入本地数据库
  20. """
  21. qs = []
  22. if fetch_date:
  23. qs.append(f"start_date={fetch_date}&end_date={fetch_date}")
  24. query_str = ("?" + "&".join(qs)) if qs else ""
  25. async with httpx.AsyncClient(timeout=30, verify=False) as client:
  26. # 先尝试 HTTPS
  27. url = f"https://{domain}/api/public/monitoring/dashboard{query_str}"
  28. try:
  29. resp = await client.get(url)
  30. resp.raise_for_status()
  31. except httpx.HTTPStatusError:
  32. # 服务器返回了 HTTP 错误(如 404/500),不降级,直接抛出
  33. raise
  34. except Exception:
  35. # HTTPS 连接失败(SSL/网络错误),降级到 HTTP
  36. url = f"http://{domain}/api/public/monitoring/dashboard{query_str}"
  37. resp = await client.get(url)
  38. resp.raise_for_status()
  39. data = resp.json()
  40. # 将爬取到的数据存入本地数据库
  41. await _save_dashboard_data(domain, data, db, fetch_date=fetch_date)
  42. return data
  43. async def _save_dashboard_data(domain: str, data: dict, db: AsyncSession, fetch_date: str | None = None):
  44. """
  45. 将监控大屏数据存入本地数据库
  46. 数据包含超级管理员 → 租户 → 用户 → 模型消费明细
  47. """
  48. # 获取域名备注
  49. domain_result = await db.execute(
  50. select(MonitoredDomain).where(MonitoredDomain.domain == domain)
  51. )
  52. domain_record = domain_result.scalar_one_or_none()
  53. domain_remark = domain_record.remark if domain_record else None
  54. super_admins = data.get("super_admins", [])
  55. for sa_data in super_admins:
  56. sa_id = sa_data.get("super_admin_id")
  57. if sa_id is None:
  58. continue
  59. # 1. 保存/更新超级管理员
  60. sa_result = await db.execute(
  61. select(SuperAdmin).where(SuperAdmin.id == sa_id)
  62. )
  63. sa = sa_result.scalar_one_or_none()
  64. if not sa:
  65. sa = SuperAdmin(
  66. id=sa_id,
  67. username=sa_data.get("username") or f"unassigned_{sa_id}",
  68. nickname=sa_data.get("nickname"),
  69. remark=domain_remark,
  70. )
  71. db.add(sa)
  72. else:
  73. sa.username = sa_data.get("username") or sa.username
  74. sa.nickname = sa_data.get("nickname") or sa.nickname
  75. # 始终用域名备注同步到超管备注
  76. if domain_remark:
  77. sa.remark = domain_remark
  78. # 同步 super_admin_id 到域名记录
  79. if domain_record:
  80. domain_record.super_admin_id = sa_id
  81. await db.flush()
  82. # 2. 遍历租户
  83. for t_data in sa_data.get("tenants", []):
  84. t_id = t_data.get("tenant_id")
  85. if t_id is None:
  86. continue
  87. # 保存/更新租户
  88. t_result = await db.execute(
  89. select(Tenant).where(Tenant.id == t_id)
  90. )
  91. tenant = t_result.scalar_one_or_none()
  92. if not tenant:
  93. tenant = Tenant(
  94. id=t_id,
  95. company_name=t_data.get("company_name"),
  96. subdomain=t_data.get("subdomain", ""),
  97. balance=t_data.get("balance", 0),
  98. )
  99. db.add(tenant)
  100. else:
  101. tenant.company_name = t_data.get("company_name") or tenant.company_name
  102. tenant.subdomain = t_data.get("subdomain", tenant.subdomain)
  103. tenant.balance = t_data.get("balance", tenant.balance)
  104. # 保存超级管理员-租户关联
  105. sat_result = await db.execute(
  106. select(SuperAdminTenant).where(
  107. SuperAdminTenant.super_admin_id == sa_id,
  108. SuperAdminTenant.tenant_id == t_id,
  109. )
  110. )
  111. if not sat_result.scalar_one_or_none():
  112. db.add(SuperAdminTenant(super_admin_id=sa_id, tenant_id=t_id))
  113. # 3. 遍历用户
  114. for u_data in t_data.get("users", []):
  115. u_id = u_data.get("user_id")
  116. if not u_id:
  117. continue
  118. # 4. 遍历消费明细(远程 API 返回字段为 consumption_records)
  119. for m_data in u_data.get("consumption_records", []):
  120. m_code = m_data.get("model_code")
  121. if not m_code:
  122. continue
  123. # 保存/更新模型信息
  124. m_result = await db.execute(
  125. select(Model).where(Model.model_code == m_code)
  126. )
  127. model = m_result.scalar_one_or_none()
  128. if not model:
  129. model = Model(
  130. model_code=m_code,
  131. model_name=m_data.get("model_name", ""),
  132. original_price=None,
  133. )
  134. db.add(model)
  135. # 保存消费明细
  136. if fetch_date:
  137. consumption_date = datetime.strptime(fetch_date, "%Y-%m-%d")
  138. else:
  139. created_at_str = m_data.get("created_at", "")
  140. if created_at_str:
  141. consumption_date = datetime.fromisoformat(created_at_str)
  142. else:
  143. consumption_date = datetime.now()
  144. db.add(UserConsumptionDetail(
  145. user_id=u_id,
  146. username=u_data.get("username"),
  147. tenant_id=t_id,
  148. model_code=m_code,
  149. call_count=1,
  150. order_no=m_data.get("order_no"),
  151. user_actual_total=m_data.get("amount"),
  152. user_discount=m_data.get("user_discount"),
  153. user_actual_price=m_data.get("user_actual_price"),
  154. tenant_actual_total=m_data.get("tenant_actual_price"),
  155. tenant_discount=m_data.get("tenant_discount"),
  156. tenant_actual_price=m_data.get("tenant_actual_price"),
  157. super_admin_discount=m_data.get("super_admin_discount"),
  158. super_admin_actual_price=m_data.get("super_admin_actual_price"),
  159. original_price=None,
  160. consumption_date=consumption_date,
  161. ))
  162. await db.commit()