# scheduler.py — periodic auto-scrape scheduling (APScheduler + asyncpg pool)
  1. from __future__ import annotations
  2. import asyncio
  3. import logging
  4. import os
  5. from datetime import datetime, timedelta, timezone
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.interval import IntervalTrigger
  8. from app.db import get_pool
  9. from app.services.scraper import ScraperService
logger = logging.getLogger(__name__)

# Process-wide singletons: one scheduler and one scraper service instance.
_scheduler = AsyncIOScheduler()
_scraper = ScraperService()

# Id of the single recurring auto-scrape job registered with the scheduler.
JOB_ID = "auto_scrape"
  14. async def _run_auto_scrape() -> None:
  15. """爬取所有已注册模型。"""
  16. pool = get_pool()
  17. async with pool.acquire() as conn:
  18. cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
  19. if not cfg or not cfg["enabled"]:
  20. return
  21. async with pool.acquire() as conn:
  22. rows = await conn.fetch("SELECT url FROM models ORDER BY created_at")
  23. urls = [r["url"] for r in rows]
  24. if not urls:
  25. logger.info("[scheduler] 没有已注册的模型,跳过")
  26. return
  27. logger.info(f"[scheduler] 开始自动爬取 {len(urls)} 个模型")
  28. job_id_row = await pool.fetchrow(
  29. "INSERT INTO scrape_jobs (urls, status) VALUES ($1, 'pending') RETURNING id",
  30. urls,
  31. )
  32. asyncio.create_task(_scraper.run_job(str(job_id_row["id"]), urls, pool))
  33. def _reschedule(interval_days: int, start_hour: int) -> None:
  34. if _scheduler.get_job(JOB_ID):
  35. _scheduler.remove_job(JOB_ID)
  36. # start_hour 是用户输入的本地时间(UTC+8),转换为 UTC
  37. tz_offset = int(os.environ.get("TZ_OFFSET_HOURS", "8"))
  38. start_hour_utc = (start_hour - tz_offset) % 24
  39. now = datetime.now(timezone.utc)
  40. next_run = now.replace(hour=start_hour_utc, minute=0, second=0, microsecond=0)
  41. if next_run <= now:
  42. next_run += timedelta(days=1)
  43. _scheduler.add_job(
  44. _run_auto_scrape,
  45. trigger=IntervalTrigger(days=interval_days, start_date=next_run),
  46. id=JOB_ID,
  47. replace_existing=True,
  48. )
  49. logger.info(f"[scheduler] 已设置每 {interval_days} 天本地 {start_hour}:00 (UTC {start_hour_utc}:00) 执行,下次: {next_run}")
  50. async def start_scheduler() -> None:
  51. pool = get_pool()
  52. async with pool.acquire() as conn:
  53. cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
  54. interval = cfg["interval_days"] if cfg else 1
  55. start_hour = cfg["start_hour"] if cfg else 2
  56. _reschedule(interval, start_hour)
  57. _scheduler.start()
  58. logger.info("[scheduler] 调度器已启动")
  59. async def stop_scheduler() -> None:
  60. if _scheduler.running:
  61. _scheduler.shutdown(wait=False)
  62. def get_scheduler() -> AsyncIOScheduler:
  63. return _scheduler
  64. def reschedule(interval_days: int, start_hour: int) -> None:
  65. _reschedule(interval_days, start_hour)