| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- from fastapi import APIRouter, Depends, HTTPException, Query
- from pydantic import BaseModel
- from sqlalchemy import select, func
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.database import get_db
- from app.models.domain import MonitoredDomain
- from app.models.monitoring import SuperAdmin
- from app.schemas.domain import (
- MonitoredDomainCreate,
- MonitoredDomainResponse,
- )
- from app.services.domain_fetch import fetch_domain_transactions
- from app.redis import get_redis
- router = APIRouter(prefix="/api/domains", tags=["domains"])
@router.post("/", response_model=MonitoredDomainResponse, status_code=201)
async def add_domain(
    payload: MonitoredDomainCreate,
    db: AsyncSession = Depends(get_db),
):
    """Add a domain to monitoring, creating a super-admin record if none is given.

    A ``MonitoredDomain`` row is inserted for ``payload.domain``. When
    ``payload.super_admin_id`` is ``None``, a ``SuperAdmin`` row is inserted
    first, with id ``max(SuperAdmin.id) + 1`` (or 1 when the table is empty)
    and with the domain used as both username and nickname. An empty remark
    is stored as ``None``.

    Returns:
        The freshly inserted and refreshed ``MonitoredDomain`` row.

    Raises:
        HTTPException: 409 when a row with the same domain already exists, or
            when the database rejects the insert with an integrity error.
    """
    # Function-scope import: sqlalchemy is already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    existing = await db.execute(
        select(MonitoredDomain).where(MonitoredDomain.domain == payload.domain)
    )
    if existing.scalar_one_or_none():
        raise HTTPException(status_code=409, detail="域名已在监控中")

    remark = payload.remark or None
    sa_id = payload.super_admin_id
    try:
        if sa_id is None:
            # No super admin supplied: allocate the next id by hand.
            # NOTE(review): max(id) + 1 is not safe under concurrent requests;
            # two callers can compute the same id. The IntegrityError handler
            # below turns that collision into a 409 instead of a 500 —
            # presumably SuperAdmin.id is a primary key; confirm on the model.
            max_id_result = await db.execute(select(func.max(SuperAdmin.id)))
            max_id = max_id_result.scalar() or 0
            new_sa = SuperAdmin(
                id=max_id + 1,
                username=payload.domain,
                nickname=payload.domain,
                remark=remark,
            )
            db.add(new_sa)
            # Flush so the super-admin row exists before the domain row refers to it.
            await db.flush()
            sa_id = new_sa.id

        record = MonitoredDomain(
            domain=payload.domain,
            remark=remark,
            super_admin_id=sa_id,
        )
        db.add(record)
        await db.commit()
    except IntegrityError as exc:
        # The session cannot be used again until the failed transaction is rolled back.
        await db.rollback()
        raise HTTPException(
            status_code=409,
            detail="域名或超管记录冲突,请重试",
        ) from exc

    await db.refresh(record)
    return record
@router.get("/", response_model=list[MonitoredDomainResponse])
async def list_domains(db: AsyncSession = Depends(get_db)):
    """Return every monitored domain row, unfiltered."""
    query = select(MonitoredDomain)
    rows = (await db.execute(query)).scalars()
    return rows.all()
class MonitoredDomainUpdate(BaseModel):
    """Request body for updating a monitored domain's remark."""

    # New remark text; defaults to an empty string when the field is omitted.
    remark: str = ""
@router.patch("/{domain_id}", response_model=MonitoredDomainResponse)
async def update_domain_remark(
    domain_id: int,
    payload: MonitoredDomainUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Update a domain's remark and mirror it onto the linked super admin.

    An empty remark is stored as ``None``. If the domain has a truthy
    ``super_admin_id`` and that ``SuperAdmin`` row exists, its remark is set
    to the same value before the commit.

    Raises:
        HTTPException: 404 when no domain with ``domain_id`` exists.
    """
    domain_query = select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    record = (await db.execute(domain_query)).scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="域名不存在")

    new_remark = payload.remark or None
    record.remark = new_remark

    # Keep the linked super admin's remark in step with the domain's.
    if record.super_admin_id:
        admin_query = select(SuperAdmin).where(
            SuperAdmin.id == record.super_admin_id
        )
        linked_admin = (await db.execute(admin_query)).scalar_one_or_none()
        if linked_admin:
            linked_admin.remark = new_remark

    await db.commit()
    await db.refresh(record)
    return record
@router.delete("/{domain_id}", status_code=204)
async def remove_domain(domain_id: int, db: AsyncSession = Depends(get_db)):
    """Delete the monitored domain with the given id.

    Raises:
        HTTPException: 404 when no domain with ``domain_id`` exists.
    """
    lookup = select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="域名不存在")

    await db.delete(target)
    await db.commit()
@router.get("/{domain_id}/transactions")
async def get_domain_transactions(
    domain_id: int,
    fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则查全部"),
    db: AsyncSession = Depends(get_db),
):
    """Run the transaction fetch for one monitored domain and return its result.

    The domain is looked up by id, then ``fetch_domain_transactions`` is
    called with its name, the current session and the optional
    ``fetch_date``. Whatever that helper returns is passed back under
    ``data``.

    Raises:
        HTTPException: 404 when no domain with ``domain_id`` exists.
    """
    lookup = select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
    record = (await db.execute(lookup)).scalar_one_or_none()
    if not record:
        raise HTTPException(status_code=404, detail="域名不存在")

    fetched = await fetch_domain_transactions(
        record.domain,
        db,
        fetch_date=fetch_date,
    )
    response = {"status": "ok", "domain": record.domain, "data": fetched}
    return response
@router.post("/fetch-all", status_code=202)
async def fetch_all_transactions(
    fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则爬取当天"),
    db: AsyncSession = Depends(get_db),
):
    """Run the transaction fetch for every active monitored domain.

    When ``fetch_date`` is not supplied it defaults to today's date in UTC+8,
    formatted as ``YYYY-MM-DD``. Domains are processed one after another on
    the same session. A failure for one domain is logged and recorded in
    ``errors``, and does not stop the remaining domains.

    Returns:
        A dict with ``status`` (always ``"ok"``), ``total`` (number of active
        domains attempted) and ``errors`` (one ``{"domain", "error"}`` entry
        per failed domain).
    """
    import logging
    from datetime import datetime, timedelta, timezone

    log = logging.getLogger(__name__)

    if not fetch_date:
        utc_plus_8 = timezone(timedelta(hours=8))
        fetch_date = datetime.now(utc_plus_8).strftime("%Y-%m-%d")

    # Select only the domain column so the loop works on plain strings. ORM
    # instances can be expired by a commit or rollback made on the shared
    # session while fetching, and reading an expired attribute in asyncio
    # would trigger implicit IO.
    result = await db.execute(
        select(MonitoredDomain.domain).where(MonitoredDomain.is_active == True)  # noqa: E712
    )
    domain_names = list(result.scalars().all())

    errors = []
    for name in domain_names:
        try:
            await fetch_domain_transactions(name, db, fetch_date=fetch_date)
        except Exception as exc:
            # Best-effort batch: keep going, but leave a traceback in the logs.
            # NOTE(review): if the failure left the session in a failed
            # transaction, later iterations may also fail until it is rolled
            # back — confirm how fetch_domain_transactions handles this.
            log.exception("fetch failed for domain %s", name)
            errors.append({"domain": name, "error": str(exc)})

    return {"status": "ok", "total": len(domain_names), "errors": errors}
class ScheduleConfigUpdate(BaseModel):
    """Request body for changing the scheduled-fetch settings."""

    # Whether the scheduled fetch is switched on.
    enabled: bool
    # Time of day for the scheduled fetch, written as HH:MM.
    schedule_time: str
@router.get("/schedule")
async def get_schedule_config():
    """Read the scheduled-fetch settings from Redis.

    ``enabled`` is true only when the stored flag equals the string
    ``"true"``. ``schedule_time`` falls back to ``"02:00"`` when no time is
    stored.
    """
    client = await get_redis()
    # NOTE(review): the string comparison below presumably relies on the
    # client returning str rather than bytes — confirm in get_redis.
    enabled_raw = await client.get("fetch_schedule:enabled")
    time_raw = await client.get("fetch_schedule:time")

    is_enabled = enabled_raw == "true"
    return {"enabled": is_enabled, "schedule_time": time_raw or "02:00"}
@router.put("/schedule")
async def update_schedule_config(payload: ScheduleConfigUpdate):
    """Validate and store the scheduled-fetch settings in Redis.

    ``payload.schedule_time`` must parse as a 24-hour ``HH:MM`` time. It is
    stored in zero-padded form (``"2:5"`` becomes ``"02:05"``). The enabled
    flag is stored as the string ``"true"`` or ``"false"``.

    Returns:
        A confirmation dict echoing the stored ``enabled`` and
        ``schedule_time`` values.

    Raises:
        HTTPException: 422 when ``schedule_time`` is not a valid HH:MM time.
    """
    from datetime import datetime

    # Reject malformed times here so nothing unparsable is written to Redis.
    try:
        parsed = datetime.strptime(payload.schedule_time.strip(), "%H:%M")
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail="schedule_time 格式应为 HH:MM",
        ) from exc
    schedule_time = parsed.strftime("%H:%M")

    r = await get_redis()
    await r.set("fetch_schedule:enabled", "true" if payload.enabled else "false")
    await r.set("fetch_schedule:time", schedule_time)
    return {"message": "配置已保存", "enabled": payload.enabled, "schedule_time": schedule_time}
|