scraper.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. """
  2. ScraperService: runs scrape jobs asynchronously using a thread pool executor.
  3. Delegates all scraping logic to scrape_aliyun_models.py (the working standalone script).
  4. """
  5. from __future__ import annotations
  6. import asyncio
  7. import json
  8. import os
  9. import sys
  10. import traceback
  11. from typing import Any
  12. # Add backend root to path so we can import scrape_aliyun_models.py directly
  13. _backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  14. if _backend_root not in sys.path:
  15. sys.path.insert(0, _backend_root)
  16. from scrape_aliyun_models import scrape_model_price # noqa: E402
  17. class ScraperService:
  18. """Manages the lifecycle of a scrape job."""
  19. async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None:
  20. loop = asyncio.get_event_loop()
  21. async with pool.acquire() as conn:
  22. await conn.execute(
  23. "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1",
  24. job_id,
  25. )
  26. try:
  27. exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
  28. headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"
  29. for url in urls:
  30. result: dict = await loop.run_in_executor(
  31. None, scrape_model_price, url, headless, 20000, exec_path
  32. )
  33. # scrape_model_price returns {"url":..., "error":..., "prices":{...}}
  34. prices = result.get("prices") or {}
  35. model_name = url.rstrip("/").split("/")[-1]
  36. async with pool.acquire() as conn:
  37. await conn.execute(
  38. """
  39. INSERT INTO scrape_results (job_id, url, model_name, prices)
  40. VALUES ($1, $2, $3, $4::jsonb)
  41. """,
  42. job_id,
  43. url,
  44. model_name,
  45. json.dumps(prices),
  46. )
  47. async with pool.acquire() as conn:
  48. await conn.execute(
  49. "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1",
  50. job_id,
  51. )
  52. except Exception as exc:
  53. error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
  54. async with pool.acquire() as conn:
  55. await conn.execute(
  56. """
  57. UPDATE scrape_jobs
  58. SET status = 'failed', error = $2, updated_at = NOW()
  59. WHERE id = $1
  60. """,
  61. job_id,
  62. error_msg,
  63. )