import httpx
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.domain import MonitoredDomain
from app.models.monitoring import (
    SuperAdmin,
    SuperAdminTenant,
    Tenant,
    Model,
    UserConsumptionDetail,
)
from datetime import datetime

# Path of the public monitoring-dashboard endpoint on every monitored domain.
_DASHBOARD_PATH = "/api/public/monitoring/dashboard"

# Status codes that trigger a retry without the date query parameters.
_DATE_PARAM_RETRY_STATUSES = (404, 500)


async def fetch_domain_transactions(domain: str, db: AsyncSession, fetch_date: str | None = None) -> dict:
    """Fetch the monitoring-dashboard payload of *domain* and persist it locally.

    The endpoint ``{domain}/api/public/monitoring/dashboard`` is requested over
    HTTPS first. Any failure other than an HTTP status error (this includes a
    transport failure during the parameter-less retry described below)
    downgrades the request to plain HTTP.

    When ``fetch_date`` is given it is sent as ``start_date``/``end_date``. If
    the server answers that request with 404 or 500, the request is retried
    without the date parameters and the full payload is filtered locally to
    the requested day.

    Args:
        domain: Host name (without scheme) of the monitored deployment.
        db: Async session used to store the fetched data; it is committed by
            ``_save_dashboard_data``.
        fetch_date: Day to fetch, formatted ``YYYY-MM-DD``. ``None`` fetches
            everything the endpoint returns.

    Returns:
        The decoded JSON payload, filtered to ``fetch_date`` when the server
        could not filter it itself.

    Raises:
        httpx.HTTPStatusError: The server answered with an error status that
            is not eligible for the parameter-less retry, or the retry failed.
        httpx.RequestError: The HTTP downgrade failed at transport level.
    """
    query_str = f"?start_date={fetch_date}&end_date={fetch_date}" if fetch_date else ""

    # NOTE(security): verify=False disables TLS certificate verification, and
    # the HTTP downgrade below sends the request in clear text. Both look
    # deliberate (best-effort scraping of arbitrary domains) but mean the
    # response must be treated as untrusted input.
    async with httpx.AsyncClient(timeout=30, verify=False) as client:
        try:
            resp, date_params_dropped = await _get_dashboard(
                client, f"https://{domain}{_DASHBOARD_PATH}", query_str, fetch_date
            )
        except httpx.HTTPStatusError:
            # The server was reachable over HTTPS and answered with an error
            # status: that is not a connectivity problem, so do not downgrade.
            raise
        except Exception:
            # HTTPS could not be established (SSL / network error): try HTTP.
            resp, date_params_dropped = await _get_dashboard(
                client, f"http://{domain}{_DASHBOARD_PATH}", query_str, fetch_date
            )
        data = resp.json()

    if date_params_dropped:
        # The server returned the full history; keep only the requested day so
        # the rows stored below are not all stamped with ``fetch_date``.
        data = _filter_by_date(data, fetch_date)

    # Persist the fetched data in the local database on every successful path.
    await _save_dashboard_data(domain, data, db, fetch_date=fetch_date)
    return data


async def _get_dashboard(
    client: httpx.AsyncClient,
    base_url: str,
    query_str: str,
    fetch_date: str | None,
) -> tuple[httpx.Response, bool]:
    """GET the dashboard, retrying without date parameters on 404/500.

    Returns:
        ``(response, date_params_dropped)`` where the flag is ``True`` when
        the successful response came from the parameter-less retry.
    """
    try:
        resp = await client.get(f"{base_url}{query_str}")
        resp.raise_for_status()
    except httpx.HTTPStatusError as exc:
        retry_without_dates = bool(fetch_date) and exc.response.status_code in _DATE_PARAM_RETRY_STATUSES
        if not retry_without_dates:
            raise
        resp = await client.get(base_url)
        resp.raise_for_status()
        return resp, True
    return resp, False


def _filter_by_date(data: dict, fetch_date: str | None) -> dict:
    """Keep only the consumption records created on ``fetch_date``.

    The payload is modified in place and also returned. Users without any
    matching record are removed from their tenant's ``users`` list. A record
    matches when its ``created_at`` string starts with ``fetch_date``.

    Args:
        data: Dashboard payload (``super_admins`` -> ``tenants`` -> ``users``
            -> ``consumption_records``).
        fetch_date: Day prefix to match (``YYYY-MM-DD``); a falsy value
            returns ``data`` untouched.

    Returns:
        The same ``data`` object.
    """
    if not fetch_date:
        return data

    # ``or []`` tolerates keys that are present but explicitly null.
    for admin in data.get("super_admins") or []:
        for tenant in admin.get("tenants") or []:
            kept_users = []
            for user in tenant.get("users") or []:
                matching = [
                    record
                    for record in user.get("consumption_records") or []
                    if isinstance(record.get("created_at"), str)
                    and record["created_at"].startswith(fetch_date)
                ]
                if matching:
                    user["consumption_records"] = matching
                    kept_users.append(user)
            tenant["users"] = kept_users
    return data


def _parse_created_at(raw: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` on any Python version."""
    # datetime.fromisoformat() only understands the "Z" suffix from 3.11 on.
    if isinstance(raw, str) and raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    return datetime.fromisoformat(raw)


async def _upsert_super_admin(db: AsyncSession, sa_id, sa_data: dict, domain_remark) -> None:
    """Create the super admin row, or refresh its username/nickname/remark."""
    result = await db.execute(select(SuperAdmin).where(SuperAdmin.id == sa_id))
    admin = result.scalar_one_or_none()
    if admin is None:
        db.add(
            SuperAdmin(
                id=sa_id,
                username=sa_data.get("username") or f"unassigned_{sa_id}",
                nickname=sa_data.get("nickname"),
                remark=domain_remark,
            )
        )
        return

    # Empty remote values never overwrite what is already stored.
    admin.username = sa_data.get("username") or admin.username
    admin.nickname = sa_data.get("nickname") or admin.nickname
    # The domain remark, when set, always wins over the stored admin remark.
    if domain_remark:
        admin.remark = domain_remark


async def _upsert_tenant(db: AsyncSession, t_id, t_data: dict) -> None:
    """Create the tenant row, or refresh company name, subdomain and balance."""
    result = await db.execute(select(Tenant).where(Tenant.id == t_id))
    tenant = result.scalar_one_or_none()
    if tenant is None:
        db.add(
            Tenant(
                id=t_id,
                company_name=t_data.get("company_name"),
                subdomain=t_data.get("subdomain", ""),
                balance=t_data.get("balance", 0),
            )
        )
        return

    tenant.company_name = t_data.get("company_name") or tenant.company_name
    # Unlike company_name, these are only kept when the key is absent; an
    # explicit null/0 in the payload overwrites the stored value.
    tenant.subdomain = t_data.get("subdomain", tenant.subdomain)
    tenant.balance = t_data.get("balance", tenant.balance)


async def _ensure_admin_tenant_link(db: AsyncSession, sa_id, t_id) -> None:
    """Insert the super-admin/tenant association if it does not exist yet."""
    result = await db.execute(
        select(SuperAdminTenant).where(
            SuperAdminTenant.super_admin_id == sa_id,
            SuperAdminTenant.tenant_id == t_id,
        )
    )
    if result.scalar_one_or_none() is None:
        db.add(SuperAdminTenant(super_admin_id=sa_id, tenant_id=t_id))


async def _ensure_model(db: AsyncSession, m_code, m_data: dict) -> None:
    """Insert a ``Model`` row for ``m_code`` if none is stored yet."""
    result = await db.execute(select(Model).where(Model.model_code == m_code))
    if result.scalar_one_or_none() is None:
        db.add(
            Model(
                model_code=m_code,
                model_name=m_data.get("model_name", ""),
                original_price=None,
            )
        )


async def _save_tenant_consumption(
    db: AsyncSession,
    t_id,
    t_data: dict,
    fixed_date: datetime | None,
    known_model_codes: set,
) -> None:
    """Store one consumption-detail row per record of every user of a tenant.

    Args:
        db: Session the rows are added to (not flushed or committed here).
        t_id: Identifier of the tenant the users belong to.
        t_data: Tenant payload containing the ``users`` list.
        fixed_date: Date stamped on every row when a fetch date was requested;
            ``None`` means each record's own ``created_at`` is used.
        known_model_codes: Model codes already looked up during this save;
            updated in place so each code is queried at most once.
    """
    for u_data in t_data.get("users") or []:
        u_id = u_data.get("user_id")
        if not u_id:
            continue

        # The remote API exposes the per-call details as "consumption_records".
        for m_data in u_data.get("consumption_records") or []:
            m_code = m_data.get("model_code")
            if not m_code:
                continue

            if m_code not in known_model_codes:
                await _ensure_model(db, m_code, m_data)
                known_model_codes.add(m_code)

            if fixed_date is not None:
                consumption_date = fixed_date
            else:
                created_at_str = m_data.get("created_at", "")
                consumption_date = _parse_created_at(created_at_str) if created_at_str else datetime.now()

            # NOTE(review): rows are inserted unconditionally, so fetching the
            # same day twice presumably duplicates them - confirm whether a
            # caller clears old rows or order_no should be de-duplicated.
            db.add(
                UserConsumptionDetail(
                    user_id=u_id,
                    username=u_data.get("username"),
                    tenant_id=t_id,
                    model_code=m_code,
                    call_count=1,
                    order_no=m_data.get("order_no"),
                    user_actual_total=m_data.get("amount"),
                    user_discount=m_data.get("user_discount"),
                    user_actual_price=m_data.get("user_actual_price"),
                    # NOTE(review): total is read from "tenant_actual_price";
                    # with call_count=1 that may be intentional - confirm.
                    tenant_actual_total=m_data.get("tenant_actual_price"),
                    tenant_discount=m_data.get("tenant_discount"),
                    tenant_actual_price=m_data.get("tenant_actual_price"),
                    super_admin_discount=m_data.get("super_admin_discount"),
                    super_admin_actual_price=m_data.get("super_admin_actual_price"),
                    original_price=None,
                    consumption_date=consumption_date,
                )
            )


async def _save_dashboard_data(domain: str, data: dict, db: AsyncSession, fetch_date: str | None = None) -> None:
    """Store a dashboard payload in the local database and commit.

    The payload is walked as super admin -> tenant -> user -> consumption
    record. Super admins, tenants, their association and models are upserted;
    every consumption record becomes a new ``UserConsumptionDetail`` row.

    Args:
        domain: Monitored domain the payload came from; its ``remark`` is
            copied to the super admins and it receives the ``super_admin_id``.
        data: Decoded dashboard payload.
        db: Async session; flushed after each super admin and committed once
            at the end.
        fetch_date: When given (``YYYY-MM-DD``), used as ``consumption_date``
            of every stored row instead of the record's ``created_at``.

    Raises:
        ValueError: ``fetch_date`` or a record's ``created_at`` is malformed.
    """
    # Parsed once up front: it is identical for every record, and a malformed
    # date now fails before anything is written.
    fixed_date = datetime.strptime(fetch_date, "%Y-%m-%d") if fetch_date else None

    # Look up the domain record to obtain its remark.
    domain_result = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.domain == domain)
    )
    domain_record = domain_result.scalar_one_or_none()
    domain_remark = domain_record.remark if domain_record else None

    # Model codes already checked in this call, to avoid one SELECT per record.
    known_model_codes: set = set()

    for sa_data in data.get("super_admins") or []:
        sa_id = sa_data.get("super_admin_id")
        if sa_id is None:
            continue

        # 1. Create or update the super admin.
        await _upsert_super_admin(db, sa_id, sa_data, domain_remark)

        # Mirror the super admin id onto the domain record.
        if domain_record:
            domain_record.super_admin_id = sa_id
        await db.flush()

        # 2. Walk the tenants of this super admin.
        for t_data in sa_data.get("tenants") or []:
            t_id = t_data.get("tenant_id")
            if t_id is None:
                continue

            await _upsert_tenant(db, t_id, t_data)
            await _ensure_admin_tenant_link(db, sa_id, t_id)

            # 3./4. Walk the users and their consumption records.
            await _save_tenant_consumption(db, t_id, t_data, fixed_date, known_model_codes)

    await db.commit()