| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import asyncio
- import logging
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone, timedelta
- from fastapi import FastAPI
- from fastapi.middleware.cors import CORSMiddleware
- from app.config import settings
- from app.routers import domains, monitoring, license as license_router
- from app.routers.license import public_router as public_license_router
- from app.database import async_session
- from app.services.license import _notify_on_expired, _notify_on_warning
- from app.services.domain_fetch import fetch_domain_transactions
- from app.models.license import SuperAdminLicense
- from app.models.domain import MonitoredDomain
- from app.redis import get_redis, close_redis
- from sqlalchemy import select, text
- logger = logging.getLogger(__name__)
- CST = timezone(timedelta(hours=8))
- async def _check_licenses():
- """定期检查 License:过期状态更新 + 7 天预警"""
- while True:
- try:
- async with async_session() as session:
- # 1. 检查已过期
- expired_result = await session.execute(
- select(SuperAdminLicense).where(
- SuperAdminLicense.status == "active",
- SuperAdminLicense.expires_at <= text("NOW()"),
- )
- )
- for lic in expired_result.scalars().all():
- logger.info("检测到 License #%d 已过期,更新状态并发送预警短信", lic.id)
- lic.status = "expired"
- await session.commit()
- await _notify_on_expired(session, lic)
- # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
- warning_result = await session.execute(
- select(SuperAdminLicense).where(
- SuperAdminLicense.status == "active",
- SuperAdminLicense.expires_at > text("NOW()"),
- SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
- )
- )
- for lic in warning_result.scalars().all():
- days_left = (lic.expires_at - datetime.now(timezone.utc)).days
- logger.info("检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
- await _notify_on_warning(session, lic, days_left)
- except Exception:
- logger.exception("定时检查 License 异常")
- await asyncio.sleep(24 * 3600)
- async def _daily_fetch():
- """定时爬取:每分钟检查 Redis 配置,到了目标时间就爬取当天流水"""
- while True:
- try:
- r = await get_redis()
- enabled = await r.get("fetch_schedule:enabled")
- schedule_time = await r.get("fetch_schedule:time")
- if enabled == "true" and schedule_time:
- h, m = map(int, schedule_time.split(":"))
- now = datetime.now(CST)
- today = now.strftime("%Y-%m-%d")
- last_fetch = await r.get("fetch:last_date")
- # 已过目标时间且今天还没爬过
- if now.hour * 60 + now.minute >= h * 60 + m and last_fetch != today:
- logger.info("开始定时爬取当天流水: %s", today)
- async with async_session() as session:
- domain_result = await session.execute(
- select(MonitoredDomain).where(MonitoredDomain.is_active == True)
- )
- for d in domain_result.scalars().all():
- try:
- await fetch_domain_transactions(d.domain, session, fetch_date=today)
- logger.info("域名 %s 当天流水爬取完成", d.domain)
- except Exception:
- logger.exception("域名 %s 当天流水爬取失败", d.domain)
- await r.set("fetch:last_date", today)
- logger.info("当天定时爬取全部完成")
- except Exception:
- logger.exception("定时爬取异常")
- await asyncio.sleep(60)
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- # 启动时立即检查一次
- try:
- async with async_session() as session:
- # 1. 检查已过期
- expired_result = await session.execute(
- select(SuperAdminLicense).where(
- SuperAdminLicense.status == "active",
- SuperAdminLicense.expires_at <= text("NOW()"),
- )
- )
- for lic in expired_result.scalars().all():
- logger.info("启动时检测到 License #%d 已过期,更新状态并发送预警短信", lic.id)
- lic.status = "expired"
- await session.commit()
- await _notify_on_expired(session, lic)
- # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
- from datetime import timedelta
- warning_result = await session.execute(
- select(SuperAdminLicense).where(
- SuperAdminLicense.status == "active",
- SuperAdminLicense.expires_at > text("NOW()"),
- SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
- )
- )
- for lic in warning_result.scalars().all():
- days_left = (lic.expires_at - datetime.now(timezone.utc)).days
- logger.info("启动时检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
- await _notify_on_warning(session, lic, days_left)
- logger.info("启动检查完成")
- except Exception:
- logger.exception("启动时检查 License 异常")
- # 启动后台定时任务
- license_task = asyncio.create_task(_check_licenses())
- fetch_task = asyncio.create_task(_daily_fetch())
- logger.info("后台任务已启动:License 检查 + 定时爬取")
- yield
- license_task.cancel()
- fetch_task.cancel()
- for task in (license_task, fetch_task):
- try:
- await task
- except asyncio.CancelledError:
- pass
- await close_redis()
- app = FastAPI(
- title="域名流水监控",
- version="0.1.0",
- debug=settings.debug,
- lifespan=lifespan,
- )
- # CORS 配置,允许前端 Vite 开发服务器访问
- app.add_middleware(
- CORSMiddleware,
- allow_origins=["http://localhost:5173"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # 注册路由
- app.include_router(domains.router)
- app.include_router(monitoring.router)
- app.include_router(license_router.router)
- app.include_router(public_license_router)
- @app.get("/health")
- async def health():
- """健康检查接口"""
- return {"status": "ok"}
|