"""FastAPI application entry point.

Wires up the routers and CORS middleware, and runs two background tasks for
the lifetime of the application:

* a license check (marks expired licenses, sends 7-day expiry warnings);
* a scheduled fetch of the current day's transactions for active domains.
"""

import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timezone, timedelta
from typing import Optional

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import select, text

from app.config import settings
from app.routers import domains, monitoring, license as license_router
from app.routers.license import public_router as public_license_router
from app.database import async_session
from app.services.license import _notify_on_expired, _notify_on_warning
from app.services.domain_fetch import fetch_domain_transactions
from app.models.license import SuperAdminLicense
from app.models.monitoring import FetchScheduleConfig, FetchLog
from app.models.domain import MonitoredDomain
from app.redis import close_redis

logger = logging.getLogger(__name__)

# Fixed UTC+8 offset used to decide what "today" and "now" mean for the fetch schedule.
CST = timezone(timedelta(hours=8))

# Pause between two license checks performed by the background task.
_LICENSE_CHECK_INTERVAL_SECONDS = 24 * 3600
# Pause between two polls of the fetch schedule configuration.
_FETCH_POLL_INTERVAL_SECONDS = 60


async def _run_license_check(log_prefix: str = "") -> None:
    """Run one license check pass: expire overdue licenses, then send 7-day warnings.

    Shared by the startup check in ``lifespan`` and the periodic
    ``_check_licenses`` task so both execute exactly the same logic.

    Args:
        log_prefix: Text placed in front of the per-license log messages
            (the startup check passes "启动时"; the periodic task passes "").

    Raises:
        Exception: Anything raised by the database or the notification
            helpers propagates; callers decide how to log it.
    """
    async with async_session() as session:
        # 1. Active licenses whose expiry time has passed.
        expired_result = await session.execute(
            select(SuperAdminLicense).where(
                SuperAdminLicense.status == "active",
                SuperAdminLicense.expires_at <= text("NOW()"),
            )
        )
        for lic in expired_result.scalars().all():
            logger.info(
                "%s检测到 License #%d 已过期,更新状态并发送预警短信", log_prefix, lic.id
            )
            lic.status = "expired"
            # The status change is committed before notifying, as in the original flow.
            await session.commit()
            await _notify_on_expired(session, lic)

        # 2. Active licenses expiring within 7 days that have not been warned yet.
        warning_result = await session.execute(
            select(SuperAdminLicense).where(
                SuperAdminLicense.status == "active",
                SuperAdminLicense.warning_sent.is_(False),
                SuperAdminLicense.expires_at > text("NOW()"),
                SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
            )
        )
        for lic in warning_result.scalars().all():
            # NOTE(review): assumes expires_at is timezone-aware; subtracting an
            # aware datetime from a naive one raises TypeError - confirm the column type.
            days_left = (lic.expires_at - datetime.now(timezone.utc)).days
            logger.info(
                "%s检测到 License #%d 剩余 %d 天,发送预警短信", log_prefix, lic.id, days_left
            )
            await _notify_on_warning(session, lic, days_left)
            # Only flag the license after the notification call returned.
            lic.warning_sent = True
            await session.commit()


async def _check_licenses():
    """Periodically check licenses: update expired status and send 7-day warnings.

    Runs forever (until cancelled). Each pass is followed by a 24-hour sleep;
    any exception in a pass is logged and does not stop the loop.
    """
    while True:
        try:
            await _run_license_check()
        except Exception:
            logger.exception("定时检查 License 异常")
        await asyncio.sleep(_LICENSE_CHECK_INTERVAL_SECONDS)


async def _fetch_today_if_due(last_fetch_date: Optional[str]) -> Optional[str]:
    """Run today's scheduled fetch if it is due and return the last fetched date.

    Reads the first ``FetchScheduleConfig`` row. When the schedule is enabled,
    the configured ``HH:MM`` time (evaluated in ``CST``) has been reached and
    today has not been fetched yet, fetches today's transactions for every
    active domain, skipping domains that already have a failed ``FetchLog``
    today, and records a ``FetchLog`` row per processed domain.

    Args:
        last_fetch_date: ``YYYY-MM-DD`` of the last completed run, or ``None``.

    Returns:
        Today's date string when a run completed, otherwise ``last_fetch_date``
        unchanged.
    """
    async with async_session() as session:
        result = await session.execute(select(FetchScheduleConfig).limit(1))
        config = result.scalar_one_or_none()
        if not (config and config.enabled):
            return last_fetch_date

        h, m = map(int, config.schedule_time.split(":"))
        now = datetime.now(CST)
        today = now.strftime("%Y-%m-%d")

        # Not yet at the target time, or today's run already happened.
        if now.hour * 60 + now.minute < h * 60 + m or last_fetch_date == today:
            return last_fetch_date

        logger.info("开始定时爬取当天流水: %s", today)

        # Domains that already failed today are skipped.
        # `today` is produced by strftime above (never user input), so
        # interpolating it into the SQL literal is not an injection vector.
        failed_result = await session.execute(
            select(FetchLog.domain).where(
                FetchLog.status == "failed",
                FetchLog.created_at >= text(f"'{today} 00:00:00+08'"),
            ).distinct()
        )
        failed_domains = {row[0] for row in failed_result.all()}

        domain_result = await session.execute(
            select(MonitoredDomain).where(MonitoredDomain.is_active.is_(True))
        )
        for d in domain_result.scalars().all():
            if d.domain in failed_domains:
                logger.info("域名 %s 今日已失败,跳过", d.domain)
                continue
            try:
                await fetch_domain_transactions(d.domain, session, fetch_date=today)
                logger.info("域名 %s 当天流水爬取完成", d.domain)
                session.add(FetchLog(
                    domain=d.domain, status="success", message="当天流水爬取完成"
                ))
            except Exception as e:
                # One failing domain must not abort the remaining domains.
                error_msg = str(e)[:500]
                logger.exception("域名 %s 当天流水爬取失败", d.domain)
                session.add(FetchLog(
                    domain=d.domain, status="failed", message=error_msg
                ))

        await session.commit()
        logger.info("当天定时爬取全部完成")
        return today


async def _daily_fetch():
    """Scheduled fetch: poll the DB config every minute and fetch today's data when due.

    Runs forever (until cancelled). The date of the last completed run is kept
    in memory only, so it is reset when the process restarts. Any exception in
    a tick is logged and the loop continues after the usual sleep.
    """
    last_fetch_date = None
    while True:
        try:
            last_fetch_date = await _fetch_today_if_due(last_fetch_date)
        except Exception:
            logger.exception("定时爬取异常")
        await asyncio.sleep(_FETCH_POLL_INTERVAL_SECONDS)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: startup license check, background tasks, shutdown cleanup.

    On startup a license check runs once before the application starts serving
    (failures are logged, not raised), then the two background tasks are
    started. On shutdown the tasks are cancelled and awaited and the Redis
    connection is closed; this cleanup runs in ``finally`` so it also happens
    when an exception is thrown into the generator at ``yield``.
    """
    # Check once immediately at startup.
    try:
        await _run_license_check(log_prefix="启动时")
        logger.info("启动检查完成")
    except Exception:
        logger.exception("启动时检查 License 异常")

    # Start the background periodic tasks.
    license_task = asyncio.create_task(_check_licenses())
    fetch_task = asyncio.create_task(_daily_fetch())
    logger.info("后台任务已启动:License 检查 + 定时爬取")

    try:
        yield
    finally:
        background_tasks = (license_task, fetch_task)
        for task in background_tasks:
            task.cancel()
        # return_exceptions=True collects each task's CancelledError instead of raising it.
        await asyncio.gather(*background_tasks, return_exceptions=True)
        await close_redis()


app = FastAPI(
    title="域名流水监控",
    version="0.1.0",
    debug=settings.debug,
    lifespan=lifespan,
)

# CORS configuration: allow the frontend Vite dev server to access the API.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register routers.
app.include_router(domains.router)
app.include_router(monitoring.router)
app.include_router(license_router.router)
app.include_router(public_license_router)


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "ok"}