from __future__ import annotations import asyncio import logging import os from datetime import datetime, timedelta, timezone from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger from app.db import get_pool from app.services.scraper import ScraperService logger = logging.getLogger(__name__) _scheduler = AsyncIOScheduler() _scraper = ScraperService() JOB_ID = "auto_scrape" async def _run_auto_scrape() -> None: """爬取所有已注册模型。""" pool = get_pool() async with pool.acquire() as conn: cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1") if not cfg or not cfg["enabled"]: return async with pool.acquire() as conn: rows = await conn.fetch("SELECT url FROM models ORDER BY created_at") urls = [r["url"] for r in rows] if not urls: logger.info("[scheduler] 没有已注册的模型,跳过") return logger.info(f"[scheduler] 开始自动爬取 {len(urls)} 个模型") job_id_row = await pool.fetchrow( "INSERT INTO scrape_jobs (urls, status) VALUES ($1, 'pending') RETURNING id", urls, ) asyncio.create_task(_scraper.run_job(str(job_id_row["id"]), urls, pool)) def _reschedule(interval_days: int, start_hour: int) -> None: if _scheduler.get_job(JOB_ID): _scheduler.remove_job(JOB_ID) # start_hour 是用户输入的本地时间(UTC+8),转换为 UTC tz_offset = int(os.environ.get("TZ_OFFSET_HOURS", "8")) start_hour_utc = (start_hour - tz_offset) % 24 now = datetime.now(timezone.utc) next_run = now.replace(hour=start_hour_utc, minute=0, second=0, microsecond=0) if next_run <= now: next_run += timedelta(days=1) _scheduler.add_job( _run_auto_scrape, trigger=IntervalTrigger(days=interval_days, start_date=next_run), id=JOB_ID, replace_existing=True, ) logger.info(f"[scheduler] 已设置每 {interval_days} 天本地 {start_hour}:00 (UTC {start_hour_utc}:00) 执行,下次: {next_run}") async def start_scheduler() -> None: pool = get_pool() async with pool.acquire() as conn: cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1") interval = cfg["interval_days"] if cfg else 1 start_hour = cfg["start_hour"] if cfg else 2 _reschedule(interval, start_hour) _scheduler.start() logger.info("[scheduler] 调度器已启动") async def stop_scheduler() -> None: if _scheduler.running: _scheduler.shutdown(wait=False) def get_scheduler() -> AsyncIOScheduler: return _scheduler def reschedule(interval_days: int, start_hour: int) -> None: _reschedule(interval_days, start_hour)