| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import asyncio
- import logging
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone, timedelta
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from app.config import settings
- from app.routers import domains, monitoring, license as license_router
- from app.routers.license import public_router as public_license_router
- from app.database import async_session
- from app.services.license import _notify_on_expired, _notify_on_warning
- from app.services.domain_fetch import fetch_domain_transactions
- from app.models.license import SuperAdminLicense
- from app.models.monitoring import FetchScheduleConfig, FetchLog
- from app.models.domain import MonitoredDomain
- from app.redis import close_redis
- from sqlalchemy import select, text
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fixed UTC+08:00 offset; the daily-fetch scheduler uses it to decide the
# current local time and which calendar day "today" is.
CST = timezone(offset=timedelta(hours=8))
async def _check_licenses():
    """Run the License check forever, once every 24 hours.

    Each round opens one database session and does two things:

    1. Every ``active`` license whose ``expires_at`` has passed is set to
       ``expired``, committed, and handed to ``_notify_on_expired``.
    2. Every ``active`` license that expires within the next 7 days and has
       ``warning_sent`` unset is handed to ``_notify_on_warning``; on
       success ``warning_sent`` is set to ``True`` and committed.

    A failure while notifying one license is logged and does not prevent
    the remaining licenses from being processed. Any other exception aborts
    only the current round: it is logged and the loop sleeps until the next
    round.
    """
    while True:
        try:
            async with async_session() as session:
                # 1. Licenses that have already expired.
                expired_result = await session.execute(
                    select(SuperAdminLicense).where(
                        SuperAdminLicense.status == "active",
                        SuperAdminLicense.expires_at <= text("NOW()"),
                    )
                )
                for lic in expired_result.scalars().all():
                    # Read the id once so the error log below never has to
                    # touch the ORM object after a failed notification.
                    lic_id = lic.id
                    logger.info("检测到 License #%d 已过期,更新状态并发送预警短信", lic_id)
                    lic.status = "expired"
                    await session.commit()
                    try:
                        await _notify_on_expired(session, lic)
                    except Exception:
                        # Keep going: one failed notification must not
                        # delay the other licenses by a full day.
                        logger.exception("License #%d 过期通知发送失败", lic_id)

                # 2. Licenses expiring within 7 days that were not warned yet.
                warning_result = await session.execute(
                    select(SuperAdminLicense).where(
                        SuperAdminLicense.status == "active",
                        SuperAdminLicense.warning_sent.is_(False),
                        SuperAdminLicense.expires_at > text("NOW()"),
                        SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
                    )
                )
                for lic in warning_result.scalars().all():
                    lic_id = lic.id
                    days_left = (lic.expires_at - datetime.now(timezone.utc)).days
                    logger.info("检测到 License #%d 剩余 %d 天,发送预警短信", lic_id, days_left)
                    try:
                        await _notify_on_warning(session, lic, days_left)
                    except Exception:
                        # warning_sent stays unset, so the next round
                        # selects this license again.
                        logger.exception("License #%d 预警短信发送失败", lic_id)
                        continue
                    lic.warning_sent = True
                    await session.commit()
        except Exception:
            logger.exception("定时检查 License 异常")
        await asyncio.sleep(24 * 3600)
async def _daily_fetch():
    """Poll the fetch schedule every minute and crawl today's data once a day.

    Each minute the first ``FetchScheduleConfig`` row is read. When it is
    enabled, the current UTC+8 time is at or past ``schedule_time``, and no
    crawl has completed yet today in this process, every active
    ``MonitoredDomain`` is fetched via ``fetch_domain_transactions``.
    Domains that already have a ``failed`` ``FetchLog`` created today are
    skipped. One ``FetchLog`` row (``success`` or ``failed``) is written and
    committed per fetched domain.

    Any exception escaping a round is logged; the loop then sleeps and polls
    again.
    """
    last_fetch_date = None
    while True:
        try:
            async with async_session() as session:
                result = await session.execute(select(FetchScheduleConfig).limit(1))
                config = result.scalar_one_or_none()
                if config and config.enabled:
                    # Only hour and minute are used, so both "HH:MM" and
                    # "HH:MM:SS" are accepted.
                    h, m = map(int, config.schedule_time.split(":")[:2])
                    now = datetime.now(CST)
                    today = now.strftime("%Y-%m-%d")
                    # Past the target time and not crawled yet today.
                    is_due = now.hour * 60 + now.minute >= h * 60 + m
                    if is_due and last_fetch_date != today:
                        logger.info("开始定时爬取当天流水: %s", today)
                        # Domains that already failed today are skipped.
                        # `today` is produced by strftime above, never by
                        # external input, so inlining it in the SQL literal
                        # is safe.
                        failed_result = await session.execute(
                            select(FetchLog.domain).where(
                                FetchLog.status == "failed",
                                FetchLog.created_at >= text(f"'{today} 00:00:00+08'"),
                            ).distinct()
                        )
                        failed_domains = set(failed_result.scalars().all())
                        domain_result = await session.execute(
                            select(MonitoredDomain).where(MonitoredDomain.is_active.is_(True))
                        )
                        # Copy the names out as plain strings first: the
                        # session is committed / rolled back inside the loop,
                        # which can expire the loaded ORM instances.
                        active_domains = [d.domain for d in domain_result.scalars().all()]
                        for domain in active_domains:
                            if domain in failed_domains:
                                logger.info("域名 %s 今日已失败,跳过", domain)
                                continue
                            try:
                                await fetch_domain_transactions(domain, session, fetch_date=today)
                                logger.info("域名 %s 当天流水爬取完成", domain)
                                session.add(FetchLog(
                                    domain=domain, status="success", message="当天流水爬取完成"
                                ))
                            except Exception as e:
                                error_msg = str(e)[:500]
                                logger.exception("域名 %s 当天流水爬取失败", domain)
                                # Reset the transaction left behind by the
                                # failed fetch; otherwise recording the
                                # failure could itself fail and abort the
                                # whole run, which would then restart every
                                # minute.
                                await session.rollback()
                                session.add(FetchLog(
                                    domain=domain, status="failed", message=error_msg
                                ))
                            # Persist each domain's outcome on its own.
                            await session.commit()
                        last_fetch_date = today
                        logger.info("当天定时爬取全部完成")
        except Exception:
            logger.exception("定时爬取异常")
        await asyncio.sleep(60)
async def _check_licenses_on_startup():
    """Run one License check pass (expire overdue, warn within 7 days)."""
    async with async_session() as session:
        # 1. Licenses that have already expired.
        expired_result = await session.execute(
            select(SuperAdminLicense).where(
                SuperAdminLicense.status == "active",
                SuperAdminLicense.expires_at <= text("NOW()"),
            )
        )
        for lic in expired_result.scalars().all():
            # Read the id once so the error log below never has to touch
            # the ORM object after a failed notification.
            lic_id = lic.id
            logger.info("启动时检测到 License #%d 已过期,更新状态并发送预警短信", lic_id)
            lic.status = "expired"
            await session.commit()
            try:
                await _notify_on_expired(session, lic)
            except Exception:
                # One failed notification must not stop the others.
                logger.exception("启动时 License #%d 过期通知发送失败", lic_id)

        # 2. Licenses expiring within 7 days that were not warned yet.
        warning_result = await session.execute(
            select(SuperAdminLicense).where(
                SuperAdminLicense.status == "active",
                SuperAdminLicense.warning_sent.is_(False),
                SuperAdminLicense.expires_at > text("NOW()"),
                SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
            )
        )
        for lic in warning_result.scalars().all():
            lic_id = lic.id
            days_left = (lic.expires_at - datetime.now(timezone.utc)).days
            logger.info("启动时检测到 License #%d 剩余 %d 天,发送预警短信", lic_id, days_left)
            try:
                await _notify_on_warning(session, lic, days_left)
            except Exception:
                # warning_sent stays unset, so a later check selects this
                # license again.
                logger.exception("启动时 License #%d 预警短信发送失败", lic_id)
                continue
            lic.warning_sent = True
            await session.commit()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: startup License check, background tasks, cleanup.

    On startup a License check is run once; any exception from it is logged
    and does not block startup. Two background tasks are then started
    (``_check_licenses`` and ``_daily_fetch``). On shutdown both tasks are
    cancelled and awaited, and ``close_redis`` is called. The cleanup runs
    in ``finally`` blocks so it also happens when an exception is thrown
    into the context manager or when awaiting a task raises.
    """
    # Check licenses once immediately at startup.
    try:
        await _check_licenses_on_startup()
        logger.info("启动检查完成")
    except Exception:
        logger.exception("启动时检查 License 异常")

    # Start the background periodic tasks.
    license_task = asyncio.create_task(_check_licenses())
    fetch_task = asyncio.create_task(_daily_fetch())
    logger.info("后台任务已启动:License 检查 + 定时爬取")
    try:
        yield
    finally:
        license_task.cancel()
        fetch_task.cancel()
        try:
            for task in (license_task, fetch_task):
                try:
                    await task
                except asyncio.CancelledError:
                    # Expected: the task was cancelled just above.
                    pass
        finally:
            await close_redis()
# The ASGI application object; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="域名流水监控",
    version="0.1.0",
    debug=settings.debug,
    lifespan=lifespan,
)

# CORS: allow the front-end Vite dev server to call this API.
_cors_options = {
    "allow_origins": ["http://localhost:5173"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Register the routers, in this fixed order.
for _router in (
    domains.router,
    monitoring.router,
    license_router.router,
    public_license_router,
):
    app.include_router(_router)
@app.get("/health")
async def health():
    """Health-check endpoint: returns a constant ``{"status": "ok"}`` body."""
    payload = {"status": "ok"}
    return payload
|