| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- from __future__ import annotations
- import asyncio
- import logging
- from datetime import datetime, timedelta, timezone
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from app.db import get_pool
- from app.services.scraper import ScraperService
# Logger named after this module.
logger: logging.Logger = logging.getLogger(__name__)

# Scheduler instance shared by every function in this module.
_scheduler: AsyncIOScheduler = AsyncIOScheduler()

# Scraper service that executes the scrape jobs queued by the scheduler.
_scraper: ScraperService = ScraperService()

# Id under which the recurring scrape job is registered with the scheduler.
JOB_ID: str = "auto_scrape"
# Strong references to in-flight scrape tasks.  The event loop keeps only weak
# references to tasks, so a task nobody else references could be garbage
# collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


async def _run_auto_scrape() -> None:
    """Queue a scrape job covering every registered model.

    Returns without doing anything when the ``scrape_schedule`` row with
    ``id = 1`` is missing or its ``enabled`` column is falsy, or when the
    ``models`` table has no rows.  Otherwise a ``pending`` row is inserted
    into ``scrape_jobs`` and the job is handed to the scraper as a background
    task; the scrape itself is not awaited here.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
        if not cfg or not cfg["enabled"]:
            return
        # Same connection for both reads: avoids a second pool checkout.
        rows = await conn.fetch("SELECT url FROM models ORDER BY created_at")

    urls = [row["url"] for row in rows]
    if not urls:
        logger.info("[scheduler] 没有已注册的模型,跳过")
        return

    logger.info("[scheduler] 开始自动爬取 %d 个模型", len(urls))
    job_row = await pool.fetchrow(
        "INSERT INTO scrape_jobs (urls, status) VALUES ($1, 'pending') RETURNING id",
        urls,
    )

    task = asyncio.create_task(_scraper.run_job(str(job_row["id"]), urls, pool))
    # Hold the task until it completes, then drop the reference so the set
    # does not grow without bound.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
- def _reschedule(interval_days: int, start_hour: int) -> None:
- if _scheduler.get_job(JOB_ID):
- _scheduler.remove_job(JOB_ID)
- now = datetime.now(timezone.utc)
- next_run = now.replace(hour=start_hour, minute=0, second=0, microsecond=0)
- if next_run <= now:
- next_run += timedelta(days=1)
- _scheduler.add_job(
- _run_auto_scrape,
- trigger=IntervalTrigger(days=interval_days, start_date=next_run),
- id=JOB_ID,
- replace_existing=True,
- )
- logger.info(f"[scheduler] 已设置每 {interval_days} 天 {start_hour}:00 执行,下次: {next_run}")
async def start_scheduler() -> None:
    """Load the stored schedule, register the scrape job and start the scheduler.

    Reads the ``scrape_schedule`` row with ``id = 1``.  When that row is
    absent the job is scheduled every 1 day at hour 2.
    """
    async with get_pool().acquire() as connection:
        schedule_row = await connection.fetchrow(
            "SELECT * FROM scrape_schedule WHERE id = 1"
        )

    if schedule_row:
        interval = schedule_row["interval_days"]
        start_hour = schedule_row["start_hour"]
    else:
        # No stored configuration: fall back to the built-in defaults.
        interval, start_hour = 1, 2

    _reschedule(interval, start_hour)
    _scheduler.start()
    logger.info("[scheduler] 调度器已启动")
async def stop_scheduler() -> None:
    """Shut the scheduler down without waiting; a no-op if it is not running."""
    if not _scheduler.running:
        return
    _scheduler.shutdown(wait=False)
def get_scheduler() -> AsyncIOScheduler:
    """Return the module-level scheduler instance."""
    scheduler = _scheduler
    return scheduler
def reschedule(interval_days: int, start_hour: int) -> None:
    """Public entry point for changing the auto-scrape schedule.

    Forwards both arguments unchanged to ``_reschedule``.
    """
    _reschedule(interval_days=interval_days, start_hour=start_hour)
|