| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- from fastapi import APIRouter, Depends, HTTPException, Query
- from pydantic import BaseModel
- from sqlalchemy import select, func, desc
- from sqlalchemy.ext.asyncio import AsyncSession
- from app.database import get_db
- from app.models.domain import MonitoredDomain
- from app.models.monitoring import SuperAdmin, FetchScheduleConfig, FetchLog
- from app.schemas.domain import (
- MonitoredDomainCreate,
- MonitoredDomainResponse,
- )
- from app.services.domain_fetch import fetch_domain_transactions
- import logging
- logger = logging.getLogger(__name__)
- router = APIRouter(prefix="/api/domains", tags=["domains"])
- @router.post("/", response_model=MonitoredDomainResponse, status_code=201)
- async def add_domain(
- payload: MonitoredDomainCreate,
- db: AsyncSession = Depends(get_db),
- ):
- """添加需要监控的域名,同时创建对应的超管记录"""
- existing = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.domain == payload.domain)
- )
- if existing.scalar_one_or_none():
- raise HTTPException(status_code=409, detail="域名已在监控中")
- # 如果未指定超管,自动创建一条超管记录
- sa_id = payload.super_admin_id
- if sa_id is None:
- max_id_result = await db.execute(select(func.max(SuperAdmin.id)))
- max_id = max_id_result.scalar() or 0
- new_sa = SuperAdmin(
- id=max_id + 1,
- username=payload.domain,
- nickname=payload.domain,
- remark=payload.remark or None,
- )
- db.add(new_sa)
- await db.flush()
- sa_id = new_sa.id
- record = MonitoredDomain(domain=payload.domain, remark=payload.remark or None, super_admin_id=sa_id)
- db.add(record)
- await db.commit()
- await db.refresh(record)
- return record
- @router.get("/", response_model=list[MonitoredDomainResponse])
- async def list_domains(db: AsyncSession = Depends(get_db)):
- """获取所有已监控的域名列表"""
- result = await db.execute(select(MonitoredDomain))
- return result.scalars().all()
- class MonitoredDomainUpdate(BaseModel):
- """更新域名备注"""
- remark: str = ""
- @router.patch("/{domain_id}", response_model=MonitoredDomainResponse)
- async def update_domain_remark(
- domain_id: int,
- payload: MonitoredDomainUpdate,
- db: AsyncSession = Depends(get_db),
- ):
- """更新域名备注,并同步到关联的超管"""
- result = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
- )
- record = result.scalar_one_or_none()
- if not record:
- raise HTTPException(status_code=404, detail="域名不存在")
- record.remark = payload.remark or None
- # 同步到关联的超管
- if record.super_admin_id:
- sa_result = await db.execute(
- select(SuperAdmin).where(SuperAdmin.id == record.super_admin_id)
- )
- sa = sa_result.scalar_one_or_none()
- if sa:
- sa.remark = payload.remark or None
- await db.commit()
- await db.refresh(record)
- return record
- @router.delete("/{domain_id}", status_code=204)
- async def remove_domain(domain_id: int, db: AsyncSession = Depends(get_db)):
- """移除指定 ID 的监控域名"""
- result = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
- )
- record = result.scalar_one_or_none()
- if not record:
- raise HTTPException(status_code=404, detail="域名不存在")
- await db.delete(record)
- await db.commit()
- @router.get("/{domain_id}/transactions")
- async def get_domain_transactions(
- domain_id: int,
- fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则查全部"),
- db: AsyncSession = Depends(get_db),
- ):
- """爬取指定域名的监控数据并入库"""
- result = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
- )
- record = result.scalar_one_or_none()
- if not record:
- raise HTTPException(status_code=404, detail="域名不存在")
- async def get_domain_transactions(
- domain_id: int,
- fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则查全部"),
- db: AsyncSession = Depends(get_db),
- ):
- """爬取指定域名的监控数据并入库"""
- result = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.id == domain_id)
- )
- record = result.scalar_one_or_none()
- if not record:
- raise HTTPException(status_code=404, detail="域名不存在")
- try:
- data = await fetch_domain_transactions(record.domain, db, fetch_date=fetch_date)
- db.add(FetchLog(domain=record.domain, status="success", message="手动爬取成功"))
- await db.commit()
- return {"status": "ok", "domain": record.domain, "data": data}
- except Exception as e:
- error_msg = str(e)[:500]
- db.add(FetchLog(domain=record.domain, status="failed", message=error_msg))
- await db.commit()
- raise HTTPException(status_code=500, detail=error_msg)
- @router.post("/fetch-all", status_code=202)
- async def fetch_all_transactions(
- fetch_date: str | None = Query(None, description="爬取指定日期,格式 YYYY-MM-DD,不传则爬取当天"),
- db: AsyncSession = Depends(get_db),
- ):
- """批量爬取所有已启用域名的监控数据,默认只爬取当天"""
- if not fetch_date:
- from datetime import datetime, timezone, timedelta
- CST = timezone(timedelta(hours=8))
- fetch_date = datetime.now(CST).strftime("%Y-%m-%d")
- result = await db.execute(
- select(MonitoredDomain).where(MonitoredDomain.is_active == True)
- )
- domains = result.scalars().all()
- errors = []
- for d in domains:
- try:
- await fetch_domain_transactions(d.domain, db, fetch_date=fetch_date)
- db.add(FetchLog(domain=d.domain, status="success", message="手动批量爬取成功"))
- except Exception as e:
- error_msg = str(e)[:500]
- db.add(FetchLog(domain=d.domain, status="failed", message=error_msg))
- errors.append({"domain": d.domain, "error": error_msg})
- await db.commit()
- return {"status": "ok", "total": len(domains), "errors": errors}
- class ScheduleConfigUpdate(BaseModel):
- """更新定时爬取配置"""
- enabled: bool
- schedule_time: str # HH:MM
- @router.get("/schedule")
- async def get_schedule_config(db: AsyncSession = Depends(get_db)):
- """获取定时爬取配置"""
- result = await db.execute(select(FetchScheduleConfig).limit(1))
- config = result.scalar_one_or_none()
- if not config:
- config = FetchScheduleConfig(enabled=False, schedule_time="02:00")
- db.add(config)
- await db.commit()
- await db.refresh(config)
- return {"enabled": config.enabled, "schedule_time": config.schedule_time}
- @router.put("/schedule")
- async def update_schedule_config(
- payload: ScheduleConfigUpdate,
- db: AsyncSession = Depends(get_db),
- ):
- """更新定时爬取配置"""
- result = await db.execute(select(FetchScheduleConfig).limit(1))
- config = result.scalar_one_or_none()
- if not config:
- config = FetchScheduleConfig()
- db.add(config)
- config.enabled = payload.enabled
- config.schedule_time = payload.schedule_time
- await db.commit()
- return {"message": "配置已保存", "enabled": config.enabled, "schedule_time": config.schedule_time}
- @router.get("/fetch-logs")
- async def list_fetch_logs(
- domain: str | None = Query(None, description="按域名筛选"),
- status: str | None = Query(None, description="按状态筛选: success / failed / skipped"),
- page: int = Query(1, ge=1),
- size: int = Query(20, ge=1, le=100),
- db: AsyncSession = Depends(get_db),
- ):
- """获取爬取日志列表"""
- conditions = []
- if domain:
- conditions.append(FetchLog.domain == domain)
- if status:
- conditions.append(FetchLog.status == status)
- count_query = select(func.count()).select_from(FetchLog)
- if conditions:
- count_query = count_query.where(*conditions)
- total = await db.scalar(count_query) or 0
- logs_query = select(FetchLog)
- if conditions:
- logs_query = logs_query.where(*conditions)
- logs_query = logs_query.order_by(desc(FetchLog.created_at)).offset((page - 1) * size).limit(size)
- logs_result = await db.execute(logs_query)
- logs = logs_result.scalars().all()
- return {
- "items": [
- {
- "id": log.id,
- "domain": log.domain,
- "status": log.status,
- "message": log.message,
- "created_at": log.created_at.isoformat() if log.created_at else None,
- }
- for log in logs
- ],
- "total": total,
- "page": page,
- "size": size,
- }
|