scraper.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. """
  2. ScraperService: runs scrape jobs asynchronously using a thread pool executor.
  3. Uses the new crawl/main.py scrape_all() which collects prices, model info,
  4. rate limits and tool call prices in a single browser session.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import json
  9. import os
  10. import sys
  11. import traceback
  12. from typing import Any
# Add backend root and crawl dir to path.
# _backend_root is the grandparent of the directory containing this file
# (three dirname() calls on the file path); _crawl_dir is <backend root>/crawl.
# Each is pushed to the FRONT of sys.path only if not already present. Because
# the crawl dir is inserted second, it ends up ahead of the backend root when
# both are newly added.
_backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
_crawl_dir = os.path.join(_backend_root, "crawl")
for _p in (_backend_root, _crawl_dir):
    if _p not in sys.path:
        sys.path.insert(0, _p)
# The import must come after the sys.path changes above, hence the E402 waiver.
# NOTE(review): ``main`` is a generic module name; if another ``main`` module is
# earlier on sys.path or already imported, it would be picked instead of
# backend/crawl/main.py — confirm this resolves as intended.
from main import scrape_all  # noqa: E402 (backend/crawl/main.py)
  20. class ScraperService:
  21. """Manages the lifecycle of a scrape job."""
  22. async def run_job(self, job_id: str, urls: list[str], pool: Any) -> None:
  23. loop = asyncio.get_event_loop()
  24. async with pool.acquire() as conn:
  25. await conn.execute(
  26. "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1",
  27. job_id,
  28. )
  29. try:
  30. exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
  31. headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"
  32. def _norm(v) -> str:
  33. if v is None:
  34. return "null"
  35. return json.dumps(v if isinstance(v, (dict, list)) else json.loads(v), sort_keys=True)
  36. any_changed = False
  37. for url in urls:
  38. result: dict = await loop.run_in_executor(
  39. None,
  40. lambda u=url: scrape_all(
  41. u,
  42. headless=headless,
  43. timeout=20000,
  44. executable_path=exec_path,
  45. modules=["info", "rate", "tool", "price"],
  46. ),
  47. )
  48. prices = result.get("prices") or {}
  49. model_info = result.get("info") or {}
  50. rate_limits = result.get("rate_limits") or {}
  51. tool_prices = result.get("tool_call_prices") or []
  52. # model_name: 直接用 URL 中提取的 model_id,保持和用户输入一致
  53. model_name = (
  54. result.get("model_id")
  55. or url.rstrip("/").split("/")[-1]
  56. )
  57. async with pool.acquire() as conn:
  58. await conn.execute(
  59. """
  60. INSERT INTO scrape_results
  61. (job_id, url, model_name, prices, model_info, rate_limits, tool_prices, raw_data)
  62. VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8::jsonb)
  63. """,
  64. job_id, url, model_name,
  65. json.dumps(prices), json.dumps(model_info),
  66. json.dumps(rate_limits), json.dumps(tool_prices),
  67. json.dumps(result),
  68. )
  69. # 对比旧快照,有变化才 upsert
  70. existing = await conn.fetchrow(
  71. "SELECT prices, model_info, rate_limits, tool_prices FROM price_snapshot WHERE url = $1",
  72. url,
  73. )
  74. data_changed = (
  75. existing is None
  76. or _norm(existing["prices"]) != _norm(prices)
  77. or _norm(existing["model_info"]) != _norm(model_info)
  78. or _norm(existing["rate_limits"]) != _norm(rate_limits)
  79. or _norm(existing["tool_prices"]) != _norm(tool_prices)
  80. )
  81. if data_changed:
  82. any_changed = True
  83. await conn.execute(
  84. """
  85. INSERT INTO price_snapshot
  86. (url, model_name, prices, model_info, rate_limits, tool_prices, updated_at)
  87. VALUES ($1, $2, $3::jsonb, $4::jsonb, $5::jsonb, $6::jsonb, NOW())
  88. ON CONFLICT (url) DO UPDATE SET
  89. model_name = EXCLUDED.model_name,
  90. prices = EXCLUDED.prices,
  91. model_info = EXCLUDED.model_info,
  92. rate_limits = EXCLUDED.rate_limits,
  93. tool_prices = EXCLUDED.tool_prices,
  94. updated_at = NOW()
  95. """,
  96. url, model_name,
  97. json.dumps(prices), json.dumps(model_info),
  98. json.dumps(rate_limits), json.dumps(tool_prices),
  99. )
  100. # 本批次有任何数据变化,全局版本号 +1(从 1 开始)
  101. if any_changed:
  102. async with pool.acquire() as conn:
  103. await conn.execute(
  104. """
  105. UPDATE price_snapshot_version
  106. SET version = GREATEST(version + 1, 1), updated_at = NOW()
  107. WHERE id = 1
  108. """
  109. )
  110. async with pool.acquire() as conn:
  111. await conn.execute(
  112. "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1",
  113. job_id,
  114. )
  115. except Exception as exc:
  116. error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
  117. async with pool.acquire() as conn:
  118. await conn.execute(
  119. "UPDATE scrape_jobs SET status = 'failed', error = $2, updated_at = NOW() WHERE id = $1",
  120. job_id,
  121. error_msg,
  122. )