from __future__ import annotations

import asyncio
import logging
import os
from datetime import datetime, timedelta, timezone

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from app.db import get_pool
from app.services.scraper import ScraperService
# Module-level logger for this scheduler module.
logger = logging.getLogger(__name__)

# Single shared APScheduler instance driving the auto-scrape job.
_scheduler = AsyncIOScheduler()

# Scraper service instance used by the scheduled job.
_scraper = ScraperService()

# Stable job id so the auto-scrape job can be found/replaced on reschedule.
JOB_ID = "auto_scrape"
async def _run_auto_scrape() -> None:
    """Scrape all registered models if the auto-scrape schedule is enabled.

    Reads the singleton ``scrape_schedule`` row (id = 1); returns early when
    the feature is disabled or no models are registered.  Otherwise records a
    pending ``scrape_jobs`` row and launches the scrape as a background task.
    """
    pool = get_pool()
    # One connection is enough for both reads — avoid acquiring twice.
    async with pool.acquire() as conn:
        cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
        if not cfg or not cfg["enabled"]:
            return
        rows = await conn.fetch("SELECT url FROM models ORDER BY created_at")

    urls = [r["url"] for r in rows]
    if not urls:
        logger.info("[scheduler] 没有已注册的模型,跳过")
        return

    logger.info("[scheduler] 开始自动爬取 %d 个模型", len(urls))
    job_id_row = await pool.fetchrow(
        "INSERT INTO scrape_jobs (urls, status) VALUES ($1, 'pending') RETURNING id",
        urls,
    )

    # asyncio only keeps a weak reference to tasks; without a strong
    # reference the fire-and-forget job may be garbage-collected before it
    # finishes.  Park references on the function object itself so this block
    # stays self-contained.
    tasks = getattr(_run_auto_scrape, "_tasks", None)
    if tasks is None:
        tasks = _run_auto_scrape._tasks = set()
    task = asyncio.create_task(_scraper.run_job(str(job_id_row["id"]), urls, pool))
    tasks.add(task)
    task.add_done_callback(tasks.discard)
def _reschedule(interval_days: int, start_hour: int) -> None:
    """(Re)install the auto-scrape interval job on the module scheduler.

    Args:
        interval_days: Number of days between consecutive runs.
        start_hour: Hour of day in the user's local timezone at which the
            job should fire (converted to UTC via ``TZ_OFFSET_HOURS``,
            default UTC+8).
    """
    if _scheduler.get_job(JOB_ID):
        _scheduler.remove_job(JOB_ID)

    # start_hour is user-supplied local time (default UTC+8); convert to UTC.
    tz_offset = int(os.environ.get("TZ_OFFSET_HOURS", "8"))
    start_hour_utc = (start_hour - tz_offset) % 24

    # First run: today at the target UTC hour, or tomorrow if already past.
    now = datetime.now(timezone.utc)
    next_run = now.replace(hour=start_hour_utc, minute=0, second=0, microsecond=0)
    if next_run <= now:
        next_run += timedelta(days=1)

    _scheduler.add_job(
        _run_auto_scrape,
        trigger=IntervalTrigger(days=interval_days, start_date=next_run),
        id=JOB_ID,
        replace_existing=True,
    )
    logger.info(
        "[scheduler] 已设置每 %s 天本地 %s:00 (UTC %s:00) 执行,下次: %s",
        interval_days,
        start_hour,
        start_hour_utc,
        next_run,
    )
async def start_scheduler() -> None:
    """Load the schedule config from the database and start the scheduler.

    Falls back to a 1-day interval starting at local 02:00 when no
    ``scrape_schedule`` row exists yet.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        cfg = await conn.fetchrow("SELECT * FROM scrape_schedule WHERE id = 1")
    interval = cfg["interval_days"] if cfg else 1
    start_hour = cfg["start_hour"] if cfg else 2
    _reschedule(interval, start_hour)
    # Guard against double-start: AsyncIOScheduler.start() raises
    # SchedulerAlreadyRunningError if the scheduler is already running.
    if not _scheduler.running:
        _scheduler.start()
    logger.info("[scheduler] 调度器已启动")
async def stop_scheduler() -> None:
    """Shut the scheduler down immediately, not waiting for running jobs."""
    if not _scheduler.running:
        return
    _scheduler.shutdown(wait=False)
def get_scheduler() -> AsyncIOScheduler:
    """Expose the module-level :class:`AsyncIOScheduler` singleton."""
    return _scheduler
def reschedule(interval_days: int, start_hour: int) -> None:
    """Public entry point: delegate to the private :func:`_reschedule`."""
    _reschedule(interval_days=interval_days, start_hour=start_hour)