main.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. import asyncio
  2. import logging
  3. from contextlib import asynccontextmanager
  4. from datetime import datetime, timezone, timedelta
  5. from fastapi import FastAPI
  6. from fastapi.middleware.cors import CORSMiddleware
  7. from app.config import settings
  8. from app.routers import domains, monitoring, license as license_router
  9. from app.routers.license import public_router as public_license_router
  10. from app.database import async_session
  11. from app.services.license import _notify_on_expired, _notify_on_warning
  12. from app.services.domain_fetch import fetch_domain_transactions
  13. from app.models.license import SuperAdminLicense
  14. from app.models.domain import MonitoredDomain
  15. from app.redis import get_redis, close_redis
  16. from sqlalchemy import select, text
  17. logger = logging.getLogger(__name__)
  18. CST = timezone(timedelta(hours=8))
  19. async def _check_licenses():
  20. """定期检查 License:过期状态更新 + 7 天预警"""
  21. while True:
  22. try:
  23. async with async_session() as session:
  24. # 1. 检查已过期
  25. expired_result = await session.execute(
  26. select(SuperAdminLicense).where(
  27. SuperAdminLicense.status == "active",
  28. SuperAdminLicense.expires_at <= text("NOW()"),
  29. )
  30. )
  31. for lic in expired_result.scalars().all():
  32. logger.info("检测到 License #%d 已过期,更新状态并发送预警短信", lic.id)
  33. lic.status = "expired"
  34. await session.commit()
  35. await _notify_on_expired(session, lic)
  36. # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
  37. warning_result = await session.execute(
  38. select(SuperAdminLicense).where(
  39. SuperAdminLicense.status == "active",
  40. SuperAdminLicense.expires_at > text("NOW()"),
  41. SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
  42. )
  43. )
  44. for lic in warning_result.scalars().all():
  45. days_left = (lic.expires_at - datetime.now(timezone.utc)).days
  46. logger.info("检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
  47. await _notify_on_warning(session, lic, days_left)
  48. except Exception:
  49. logger.exception("定时检查 License 异常")
  50. await asyncio.sleep(24 * 3600)
  51. async def _daily_fetch():
  52. """定时爬取:每分钟检查 Redis 配置,到了目标时间就爬取当天流水"""
  53. while True:
  54. try:
  55. r = await get_redis()
  56. enabled = await r.get("fetch_schedule:enabled")
  57. schedule_time = await r.get("fetch_schedule:time")
  58. if enabled == "true" and schedule_time:
  59. h, m = map(int, schedule_time.split(":"))
  60. now = datetime.now(CST)
  61. today = now.strftime("%Y-%m-%d")
  62. last_fetch = await r.get("fetch:last_date")
  63. # 已过目标时间且今天还没爬过
  64. if now.hour * 60 + now.minute >= h * 60 + m and last_fetch != today:
  65. logger.info("开始定时爬取当天流水: %s", today)
  66. async with async_session() as session:
  67. domain_result = await session.execute(
  68. select(MonitoredDomain).where(MonitoredDomain.is_active == True)
  69. )
  70. for d in domain_result.scalars().all():
  71. try:
  72. await fetch_domain_transactions(d.domain, session, fetch_date=today)
  73. logger.info("域名 %s 当天流水爬取完成", d.domain)
  74. except Exception:
  75. logger.exception("域名 %s 当天流水爬取失败", d.domain)
  76. await r.set("fetch:last_date", today)
  77. logger.info("当天定时爬取全部完成")
  78. except Exception:
  79. logger.exception("定时爬取异常")
  80. await asyncio.sleep(60)
  81. @asynccontextmanager
  82. async def lifespan(app: FastAPI):
  83. # 启动时立即检查一次
  84. try:
  85. async with async_session() as session:
  86. # 1. 检查已过期
  87. expired_result = await session.execute(
  88. select(SuperAdminLicense).where(
  89. SuperAdminLicense.status == "active",
  90. SuperAdminLicense.expires_at <= text("NOW()"),
  91. )
  92. )
  93. for lic in expired_result.scalars().all():
  94. logger.info("启动时检测到 License #%d 已过期,更新状态并发送预警短信", lic.id)
  95. lic.status = "expired"
  96. await session.commit()
  97. await _notify_on_expired(session, lic)
  98. # 2. 检查剩余 7 天预警(_notify_on_warning 内部通过 Redis 去重)
  99. from datetime import timedelta
  100. warning_result = await session.execute(
  101. select(SuperAdminLicense).where(
  102. SuperAdminLicense.status == "active",
  103. SuperAdminLicense.expires_at > text("NOW()"),
  104. SuperAdminLicense.expires_at <= text("NOW() + INTERVAL '7 days'"),
  105. )
  106. )
  107. for lic in warning_result.scalars().all():
  108. days_left = (lic.expires_at - datetime.now(timezone.utc)).days
  109. logger.info("启动时检测到 License #%d 剩余 %d 天,发送预警短信", lic.id, days_left)
  110. await _notify_on_warning(session, lic, days_left)
  111. logger.info("启动检查完成")
  112. except Exception:
  113. logger.exception("启动时检查 License 异常")
  114. # 启动后台定时任务
  115. license_task = asyncio.create_task(_check_licenses())
  116. fetch_task = asyncio.create_task(_daily_fetch())
  117. logger.info("后台任务已启动:License 检查 + 定时爬取")
  118. yield
  119. license_task.cancel()
  120. fetch_task.cancel()
  121. for task in (license_task, fetch_task):
  122. try:
  123. await task
  124. except asyncio.CancelledError:
  125. pass
  126. await close_redis()
  127. app = FastAPI(
  128. title="域名流水监控",
  129. version="0.1.0",
  130. debug=settings.debug,
  131. lifespan=lifespan,
  132. )
  133. # CORS 配置,允许前端 Vite 开发服务器访问
  134. app.add_middleware(
  135. CORSMiddleware,
  136. allow_origins=["http://localhost:5173"],
  137. allow_credentials=True,
  138. allow_methods=["*"],
  139. allow_headers=["*"],
  140. )
  141. # 注册路由
  142. app.include_router(domains.router)
  143. app.include_router(monitoring.router)
  144. app.include_router(license_router.router)
  145. app.include_router(public_license_router)
  146. @app.get("/health")
  147. async def health():
  148. """健康检查接口"""
  149. return {"status": "ok"}