|
|
@@ -1,7 +1,7 @@
|
|
|
import asyncio
|
|
|
import logging
|
|
|
from contextlib import asynccontextmanager
|
|
|
-from datetime import datetime, timezone
|
|
|
+from datetime import datetime, timezone, timedelta
|
|
|
|
|
|
from fastapi import FastAPI
|
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
|
@@ -11,12 +11,18 @@ from app.routers import domains, monitoring, license as license_router
|
|
|
from app.routers.license import public_router as public_license_router
|
|
|
from app.database import async_session
|
|
|
from app.services.license import _notify_on_expired, _notify_on_warning
|
|
|
-from sqlalchemy import select, text
|
|
|
+from app.services.domain_fetch import fetch_domain_transactions
|
|
|
from app.models.license import SuperAdminLicense
|
|
|
+from app.models.domain import MonitoredDomain
|
|
|
+from app.redis import get_redis, close_redis
|
|
|
+from sqlalchemy import select, text
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
+CST = timezone(timedelta(hours=8))
|
|
|
+
|
|
|
+
|
|
|
async def _check_licenses():
|
|
|
"""定期检查 License:过期状态更新 + 7 天预警"""
|
|
|
while True:
|
|
|
@@ -35,12 +41,10 @@ async def _check_licenses():
|
|
|
await session.commit()
|
|
|
await _notify_on_expired(session, lic)
|
|
|
|
|
|
- # 2. 检查剩余 7 天预警(未发送过的)
|
|
|
- from datetime import timedelta
|
|
|
+ # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
|
|
|
warning_result = await session.execute(
|
|
|
select(SuperAdminLicense).where(
|
|
|
SuperAdminLicense.status == "active",
|
|
|
- SuperAdminLicense.warning_sent == False,
|
|
|
SuperAdminLicense.expires_at > text("NOW()"),
|
|
|
SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
|
|
|
)
|
|
|
@@ -49,15 +53,47 @@ async def _check_licenses():
|
|
|
days_left = (lic.expires_at - datetime.now(timezone.utc)).days
|
|
|
logger.info("检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
|
|
|
await _notify_on_warning(session, lic, days_left)
|
|
|
- lic.warning_sent = True
|
|
|
- await session.commit()
|
|
|
except Exception:
|
|
|
logger.exception("定时检查 License 异常")
|
|
|
|
|
|
- # 每 24 小时检查一次
|
|
|
await asyncio.sleep(24 * 3600)
|
|
|
|
|
|
|
|
|
+async def _daily_fetch():
|
|
|
+ """定时爬取:每分钟检查 Redis 配置,到了目标时间就爬取当天流水"""
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ r = await get_redis()
|
|
|
+ enabled = await r.get("fetch_schedule:enabled")
|
|
|
+ schedule_time = await r.get("fetch_schedule:time")
|
|
|
+
|
|
|
+ if enabled == "true" and schedule_time:
|
|
|
+ h, m = map(int, schedule_time.split(":"))
|
|
|
+ now = datetime.now(CST)
|
|
|
+ today = now.strftime("%Y-%m-%d")
|
|
|
+ last_fetch = await r.get("fetch:last_date")
|
|
|
+
|
|
|
+ # 已过目标时间且今天还没爬过
|
|
|
+ if now.hour * 60 + now.minute >= h * 60 + m and last_fetch != today:
|
|
|
+ logger.info("开始定时爬取当天流水: %s", today)
|
|
|
+ async with async_session() as session:
|
|
|
+ domain_result = await session.execute(
|
|
|
+ select(MonitoredDomain).where(MonitoredDomain.is_active == True)
|
|
|
+ )
|
|
|
+ for d in domain_result.scalars().all():
|
|
|
+ try:
|
|
|
+ await fetch_domain_transactions(d.domain, session, fetch_date=today)
|
|
|
+ logger.info("域名 %s 当天流水爬取完成", d.domain)
|
|
|
+ except Exception:
|
|
|
+ logger.exception("域名 %s 当天流水爬取失败", d.domain)
|
|
|
+ await r.set("fetch:last_date", today)
|
|
|
+ logger.info("当天定时爬取全部完成")
|
|
|
+ except Exception:
|
|
|
+ logger.exception("定时爬取异常")
|
|
|
+
|
|
|
+ await asyncio.sleep(60)
|
|
|
+
|
|
|
+
|
|
|
@asynccontextmanager
|
|
|
async def lifespan(app: FastAPI):
|
|
|
# 启动时立即检查一次
|
|
|
@@ -76,12 +112,11 @@ async def lifespan(app: FastAPI):
|
|
|
await session.commit()
|
|
|
await _notify_on_expired(session, lic)
|
|
|
|
|
|
- # 2. 检查剩余 7 天预警(未发送过的)
|
|
|
+ # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
|
|
|
from datetime import timedelta
|
|
|
warning_result = await session.execute(
|
|
|
select(SuperAdminLicense).where(
|
|
|
SuperAdminLicense.status == "active",
|
|
|
- SuperAdminLicense.warning_sent == False,
|
|
|
SuperAdminLicense.expires_at > text("NOW()"),
|
|
|
SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
|
|
|
)
|
|
|
@@ -90,22 +125,24 @@ async def lifespan(app: FastAPI):
|
|
|
days_left = (lic.expires_at - datetime.now(timezone.utc)).days
|
|
|
logger.info("启动时检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
|
|
|
await _notify_on_warning(session, lic, days_left)
|
|
|
- lic.warning_sent = True
|
|
|
- await session.commit()
|
|
|
|
|
|
logger.info("启动检查完成")
|
|
|
except Exception:
|
|
|
logger.exception("启动时检查 License 异常")
|
|
|
|
|
|
# 启动后台定时任务
|
|
|
- task = asyncio.create_task(_check_licenses())
|
|
|
- logger.info("后台任务已启动:每 24 小时检查一次 License")
|
|
|
+ license_task = asyncio.create_task(_check_licenses())
|
|
|
+ fetch_task = asyncio.create_task(_daily_fetch())
|
|
|
+ logger.info("后台任务已启动:License 检查 + 定时爬取")
|
|
|
yield
|
|
|
- task.cancel()
|
|
|
- try:
|
|
|
- await task
|
|
|
- except asyncio.CancelledError:
|
|
|
- pass
|
|
|
+ license_task.cancel()
|
|
|
+ fetch_task.cancel()
|
|
|
+ for task in (license_task, fetch_task):
|
|
|
+ try:
|
|
|
+ await task
|
|
|
+ except asyncio.CancelledError:
|
|
|
+ pass
|
|
|
+ await close_redis()
|
|
|
|
|
|
|
|
|
app = FastAPI(
|