# scraper.py
"""
ScraperService: runs scrape jobs asynchronously using a thread pool executor.
Uses the new crawl/main.py scrape_all() which collects prices, model info,
rate limits and tool call prices in a single browser session.
"""
from __future__ import annotations

import asyncio
import json
import os
import sys
import traceback
from typing import Any

# Make the backend root and backend/crawl importable so the flat
# `from main import scrape_all` below resolves no matter how this module
# itself was imported (package-relative or as a script).
_backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_crawl_dir = os.path.join(_backend_root, "crawl")
for _p in (_backend_root, _crawl_dir):
    if _p not in sys.path:
        # Prepend so these paths win over any same-named modules elsewhere.
        sys.path.insert(0, _p)

from main import scrape_all  # noqa: E402 (backend/crawl/main.py)
  20. class ScraperService:
  21. """Manages the lifecycle of a scrape job."""
  22. async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None:
  23. loop = asyncio.get_event_loop()
  24. async with pool.acquire() as conn:
  25. await conn.execute(
  26. "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1",
  27. job_id,
  28. )
  29. try:
  30. exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
  31. headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"
  32. for url in urls:
  33. result: dict = await loop.run_in_executor(
  34. None,
  35. lambda u=url: scrape_all(
  36. u,
  37. headless=headless,
  38. timeout=20000,
  39. executable_path=exec_path,
  40. modules=["info", "rate", "tool", "price"],
  41. ),
  42. )
  43. prices = result.get("prices") or {}
  44. model_info = result.get("info") or {}
  45. rate_limits = result.get("rate_limits") or {}
  46. tool_prices = result.get("tool_call_prices") or []
  47. # model_name: 直接用 URL 中提取的 model_id,保持和用户输入一致
  48. model_name = (
  49. result.get("model_id")
  50. or url.rstrip("/").split("/")[-1]
  51. )
  52. async with pool.acquire() as conn:
  53. await conn.execute(
  54. """
  55. INSERT INTO scrape_results
  56. (job_id, url, model_name, prices, model_info, rate_limits, tool_prices, raw_data)
  57. VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8::jsonb)
  58. """,
  59. job_id,
  60. url,
  61. model_name,
  62. json.dumps(prices),
  63. json.dumps(model_info),
  64. json.dumps(rate_limits),
  65. json.dumps(tool_prices),
  66. json.dumps(result),
  67. )
  68. async with pool.acquire() as conn:
  69. await conn.execute(
  70. "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1",
  71. job_id,
  72. )
  73. except Exception as exc:
  74. error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
  75. async with pool.acquire() as conn:
  76. await conn.execute(
  77. "UPDATE scrape_jobs SET status = 'failed', error = $2, updated_at = NOW() WHERE id = $1",
  78. job_id,
  79. error_msg,
  80. )