domain_fetch.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import httpx
  2. from sqlalchemy import select
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from app.models.domain import MonitoredDomain
  5. from app.models.monitoring import (
  6. SuperAdmin,
  7. SuperAdminTenant,
  8. Tenant,
  9. Model,
  10. UserConsumptionDetail,
  11. )
  12. from datetime import datetime
  13. async def fetch_domain_transactions(domain: str, db: AsyncSession, fetch_date: str | None = None) -> dict:
  14. """
  15. 爬取指定域名的监控大屏数据
  16. 请求地址: {domain}/api/public/monitoring/dashboard
  17. 先尝试 HTTPS,失败后自动降级到 HTTP
  18. fetch_date: 指定爬取日期(YYYY-MM-DD),不传则查全部
  19. 爬取后将数据存入本地数据库
  20. """
  21. qs = []
  22. if fetch_date:
  23. qs.append(f"start_date={fetch_date}&end_date={fetch_date}")
  24. query_str = ("?" + "&".join(qs)) if qs else ""
  25. async with httpx.AsyncClient(timeout=30, verify=False) as client:
  26. # 先尝试 HTTPS
  27. url = f"https://{domain}/api/public/monitoring/dashboard{query_str}"
  28. try:
  29. resp = await client.get(url)
  30. resp.raise_for_status()
  31. except httpx.HTTPStatusError as e:
  32. # 如果带了日期参数报 404/500,去掉参数重试
  33. if fetch_date and e.response.status_code in (404, 500):
  34. url = f"https://{domain}/api/public/monitoring/dashboard"
  35. resp = await client.get(url)
  36. resp.raise_for_status()
  37. # 不带参数时全量返回,由 _save_dashboard_data 按日期过滤
  38. data = resp.json()
  39. return _filter_by_date(data, fetch_date)
  40. else:
  41. raise
  42. except Exception:
  43. # HTTPS 连接失败(SSL/网络错误),降级到 HTTP
  44. url = f"http://{domain}/api/public/monitoring/dashboard{query_str}"
  45. try:
  46. resp = await client.get(url)
  47. resp.raise_for_status()
  48. except httpx.HTTPStatusError as e:
  49. if fetch_date and e.response.status_code in (404, 500):
  50. url = f"http://{domain}/api/public/monitoring/dashboard"
  51. resp = await client.get(url)
  52. resp.raise_for_status()
  53. data = resp.json()
  54. return _filter_by_date(data, fetch_date)
  55. else:
  56. raise
  57. data = resp.json()
  58. # 将爬取到的数据存入本地数据库
  59. await _save_dashboard_data(domain, data, db, fetch_date=fetch_date)
  60. return data
  61. def _filter_by_date(data: dict, fetch_date: str | None) -> dict:
  62. """按日期过滤返回数据,只保留 fetch_date 当天的消费记录"""
  63. if not fetch_date:
  64. return data
  65. for sa in data.get("super_admins", []):
  66. for t in sa.get("tenants", []):
  67. filtered_users = []
  68. for u in t.get("users", []):
  69. filtered_records = []
  70. for m in u.get("consumption_records", []):
  71. created_at = m.get("created_at", "")
  72. if created_at and created_at.startswith(fetch_date):
  73. filtered_records.append(m)
  74. if filtered_records:
  75. u["consumption_records"] = filtered_records
  76. filtered_users.append(u)
  77. t["users"] = filtered_users
  78. return data
  79. async def _save_dashboard_data(domain: str, data: dict, db: AsyncSession, fetch_date: str | None = None):
  80. """
  81. 将监控大屏数据存入本地数据库
  82. 数据包含超级管理员 → 租户 → 用户 → 模型消费明细
  83. """
  84. # 获取域名备注
  85. domain_result = await db.execute(
  86. select(MonitoredDomain).where(MonitoredDomain.domain == domain)
  87. )
  88. domain_record = domain_result.scalar_one_or_none()
  89. domain_remark = domain_record.remark if domain_record else None
  90. super_admins = data.get("super_admins", [])
  91. for sa_data in super_admins:
  92. sa_id = sa_data.get("super_admin_id")
  93. if sa_id is None:
  94. continue
  95. # 1. 保存/更新超级管理员
  96. sa_result = await db.execute(
  97. select(SuperAdmin).where(SuperAdmin.id == sa_id)
  98. )
  99. sa = sa_result.scalar_one_or_none()
  100. if not sa:
  101. sa = SuperAdmin(
  102. id=sa_id,
  103. username=sa_data.get("username") or f"unassigned_{sa_id}",
  104. nickname=sa_data.get("nickname"),
  105. remark=domain_remark,
  106. )
  107. db.add(sa)
  108. else:
  109. sa.username = sa_data.get("username") or sa.username
  110. sa.nickname = sa_data.get("nickname") or sa.nickname
  111. # 始终用域名备注同步到超管备注
  112. if domain_remark:
  113. sa.remark = domain_remark
  114. # 同步 super_admin_id 到域名记录
  115. if domain_record:
  116. domain_record.super_admin_id = sa_id
  117. await db.flush()
  118. # 2. 遍历租户
  119. for t_data in sa_data.get("tenants", []):
  120. t_id = t_data.get("tenant_id")
  121. if t_id is None:
  122. continue
  123. # 保存/更新租户
  124. t_result = await db.execute(
  125. select(Tenant).where(Tenant.id == t_id)
  126. )
  127. tenant = t_result.scalar_one_or_none()
  128. if not tenant:
  129. tenant = Tenant(
  130. id=t_id,
  131. company_name=t_data.get("company_name"),
  132. subdomain=t_data.get("subdomain", ""),
  133. balance=t_data.get("balance", 0),
  134. )
  135. db.add(tenant)
  136. else:
  137. tenant.company_name = t_data.get("company_name") or tenant.company_name
  138. tenant.subdomain = t_data.get("subdomain", tenant.subdomain)
  139. tenant.balance = t_data.get("balance", tenant.balance)
  140. # 保存超级管理员-租户关联
  141. sat_result = await db.execute(
  142. select(SuperAdminTenant).where(
  143. SuperAdminTenant.super_admin_id == sa_id,
  144. SuperAdminTenant.tenant_id == t_id,
  145. )
  146. )
  147. if not sat_result.scalar_one_or_none():
  148. db.add(SuperAdminTenant(super_admin_id=sa_id, tenant_id=t_id))
  149. # 3. 遍历用户
  150. for u_data in t_data.get("users", []):
  151. u_id = u_data.get("user_id")
  152. if not u_id:
  153. continue
  154. # 4. 遍历消费明细(远程 API 返回字段为 consumption_records)
  155. for m_data in u_data.get("consumption_records", []):
  156. m_code = m_data.get("model_code")
  157. if not m_code:
  158. continue
  159. # 保存/更新模型信息
  160. m_result = await db.execute(
  161. select(Model).where(Model.model_code == m_code)
  162. )
  163. model = m_result.scalar_one_or_none()
  164. if not model:
  165. model = Model(
  166. model_code=m_code,
  167. model_name=m_data.get("model_name", ""),
  168. original_price=None,
  169. )
  170. db.add(model)
  171. # 保存消费明细
  172. if fetch_date:
  173. consumption_date = datetime.strptime(fetch_date, "%Y-%m-%d")
  174. else:
  175. created_at_str = m_data.get("created_at", "")
  176. if created_at_str:
  177. consumption_date = datetime.fromisoformat(created_at_str)
  178. else:
  179. consumption_date = datetime.now()
  180. db.add(UserConsumptionDetail(
  181. user_id=u_id,
  182. username=u_data.get("username"),
  183. tenant_id=t_id,
  184. model_code=m_code,
  185. call_count=1,
  186. order_no=m_data.get("order_no"),
  187. user_actual_total=m_data.get("amount"),
  188. user_discount=m_data.get("user_discount"),
  189. user_actual_price=m_data.get("user_actual_price"),
  190. tenant_actual_total=m_data.get("tenant_actual_price"),
  191. tenant_discount=m_data.get("tenant_discount"),
  192. tenant_actual_price=m_data.get("tenant_actual_price"),
  193. super_admin_discount=m_data.get("super_admin_discount"),
  194. super_admin_actual_price=m_data.get("super_admin_actual_price"),
  195. original_price=None,
  196. consumption_date=consumption_date,
  197. ))
  198. await db.commit()