| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- from fastapi import APIRouter, Depends, HTTPException, Query
- from pydantic import BaseModel
- from sqlalchemy import select, func, desc
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.database import get_db
- from app.models.domain import MonitoredDomain
- from app.models.monitoring import SuperAdmin, FetchScheduleConfig, FetchLog, SuperAdminTenant
- from app.schemas.domain import (
- MonitoredDomainCreate,
- MonitoredDomainResponse,
- )
- from app.services.domain_fetch import fetch_domain_transactions
- import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Every endpoint declared on this router is served under /api/domains.
router = APIRouter(
    prefix="/api/domains",
    tags=["domains"],
)
@router.post("/", response_model=MonitoredDomainResponse, status_code=201)
async def add_domain(
    payload: MonitoredDomainCreate,
    db: AsyncSession = Depends(get_db),
):
    """Register a domain for monitoring and return the stored row.

    When the payload carries no ``super_admin_id``, a new ``SuperAdmin`` row
    is created first, using the domain as both username and nickname.

    Raises:
        HTTPException: 409 when a row with the same domain already exists.
    """
    duplicate_lookup = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.domain == payload.domain)
    )
    if duplicate_lookup.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="域名已在监控中")

    # An empty remark is stored as NULL rather than "".
    remark = payload.remark or None

    owner_id = payload.super_admin_id
    if owner_id is None:
        # No super admin supplied: create one with the next id after the
        # current maximum (or 1 when the table is empty).
        # NOTE(review): max(id) + 1 looks race-prone under concurrent
        # requests — confirm whether the column could auto-increment instead.
        highest_id = (await db.execute(select(func.max(SuperAdmin.id)))).scalar() or 0
        auto_admin = SuperAdmin(
            id=highest_id + 1,
            username=payload.domain,
            nickname=payload.domain,
            remark=remark,
        )
        db.add(auto_admin)
        # Flush so the new super admin row is written before the domain row.
        await db.flush()
        owner_id = auto_admin.id

    domain_row = MonitoredDomain(
        domain=payload.domain,
        remark=remark,
        super_admin_id=owner_id,
    )
    db.add(domain_row)
    await db.commit()
    await db.refresh(domain_row)
    return domain_row
@router.get("/", response_model=list[MonitoredDomainResponse])
async def list_domains(db: AsyncSession = Depends(get_db)):
    """Return every monitored domain row, without filtering or paging."""
    query = select(MonitoredDomain)
    rows = await db.execute(query)
    domains = rows.scalars().all()
    return domains
class MonitoredDomainUpdate(BaseModel):
    """Request body for changing a monitored domain's remark."""

    # Defaults to an empty string when the client omits the field.
    remark: str = ""
@router.patch("/{domain_id}", response_model=MonitoredDomainResponse)
async def update_domain_remark(
    domain_id: int,
    payload: MonitoredDomainUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Set a domain's remark and mirror it onto the linked super admin.

    Raises:
        HTTPException: 404 when no domain has the given id.
    """
    domain_lookup = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    )
    domain_row = domain_lookup.scalar_one_or_none()
    if not domain_row:
        raise HTTPException(status_code=404, detail="域名不存在")

    # An empty remark clears the column (stored as NULL).
    new_remark = payload.remark or None
    domain_row.remark = new_remark

    # Keep the linked super admin's remark in step, when there is one.
    if domain_row.super_admin_id:
        owner_lookup = await db.execute(
            select(SuperAdmin).where(SuperAdmin.id == domain_row.super_admin_id)
        )
        owner = owner_lookup.scalar_one_or_none()
        if owner:
            owner.remark = new_remark

    await db.commit()
    await db.refresh(domain_row)
    return domain_row
async def _has_matching_row(db: AsyncSession, stmt) -> bool:
    """Return True when ``stmt`` matches at least one row."""
    # LIMIT 1 + first() tolerates any number of matches, whereas
    # scalar_one_or_none() raises MultipleResultsFound on two or more.
    probe = await db.execute(stmt.limit(1))
    return probe.first() is not None


async def _purge_super_admin(db: AsyncSession, sa_id: int) -> None:
    """Delete a super admin with its licenses, tenant links and orphaned tenants."""
    # Kept as function-scope imports, as in the original handler.
    from app.models.license import SuperAdminLicense
    from app.models.monitoring import Tenant, UserConsumptionDetail

    # Licenses owned by the super admin.
    await db.execute(
        SuperAdminLicense.__table__.delete().where(SuperAdminLicense.super_admin_id == sa_id)
    )

    # Remember which tenants were linked before the links are removed.
    tenant_rows = await db.execute(
        select(SuperAdminTenant.tenant_id).where(SuperAdminTenant.super_admin_id == sa_id)
    )
    tenant_ids = tenant_rows.scalars().all()

    # Remove this super admin's tenant links first, so the probe below only
    # sees links held by other super admins.
    await db.execute(
        SuperAdminTenant.__table__.delete().where(SuperAdminTenant.super_admin_id == sa_id)
    )

    for tid in tenant_ids:
        still_linked = await _has_matching_row(
            db,
            select(SuperAdminTenant.tenant_id).where(SuperAdminTenant.tenant_id == tid),
        )
        if still_linked:
            continue
        # Orphaned tenant: drop its consumption details, then the tenant row.
        await db.execute(
            UserConsumptionDetail.__table__.delete().where(
                UserConsumptionDetail.tenant_id == tid
            )
        )
        tenant_lookup = await db.execute(select(Tenant).where(Tenant.id == tid))
        tenant = tenant_lookup.scalar_one_or_none()
        if tenant:
            await db.delete(tenant)

    # Finally the super admin row itself.
    sa_lookup = await db.execute(select(SuperAdmin).where(SuperAdmin.id == sa_id))
    sa = sa_lookup.scalar_one_or_none()
    if sa:
        await db.delete(sa)


@router.delete("/{domain_id}", status_code=204)
async def remove_domain(domain_id: int, db: AsyncSession = Depends(get_db)):
    """Delete a monitored domain and cascade to data that depended on it.

    Removes the domain's fetch logs and the domain row. If no other domain
    references the same super admin, the super admin is removed as well,
    together with its licenses, its tenant links, and any tenants (with their
    consumption details) that no other super admin is linked to. Everything
    is committed in a single transaction at the end.

    Raises:
        HTTPException: 404 when no domain has the given id.
    """
    result = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    )
    record = result.scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="域名不存在")

    # Read these before the row is deleted.
    domain_name = record.domain
    sa_id = record.super_admin_id

    # 1. Fetch logs recorded for this domain.
    await db.execute(
        FetchLog.__table__.delete().where(FetchLog.domain == domain_name)
    )

    # 2. The domain row itself. Flush explicitly so the sibling probe below
    #    never counts the row being deleted, whatever the autoflush setting.
    await db.delete(record)
    await db.flush()

    # 3. Cascade to the super admin only when no other domain still uses it.
    if sa_id is not None:
        sibling_domains = select(MonitoredDomain.id).where(
            MonitoredDomain.super_admin_id == sa_id
        )
        if not await _has_matching_row(db, sibling_domains):
            await _purge_super_admin(db, sa_id)

    await db.commit()
@router.get("/{domain_id}/transactions")
async def get_domain_transactions(
    domain_id: int,
    fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则查全部"),
    db: AsyncSession = Depends(get_db),
):
    """Fetch monitoring data for one domain and record the outcome in FetchLog.

    Args:
        domain_id: Primary key of the monitored domain.
        fetch_date: Optional date string passed through to the fetch service.
        db: Request-scoped database session.

    Returns:
        A dict with ``status``, ``domain`` and the fetched ``data``.

    Raises:
        HTTPException: 404 when the domain does not exist; 500 (with the
            truncated error text as detail) when the fetch fails.
    """
    result = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    )
    record = result.scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="域名不存在")

    # Copy the name out before any commit/rollback: touching an expired ORM
    # attribute afterwards would trigger an implicit lazy load, which is not
    # allowed under asyncio.
    domain_name = record.domain

    try:
        data = await fetch_domain_transactions(domain_name, db, fetch_date=fetch_date)
        db.add(FetchLog(domain=domain_name, status="success", message="手动爬取成功"))
        await db.commit()
    except Exception as e:
        # Discard whatever the failed fetch left pending so the failure log
        # is committed on a clean transaction, without partial data.
        await db.rollback()
        error_msg = str(e)[:500]
        logger.exception("Manual fetch failed for domain %s", domain_name)
        db.add(FetchLog(domain=domain_name, status="failed", message=error_msg))
        await db.commit()
        raise HTTPException(status_code=500, detail=error_msg) from e

    return {"status": "ok", "domain": domain_name, "data": data}
@router.post("/fetch-all", status_code=202)
async def fetch_all_transactions(
    fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则爬取当天"),
    db: AsyncSession = Depends(get_db),
):
    """Fetch monitoring data for every active domain, one after another.

    When ``fetch_date`` is omitted, today's date in UTC+8 is used. Each
    domain is committed on its own, so a failure for one domain is rolled
    back and logged without affecting the others.

    Returns:
        A dict with ``status``, the ``total`` number of domains processed and
        a list of ``errors`` (one ``{"domain", "error"}`` entry per failure).
    """
    if not fetch_date:
        from datetime import datetime, timezone, timedelta

        cst = timezone(timedelta(hours=8))
        fetch_date = datetime.now(cst).strftime("%Y-%m-%d")

    result = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.is_active == True)  # noqa: E712
    )
    # Keep plain strings only: the ORM rows may be expired by the per-domain
    # commits/rollbacks below, and reading an expired attribute would need
    # implicit I/O.
    domain_names = [d.domain for d in result.scalars().all()]

    errors = []
    for name in domain_names:
        try:
            await fetch_domain_transactions(name, db, fetch_date=fetch_date)
            db.add(FetchLog(domain=name, status="success", message="手动批量爬取成功"))
            await db.commit()
        except Exception as e:
            # Drop this domain's partial work so the session is usable for
            # the failure log and for the remaining domains.
            await db.rollback()
            error_msg = str(e)[:500]
            logger.exception("Batch fetch failed for domain %s", name)
            db.add(FetchLog(domain=name, status="failed", message=error_msg))
            await db.commit()
            errors.append({"domain": name, "error": error_msg})

    return {"status": "ok", "total": len(domain_names), "errors": errors}
class ScheduleConfigUpdate(BaseModel):
    """Request body for replacing the scheduled-fetch configuration."""

    # Whether the scheduled fetch is switched on.
    enabled: bool
    # Time of day written as HH:MM; the format is not validated by this model.
    schedule_time: str
@router.get("/schedule")
async def get_schedule_config(db: AsyncSession = Depends(get_db)):
    """Return the scheduled-fetch configuration, creating a default row if absent.

    The default row is disabled with a schedule time of "02:00".
    """
    lookup = await db.execute(select(FetchScheduleConfig).limit(1))
    schedule = lookup.scalar_one_or_none()

    if not schedule:
        # First read on an empty table: persist the default configuration.
        schedule = FetchScheduleConfig(enabled=False, schedule_time="02:00")
        db.add(schedule)
        await db.commit()
        await db.refresh(schedule)

    response = {
        "enabled": schedule.enabled,
        "schedule_time": schedule.schedule_time,
    }
    return response
@router.put("/schedule")
async def update_schedule_config(
    payload: ScheduleConfigUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Overwrite the scheduled-fetch configuration, creating the row if absent.

    Returns:
        A dict with a confirmation ``message`` and the stored ``enabled`` and
        ``schedule_time`` values.
    """
    result = await db.execute(select(FetchScheduleConfig).limit(1))
    config = result.scalar_one_or_none()
    if not config:
        config = FetchScheduleConfig()
        db.add(config)

    config.enabled = payload.enabled
    config.schedule_time = payload.schedule_time
    await db.commit()
    # Reload after the commit, as the other handlers in this module do:
    # reading attributes expired by commit() would need an implicit lazy
    # load, which is not allowed under asyncio.
    await db.refresh(config)

    return {"message": "配置已保存", "enabled": config.enabled, "schedule_time": config.schedule_time}
@router.get("/fetch-logs")
async def list_fetch_logs(
    domain: str | None = Query(None, description="按域名筛选"),
    status: str | None = Query(None, description="按状态筛选: success / failed / skipped"),
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of fetch logs, newest first, with the total match count.

    Args:
        domain: Optional exact-match filter on the log's domain.
        status: Optional exact-match filter on the log's status.
        page: 1-based page number.
        size: Page size (1 to 100).
        db: Request-scoped database session.

    Returns:
        A dict with ``items`` (serialized logs), ``total``, ``page`` and ``size``.
    """
    filters = []
    if domain:
        filters.append(FetchLog.domain == domain)
    if status:
        filters.append(FetchLog.status == status)

    def _filtered(stmt):
        # Apply the optional filters only when at least one was supplied.
        return stmt.where(*filters) if filters else stmt

    def _serialize(entry):
        created = entry.created_at.isoformat() if entry.created_at else None
        return {
            "id": entry.id,
            "domain": entry.domain,
            "status": entry.status,
            "message": entry.message,
            "created_at": created,
        }

    # Total number of rows matching the filters, independent of paging.
    total = await db.scalar(_filtered(select(func.count()).select_from(FetchLog))) or 0

    skip = (page - 1) * size
    page_query = (
        _filtered(select(FetchLog))
        .order_by(desc(FetchLog.created_at))
        .offset(skip)
        .limit(size)
    )
    entries = (await db.execute(page_query)).scalars().all()

    return {
        "items": [_serialize(entry) for entry in entries],
        "total": total,
        "page": page,
        "size": size,
    }
|