scraper.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. """
  2. ScraperService: runs scrape jobs asynchronously using a thread pool executor.
  3. Uses the new crawl/main.py scrape_all() which collects prices, model info,
  4. rate limits and tool call prices in a single browser session.
  5. """
  6. from __future__ import annotations
  7. import asyncio
  8. import json
  9. import os
  10. import sys
  11. import traceback
  12. from typing import Any
  13. # Add backend root and crawl dir to path
  14. _backend_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  15. _crawl_dir = os.path.join(_backend_root, "crawl")
  16. for _p in (_backend_root, _crawl_dir):
  17. if _p not in sys.path:
  18. sys.path.insert(0, _p)
  19. from main import scrape_all # noqa: E402 (backend/crawl/main.py)
# Log the current INFO_API_RE at startup to confirm that the latest code was loaded.
  21. import main as _crawl_main
  22. import logging as _logging
  23. _logging.getLogger(__name__).info(
  24. f"[scraper] INFO_API_RE = {_crawl_main.INFO_API_RE.pattern[:60]}..."
  25. )
  26. class ScraperService:
  27. """Manages the lifecycle of a scrape job."""
  28. async def run_job(self, job_id: str, urls: list[str], pool: Any, api_keys: dict[str, str] | None = None) -> None:
  29. loop = asyncio.get_event_loop()
  30. if api_keys is None:
  31. api_keys = {}
  32. async with pool.acquire() as conn:
  33. await conn.execute(
  34. "UPDATE scrape_jobs SET status = 'running', updated_at = NOW() WHERE id = $1",
  35. job_id,
  36. )
  37. try:
  38. exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") or None
  39. headless = os.environ.get("PLAYWRIGHT_HEADLESS", "true").lower() != "false"
  40. def _norm(v) -> str:
  41. if v is None:
  42. return "null"
  43. return json.dumps(v if isinstance(v, (dict, list)) else json.loads(v), sort_keys=True)
  44. any_changed = False
  45. # 如果 snapshot 里已有的 URL 集合与本次爬取的不一致(多或少),触发变更
  46. async with pool.acquire() as conn:
  47. rows = await conn.fetch("SELECT url FROM price_snapshot")
  48. existing_snapshot_urls = {row["url"] for row in rows}
  49. if existing_snapshot_urls != set(urls):
  50. any_changed = True
  51. # 查出 url -> name 映射,用于 model_hint
  52. async with pool.acquire() as conn:
  53. name_rows = await conn.fetch(
  54. "SELECT url, name FROM models WHERE url = ANY($1::text[])",
  55. urls,
  56. )
  57. model_names = {row["url"]: row["name"] for row in name_rows}
  58. for url in urls:
  59. api_key = api_keys.get(url)
  60. model_hint = model_names.get(url)
  61. result: dict = await loop.run_in_executor(
  62. None,
  63. lambda u=url, k=api_key, h=model_hint: scrape_all(
  64. u,
  65. headless=headless,
  66. timeout=20000,
  67. executable_path=exec_path,
  68. modules=["info", "rate", "tool", "price", "icon"],
  69. api_key=k,
  70. model_hint=h,
  71. ),
  72. )
  73. prices = result.get("prices") or {}
  74. model_info = result.get("info") or {}
  75. rate_limits = result.get("rate_limits") or {}
  76. tool_prices = result.get("tool_call_prices") or []
  77. icon = result.get("icon") # SVG string or None
  78. # model_name: 直接用 URL 中提取的 model_id,保持和用户输入一致
  79. model_name = (
  80. result.get("model_id")
  81. or url.rstrip("/").split("/")[-1]
  82. )
  83. async with pool.acquire() as conn:
  84. await conn.execute(
  85. """
  86. INSERT INTO scrape_results
  87. (job_id, url, model_name, prices, model_info, rate_limits, tool_prices, raw_data, icon)
  88. VALUES ($1, $2, $3, $4::jsonb, $5::jsonb, $6::jsonb, $7::jsonb, $8::jsonb, $9)
  89. """,
  90. job_id, url, model_name,
  91. json.dumps(prices), json.dumps(model_info),
  92. json.dumps(rate_limits), json.dumps(tool_prices),
  93. json.dumps(result), icon,
  94. )
  95. # 对比旧快照,有变化才 upsert
  96. existing = await conn.fetchrow(
  97. "SELECT prices, model_info, rate_limits, tool_prices, icon FROM price_snapshot WHERE url = $1",
  98. url,
  99. )
  100. data_changed = (
  101. existing is None
  102. or _norm(existing["prices"]) != _norm(prices)
  103. or _norm(existing["model_info"]) != _norm(model_info)
  104. or _norm(existing["rate_limits"]) != _norm(rate_limits)
  105. or _norm(existing["tool_prices"]) != _norm(tool_prices)
  106. or (existing["icon"] or "") != (icon or "")
  107. )
  108. if data_changed:
  109. any_changed = True
  110. await conn.execute(
  111. """
  112. INSERT INTO price_snapshot
  113. (url, model_name, prices, model_info, rate_limits, tool_prices, icon, updated_at)
  114. VALUES ($1, $2, $3::jsonb, $4::jsonb, $5::jsonb, $6::jsonb, $7, NOW())
  115. ON CONFLICT (url) DO UPDATE SET
  116. model_name = EXCLUDED.model_name,
  117. prices = EXCLUDED.prices,
  118. model_info = EXCLUDED.model_info,
  119. rate_limits = EXCLUDED.rate_limits,
  120. tool_prices = EXCLUDED.tool_prices,
  121. icon = EXCLUDED.icon,
  122. updated_at = NOW()
  123. """,
  124. url, model_name,
  125. json.dumps(prices), json.dumps(model_info),
  126. json.dumps(rate_limits), json.dumps(tool_prices), icon,
  127. )
  128. # 删除 snapshot 里不在本次爬取列表中的行(模型被移除的情况)
  129. async with pool.acquire() as conn:
  130. await conn.execute(
  131. "DELETE FROM price_snapshot WHERE url != ALL($1::text[])",
  132. urls,
  133. )
  134. # 本批次有任何数据变化,全局版本号 +1(从 1 开始),同时 bump 所有已有 domain_version 的域名
  135. if any_changed:
  136. async with pool.acquire() as conn:
  137. await conn.execute(
  138. """
  139. UPDATE price_snapshot_version
  140. SET version = GREATEST(version + 1, 1), updated_at = NOW()
  141. WHERE id = 1
  142. """
  143. )
  144. # 同步 bump 所有已有 domain_version 记录的域名
  145. await conn.execute(
  146. """
  147. UPDATE domain_version
  148. SET version = version + 1, updated_at = NOW()
  149. """
  150. )
  151. async with pool.acquire() as conn:
  152. await conn.execute(
  153. "UPDATE scrape_jobs SET status = 'done', updated_at = NOW() WHERE id = $1",
  154. job_id,
  155. )
  156. except Exception as exc:
  157. error_msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
  158. async with pool.acquire() as conn:
  159. await conn.execute(
  160. "UPDATE scrape_jobs SET status = 'failed', error = $2, updated_at = NOW() WHERE id = $1",
  161. job_id,
  162. error_msg,
  163. )