"""ScraperService: run scrape jobs asynchronously on a thread pool executor.

Each URL is scraped with ``scrape_all()`` from ``crawl/main.py``, which
collects prices, model info, rate limits and tool call prices in a single
browser session.
"""

from __future__ import annotations

import asyncio
import json
import os
import sys
import traceback
from typing import Any

# Add backend root and crawl dir to path
_backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_crawl_dir = os.path.join(_backend_root, "crawl")
for _p in (_backend_root, _crawl_dir):
    if _p not in sys.path:
        sys.path.insert(0, _p)

from main import scrape_all  # noqa: E402 (backend/crawl/main.py)

# --- SQL statements ---------------------------------------------------------
# The multi-part statements keep a leading and trailing space so the text sent
# to the database is exactly what it was before they were split across lines.

_SQL_JOB_RUNNING = "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1"
_SQL_JOB_DONE = "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1"
_SQL_JOB_FAILED = (
    "UPDATE scrape_jobs SET status = 'failed', error = $2, updated_at = NOW() WHERE id = $1"
)

_SQL_SNAPSHOT_URLS = "SELECT url FROM price_snapshot"
_SQL_SNAPSHOT_ROW = (
    "SELECT prices, model_info, rate_limits, tool_prices FROM price_snapshot WHERE url = $1"
)
_SQL_SNAPSHOT_PRUNE = "DELETE FROM price_snapshot WHERE url != ALL($1::text[])"

_SQL_INSERT_RESULT = (
    " INSERT INTO scrape_results"
    " (job_id, url, model_name, prices, model_info, rate_limits, tool_prices, raw_data)"
    " VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8::jsonb) "
)

_SQL_UPSERT_SNAPSHOT = (
    " INSERT INTO price_snapshot"
    " (url, model_name, prices, model_info, rate_limits, tool_prices, updated_at)"
    " VALUES ($1, $2, $3::jsonb, $4::jsonb, $5::jsonb, $6::jsonb, NOW())"
    " ON CONFLICT (url) DO UPDATE SET"
    " model_name = EXCLUDED.model_name,"
    " prices = EXCLUDED.prices,"
    " model_info = EXCLUDED.model_info,"
    " rate_limits = EXCLUDED.rate_limits,"
    " tool_prices = EXCLUDED.tool_prices,"
    " updated_at = NOW() "
)

_SQL_BUMP_VERSION = (
    " UPDATE price_snapshot_version"
    " SET version = GREATEST(version + 1, 1), updated_at = NOW()"
    " WHERE id = 1 "
)


def _norm_json(v: Any) -> str:
    """Return a canonical JSON string for *v* so two values can be compared.

    ``None`` becomes ``"null"``; dicts and lists are serialized directly; any
    other value is first parsed with ``json.loads`` (presumably a JSON string
    read back from a jsonb column -- depends on the driver's codec setup).
    Keys are sorted so that key order does not affect the comparison.
    """
    if v is None:
        return "null"
    return json.dumps(v if isinstance(v, (dict, list)) else json.loads(v), sort_keys=True)


class ScraperService:
    """Manages the lifecycle of a scrape job."""

    async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None:
        """Scrape every URL in *urls* and record the outcome for *job_id*.

        The job row is marked ``running``, then each URL is scraped in turn on
        the event loop's default executor. Every scrape is stored in
        ``scrape_results``; ``price_snapshot`` is upserted only for URLs whose
        data differs from the stored snapshot. Afterwards snapshot rows whose
        URL is not in *urls* are deleted, the global snapshot version is
        incremented if anything changed, and the job is marked ``done``.

        Args:
            job_id: Primary key of the row in ``scrape_jobs`` to update.
            urls: URLs to scrape; also the complete set of URLs the snapshot
                should contain once the job finishes.
            pool: Connection pool whose ``acquire()`` is an async context
                manager yielding a connection with ``execute``, ``fetch`` and
                ``fetchrow`` (looks like an asyncpg pool -- confirm with the
                caller).

        Any ``Exception`` raised after the job has been marked ``running`` is
        caught and stored on the job row (status ``failed``) instead of being
        re-raised. An error while writing that failure status does propagate.
        """
        # get_running_loop() is the documented way to obtain the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()

        async with pool.acquire() as conn:
            await conn.execute(_SQL_JOB_RUNNING, job_id)

        try:
            exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
            # Headless unless PLAYWRIGHT_HEADLESS is explicitly "false".
            headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"

            any_changed = False

            # If the set of URLs already in the snapshot differs from the set
            # being scraped now (extra or missing entries), count that as a
            # change.
            async with pool.acquire() as conn:
                rows = await conn.fetch(_SQL_SNAPSHOT_URLS)
                existing_snapshot_urls = {row["url"] for row in rows}
            if existing_snapshot_urls != set(urls):
                any_changed = True

            for url in urls:
                # scrape_all() is blocking, so run it off the event loop.
                # `u=url` binds the current URL at definition time.
                result: dict = await loop.run_in_executor(
                    None,
                    lambda u=url: scrape_all(
                        u,
                        headless=headless,
                        timeout=20000,
                        executable_path=exec_path,
                        modules=["info", "rate", "tool", "price"],
                    ),
                )

                prices = result.get("prices") or {}
                model_info = result.get("info") or {}
                rate_limits = result.get("rate_limits") or {}
                tool_prices = result.get("tool_call_prices") or []

                # model_name: prefer the model_id reported by the scraper,
                # falling back to the last path segment of the URL, so it
                # stays consistent with what the user entered.
                model_name = result.get("model_id") or url.rstrip("/").split("/")[-1]

                prices_json = json.dumps(prices)
                model_info_json = json.dumps(model_info)
                rate_limits_json = json.dumps(rate_limits)
                tool_prices_json = json.dumps(tool_prices)

                async with pool.acquire() as conn:
                    # Every scrape is recorded, whether or not it changed.
                    await conn.execute(
                        _SQL_INSERT_RESULT,
                        job_id,
                        url,
                        model_name,
                        prices_json,
                        model_info_json,
                        rate_limits_json,
                        tool_prices_json,
                        json.dumps(result),
                    )

                    # Compare with the stored snapshot; upsert only when
                    # something actually changed.
                    existing = await conn.fetchrow(_SQL_SNAPSHOT_ROW, url)
                    data_changed = (
                        existing is None
                        or _norm_json(existing["prices"]) != _norm_json(prices)
                        or _norm_json(existing["model_info"]) != _norm_json(model_info)
                        or _norm_json(existing["rate_limits"]) != _norm_json(rate_limits)
                        or _norm_json(existing["tool_prices"]) != _norm_json(tool_prices)
                    )
                    if data_changed:
                        any_changed = True
                        await conn.execute(
                            _SQL_UPSERT_SNAPSHOT,
                            url,
                            model_name,
                            prices_json,
                            model_info_json,
                            rate_limits_json,
                            tool_prices_json,
                        )

            # Delete snapshot rows that are not in this scrape list (the case
            # where a model has been removed).
            async with pool.acquire() as conn:
                await conn.execute(_SQL_SNAPSHOT_PRUNE, urls)

            # If anything changed in this batch, bump the global version by
            # one (versions start at 1).
            if any_changed:
                async with pool.acquire() as conn:
                    await conn.execute(_SQL_BUMP_VERSION)

            async with pool.acquire() as conn:
                await conn.execute(_SQL_JOB_DONE, job_id)

        except Exception as exc:
            # Top-level boundary for the job: persist the failure, including
            # the traceback, on the job row rather than re-raising.
            error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
            async with pool.acquire() as conn:
                await conn.execute(_SQL_JOB_FAILED, job_id, error_msg)