| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- """
- ScraperService: runs scrape jobs asynchronously using a thread pool executor.
- Uses the new crawl/main.py scrape_all() which collects prices, model info,
- rate limits and tool call prices in a single browser session.
- """
- from __future__ import annotations
- import asyncio
- import json
- import os
- import sys
- import traceback
- from typing import Any
# Add backend root and crawl dir to path
# NOTE(review): assumes this file lives three directory levels below the
# backend root (e.g. <backend_root>/app/services/this_file.py) — confirm
# against the actual project layout.
_backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_crawl_dir = os.path.join(_backend_root, "crawl")
# Prepend (insert at index 0) so these directories win over any
# identically named installed packages.
for _p in (_backend_root, _crawl_dir):
    if _p not in sys.path:
        sys.path.insert(0, _p)
# Deliberately imported *after* the sys.path mutation above; resolves to
# backend/crawl/main.py, hence the E402 suppression.
from main import scrape_all  # noqa: E402 (backend/crawl/main.py)
class ScraperService:
    """Manages the lifecycle of a scrape job.

    ``run_job`` marks the job as running, scrapes every URL in a thread-pool
    executor (``scrape_all`` drives a real browser and is blocking), records
    each raw result in ``scrape_results``, reconciles the ``price_snapshot``
    table against the scraped data, bumps a global version counter when
    anything changed, and finally marks the job done or failed.
    """

    # Playwright per-page timeout, in milliseconds (was hard-coded inline).
    _TIMEOUT_MS = 20000

    @staticmethod
    def _norm(v: Any) -> str:
        """Return canonical JSON text for *v* so payloads compare stably.

        ``None`` maps to ``"null"``. jsonb columns may come back from the
        driver as ``str``; those are parsed first, then re-serialised with
        sorted keys so semantically equal payloads produce equal strings.
        """
        if v is None:
            return "null"
        return json.dumps(v if isinstance(v, (dict, list)) else json.loads(v), sort_keys=True)

    @staticmethod
    async def _set_status(pool: Any, job_id: str, status: str, error: str | None = None) -> None:
        """Update the ``scrape_jobs`` row for *job_id* to *status*.

        When *error* is given it is stored alongside the status (failure path).
        """
        async with pool.acquire() as conn:
            if error is None:
                await conn.execute(
                    "UPDATE scrape_jobs SET status = $2, updated_at = NOW() WHERE id = $1",
                    job_id,
                    status,
                )
            else:
                await conn.execute(
                    "UPDATE scrape_jobs SET status = $2, error = $3, updated_at = NOW() WHERE id = $1",
                    job_id,
                    status,
                    error,
                )

    @classmethod
    async def _persist_result(cls, conn: Any, job_id: str, url: str, result: dict) -> bool:
        """Insert one scrape result and upsert the price snapshot for *url*.

        Returns ``True`` when the snapshot data actually changed (or no
        snapshot existed yet), ``False`` otherwise.
        """
        prices = result.get("prices") or {}
        model_info = result.get("info") or {}
        rate_limits = result.get("rate_limits") or {}
        tool_prices = result.get("tool_call_prices") or []
        # model_name: prefer the model_id extracted from the URL so it stays
        # consistent with what the user entered; fall back to the last path
        # segment of the URL.
        model_name = result.get("model_id") or url.rstrip("/").split("/")[-1]

        await conn.execute(
            """
            INSERT INTO scrape_results
            (job_id, url, model_name, prices, model_info, rate_limits, tool_prices, raw_data)
            VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8::jsonb)
            """,
            job_id, url, model_name,
            json.dumps(prices), json.dumps(model_info),
            json.dumps(rate_limits), json.dumps(tool_prices),
            json.dumps(result),
        )

        # Compare against the previous snapshot; only upsert when something
        # actually differs, so updated_at reflects real changes.
        existing = await conn.fetchrow(
            "SELECT prices, model_info, rate_limits, tool_prices FROM price_snapshot WHERE url = $1",
            url,
        )
        data_changed = (
            existing is None
            or cls._norm(existing["prices"]) != cls._norm(prices)
            or cls._norm(existing["model_info"]) != cls._norm(model_info)
            or cls._norm(existing["rate_limits"]) != cls._norm(rate_limits)
            or cls._norm(existing["tool_prices"]) != cls._norm(tool_prices)
        )
        if data_changed:
            await conn.execute(
                """
                INSERT INTO price_snapshot
                (url, model_name, prices, model_info, rate_limits, tool_prices, updated_at)
                VALUES ($1, $2, $3::jsonb, $4::jsonb, $5::jsonb, $6::jsonb, NOW())
                ON CONFLICT (url) DO UPDATE SET
                model_name = EXCLUDED.model_name,
                prices = EXCLUDED.prices,
                model_info = EXCLUDED.model_info,
                rate_limits = EXCLUDED.rate_limits,
                tool_prices = EXCLUDED.tool_prices,
                updated_at = NOW()
                """,
                url, model_name,
                json.dumps(prices), json.dumps(model_info),
                json.dumps(rate_limits), json.dumps(tool_prices),
            )
        return data_changed

    async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None:
        """Execute one scrape job end to end.

        Args:
            job_id: Primary key of the ``scrape_jobs`` row to drive.
            urls: Model pages to scrape; ``price_snapshot`` is reconciled
                against exactly this set (extra rows are deleted).
            pool: asyncpg-compatible pool — ``acquire()`` is an async
                context manager yielding a connection.

        Any exception marks the job ``failed`` with a full traceback; no
        exception propagates to the caller.
        """
        # get_running_loop() is the supported API inside a coroutine;
        # get_event_loop() is deprecated for this use since Python 3.10.
        loop = asyncio.get_running_loop()
        await self._set_status(pool, job_id, "running")
        try:
            exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
            headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"

            any_changed = False
            # If the URL set already in the snapshot differs from this run's
            # list (rows added or removed), that alone counts as a change.
            async with pool.acquire() as conn:
                rows = await conn.fetch("SELECT url FROM price_snapshot")
                if {row["url"] for row in rows} != set(urls):
                    any_changed = True

            for url in urls:
                # scrape_all is blocking (drives a browser); keep it off the
                # event loop.  Bind url as a default to avoid late binding.
                result: dict = await loop.run_in_executor(
                    None,
                    lambda u=url: scrape_all(
                        u,
                        headless=headless,
                        timeout=self._TIMEOUT_MS,
                        executable_path=exec_path,
                        modules=["info", "rate", "tool", "price"],
                    ),
                )
                async with pool.acquire() as conn:
                    changed = await self._persist_result(conn, job_id, url, result)
                    any_changed = any_changed or changed

            # Delete snapshot rows whose URL is not in this run's list
            # (the model was removed from the tracked set).
            async with pool.acquire() as conn:
                await conn.execute(
                    "DELETE FROM price_snapshot WHERE url != ALL($1::text[])",
                    urls,
                )

            # Any change in this batch bumps the global version (starts at 1).
            if any_changed:
                async with pool.acquire() as conn:
                    await conn.execute(
                        """
                        UPDATE price_snapshot_version
                        SET version = GREATEST(version + 1, 1), updated_at = NOW()
                        WHERE id = 1
                        """
                    )

            await self._set_status(pool, job_id, "done")
        except Exception as exc:
            error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
            await self._set_status(pool, job_id, "failed", error=error_msg)
|