""" ScraperService: runs scrape jobs asynchronously using a thread pool executor. Delegates all scraping logic to scrape_aliyun_models.py (the working standalone script). """ from __future__ import annotations import asyncio import json import os import sys import traceback from typing import Any # Add backend root to path so we can import scrape_aliyun_models.py directly _backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) if _backend_root not in sys.path: sys.path.insert(0, _backend_root) from scrape_aliyun_models import scrape_model_price # noqa: E402 class ScraperService: """Manages the lifecycle of a scrape job.""" async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None: loop = asyncio.get_event_loop() async with pool.acquire() as conn: await conn.execute( "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1", job_id, ) try: exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false" for url in urls: result: dict = await loop.run_in_executor( None, scrape_model_price, url, headless, 20000, exec_path ) # scrape_model_price returns {"url":..., "error":..., "prices":{...}} prices = result.get("prices") or {} model_name = url.rstrip("/").split("/")[-1] async with pool.acquire() as conn: await conn.execute( """ INSERT INTO scrape_results (job_id, url, model_name, prices) VALUES ($1, $2, $3, $4::jsonb) """, job_id, url, model_name, json.dumps(prices), ) async with pool.acquire() as conn: await conn.execute( "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1", job_id, ) except Exception as exc: error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}" async with pool.acquire() as conn: await conn.execute( """ UPDATE scrape_jobs SET status = 'failed', error = $2, updated_at = NOW() WHERE id = $1 """, job_id, error_msg, )