# scheduler.py — periodic auto-scrape scheduling
from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from app.db import get_pool
from app.services.scraper import ScraperService

logger = logging.getLogger(__name__)

# Module-level singletons: one scheduler and one scraper service per process.
_scheduler = AsyncIOScheduler()
_scraper = ScraperService()

# Id of the single recurring auto-scrape job managed by this module.
JOB_ID = "auto_scrape"
  13. async def _run_auto_scrape() -> None:
  14. """爬取所有已注册模型。"""
  15. pool = get_pool()
  16. async with pool.acquire() as conn:
  17. cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
  18. if not cfg or not cfg["enabled"]:
  19. return
  20. async with pool.acquire() as conn:
  21. rows = await conn.fetch("SELECT url FROM models ORDER BY created_at")
  22. urls = [r["url"] for r in rows]
  23. if not urls:
  24. logger.info("[scheduler] 没有已注册的模型,跳过")
  25. return
  26. logger.info(f"[scheduler] 开始自动爬取 {len(urls)} 个模型")
  27. job_id_row = await pool.fetchrow(
  28. "INSERT INTO scrape_jobs (urls, status) VALUES ($1, 'pending') RETURNING id",
  29. urls,
  30. )
  31. asyncio.create_task(_scraper.run_job(str(job_id_row["id"]), urls, pool))
  32. def _reschedule(interval_days: int, start_hour: int) -> None:
  33. if _scheduler.get_job(JOB_ID):
  34. _scheduler.remove_job(JOB_ID)
  35. now = datetime.now(timezone.utc)
  36. next_run = now.replace(hour=start_hour, minute=0, second=0, microsecond=0)
  37. if next_run <= now:
  38. next_run += timedelta(days=1)
  39. _scheduler.add_job(
  40. _run_auto_scrape,
  41. trigger=IntervalTrigger(days=interval_days, start_date=next_run),
  42. id=JOB_ID,
  43. replace_existing=True,
  44. )
  45. logger.info(f"[scheduler] 已设置每 {interval_days} 天 {start_hour}:00 执行,下次: {next_run}")
  46. async def start_scheduler() -> None:
  47. pool = get_pool()
  48. async with pool.acquire() as conn:
  49. cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
  50. interval = cfg["interval_days"] if cfg else 1
  51. start_hour = cfg["start_hour"] if cfg else 2
  52. _reschedule(interval, start_hour)
  53. _scheduler.start()
  54. logger.info("[scheduler] 调度器已启动")
  55. async def stop_scheduler() -> None:
  56. if _scheduler.running:
  57. _scheduler.shutdown(wait=False)
  58. def get_scheduler() -> AsyncIOScheduler:
  59. return _scheduler
  60. def reschedule(interval_days: int, start_hour: int) -> None:
  61. _reschedule(interval_days, start_hour)