| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import httpx
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.models.domain import MonitoredDomain
- from app.models.monitoring import (
- SuperAdmin,
- SuperAdminTenant,
- Tenant,
- Model,
- UserConsumptionDetail,
- )
- from datetime import datetime
async def fetch_domain_transactions(domain: str, db: AsyncSession, fetch_date: str | None = None) -> dict:
    """Fetch the monitoring-dashboard payload of *domain* and store it locally.

    The endpoint requested is ``{domain}/api/public/monitoring/dashboard``.
    HTTPS is tried first; when that request fails for any reason other than
    an HTTP error status, the same request is retried once over plain HTTP.
    The decoded payload is handed to ``_save_dashboard_data`` before it is
    returned.

    Args:
        domain: Host (without scheme) that is interpolated into the URL.
        db: Async session forwarded to ``_save_dashboard_data``.
        fetch_date: Optional day in ``YYYY-MM-DD`` form. When given it is
            sent as both ``start_date`` and ``end_date``; when omitted no
            date filter is sent.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status
            (no HTTP fallback is attempted in that case).
    """
    path = "/api/public/monitoring/dashboard"
    # Let httpx build and percent-encode the query string instead of
    # interpolating the caller-supplied value into the URL by hand.
    params = {"start_date": fetch_date, "end_date": fetch_date} if fetch_date else None

    # NOTE(review): verify=False turns off TLS certificate verification for
    # every request made by this client -- confirm this is intended.
    async with httpx.AsyncClient(timeout=30, verify=False) as client:
        try:
            resp = await client.get(f"https://{domain}{path}", params=params)
            resp.raise_for_status()
        except httpx.HTTPStatusError:
            # The server was reachable and returned 4xx/5xx: do not downgrade.
            raise
        except Exception:
            # Deliberate best-effort: any other failure of the HTTPS attempt
            # (SSL / network error) falls back to plain HTTP.
            resp = await client.get(f"http://{domain}{path}", params=params)
            resp.raise_for_status()

    data = resp.json()
    # Persist the fetched payload in the local database.
    await _save_dashboard_data(domain, data, db, fetch_date=fetch_date)
    return data
def _parse_created_at(raw) -> datetime:
    """Turn a record's ``created_at`` value into a ``datetime``.

    A missing/empty value yields the current local time. A trailing ``Z``
    is rewritten to ``+00:00`` because ``datetime.fromisoformat`` only
    accepts the ``Z`` suffix from Python 3.11 onwards.
    """
    if not raw:
        return datetime.now()
    if isinstance(raw, str) and raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    return datetime.fromisoformat(raw)


async def _save_dashboard_data(domain: str, data: dict, db: AsyncSession, fetch_date: str | None = None):
    """Persist a monitoring-dashboard payload into the local database.

    The payload is walked as super admins -> tenants -> users -> consumption
    records. Super admins and tenants are inserted or updated by id, the
    super-admin/tenant link and ``Model`` rows are inserted when missing,
    and one ``UserConsumptionDetail`` row is added per consumption record.
    The session is flushed after each super admin and committed once at the
    end.

    Args:
        domain: Domain whose ``MonitoredDomain`` row (if any) supplies the
            remark copied onto super admins and receives ``super_admin_id``.
        data: Decoded payload; read via the keys ``super_admins``,
            ``tenants``, ``users`` and ``consumption_records``. A key that is
            absent or ``null`` is treated as an empty list.
        db: Async session used for all reads and writes.
        fetch_date: Optional ``YYYY-MM-DD`` day. When given, every detail row
            gets that date; otherwise the record's ``created_at`` is used,
            falling back to the current time.

    Raises:
        ValueError: ``fetch_date`` (or a record's ``created_at``) cannot be
            parsed; raised when the first consumption record is processed.
    """
    # NOTE(review): nothing here de-duplicates UserConsumptionDetail rows, so
    # saving the same payload twice adds the detail rows twice -- confirm
    # whether callers guard against that.

    # Look up the monitored-domain row to obtain its remark.
    domain_result = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.domain == domain)
    )
    domain_record = domain_result.scalar_one_or_none()
    domain_remark = domain_record.remark if domain_record else None

    # Model codes already looked up (and inserted if missing) during this
    # call, so each code costs one SELECT instead of one per record.
    seen_model_codes: set[str] = set()
    # Parsed lazily on the first record so an invalid fetch_date still fails
    # at the same point as before, but is parsed only once.
    fixed_date: datetime | None = None

    for sa_data in data.get("super_admins") or []:
        sa_id = sa_data.get("super_admin_id")
        if sa_id is None:
            continue

        # 1. Insert or update the super admin.
        sa_result = await db.execute(
            select(SuperAdmin).where(SuperAdmin.id == sa_id)
        )
        sa = sa_result.scalar_one_or_none()
        if sa is None:
            sa = SuperAdmin(
                id=sa_id,
                username=sa_data.get("username") or f"unassigned_{sa_id}",
                nickname=sa_data.get("nickname"),
                remark=domain_remark,
            )
            db.add(sa)
        else:
            sa.username = sa_data.get("username") or sa.username
            sa.nickname = sa_data.get("nickname") or sa.nickname
            # A non-empty domain remark always overwrites the stored remark.
            if domain_remark:
                sa.remark = domain_remark

        # Mirror the super admin id onto the domain row; with several super
        # admins in one payload the last one processed wins.
        if domain_record:
            domain_record.super_admin_id = sa_id
        await db.flush()

        # 2. Tenants of this super admin.
        for t_data in sa_data.get("tenants") or []:
            t_id = t_data.get("tenant_id")
            if t_id is None:
                continue

            t_result = await db.execute(
                select(Tenant).where(Tenant.id == t_id)
            )
            tenant = t_result.scalar_one_or_none()
            if tenant is None:
                tenant = Tenant(
                    id=t_id,
                    company_name=t_data.get("company_name"),
                    subdomain=t_data.get("subdomain", ""),
                    balance=t_data.get("balance", 0),
                )
                db.add(tenant)
            else:
                tenant.company_name = t_data.get("company_name") or tenant.company_name
                tenant.subdomain = t_data.get("subdomain", tenant.subdomain)
                tenant.balance = t_data.get("balance", tenant.balance)

            # Link the super admin to the tenant if not linked yet.
            sat_result = await db.execute(
                select(SuperAdminTenant).where(
                    SuperAdminTenant.super_admin_id == sa_id,
                    SuperAdminTenant.tenant_id == t_id,
                )
            )
            if not sat_result.scalar_one_or_none():
                db.add(SuperAdminTenant(super_admin_id=sa_id, tenant_id=t_id))

            # 3. Users of this tenant.
            for u_data in t_data.get("users") or []:
                u_id = u_data.get("user_id")
                if not u_id:
                    continue

                # 4. Consumption details (the remote API names the field
                #    consumption_records).
                for m_data in u_data.get("consumption_records") or []:
                    m_code = m_data.get("model_code")
                    if not m_code:
                        continue

                    # Ensure a Model row exists for this code.
                    if m_code not in seen_model_codes:
                        m_result = await db.execute(
                            select(Model).where(Model.model_code == m_code)
                        )
                        if m_result.scalar_one_or_none() is None:
                            db.add(
                                Model(
                                    model_code=m_code,
                                    model_name=m_data.get("model_name", ""),
                                    original_price=None,
                                )
                            )
                        seen_model_codes.add(m_code)

                    # Date recorded on the detail row.
                    if fetch_date:
                        if fixed_date is None:
                            fixed_date = datetime.strptime(fetch_date, "%Y-%m-%d")
                        consumption_date = fixed_date
                    else:
                        consumption_date = _parse_created_at(m_data.get("created_at", ""))

                    db.add(UserConsumptionDetail(
                        user_id=u_id,
                        username=u_data.get("username"),
                        tenant_id=t_id,
                        model_code=m_code,
                        call_count=1,
                        order_no=m_data.get("order_no"),
                        user_actual_total=m_data.get("amount"),
                        user_discount=m_data.get("user_discount"),
                        user_actual_price=m_data.get("user_actual_price"),
                        # NOTE(review): tenant_actual_total is filled from
                        # "tenant_actual_price", same as the field below --
                        # confirm against the remote API schema.
                        tenant_actual_total=m_data.get("tenant_actual_price"),
                        tenant_discount=m_data.get("tenant_discount"),
                        tenant_actual_price=m_data.get("tenant_actual_price"),
                        super_admin_discount=m_data.get("super_admin_discount"),
                        super_admin_actual_price=m_data.get("super_admin_actual_price"),
                        original_price=None,
                        consumption_date=consumption_date,
                    ))

    await db.commit()
|