# scrape.py

from __future__ import annotations

import asyncio
import json
import uuid
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.db import get_pool
from app.services.scraper import ScraperService

router = APIRouter(tags=["scrape"])
_scraper = ScraperService()

# Hold strong references to fire-and-forget tasks: asyncio keeps only weak
# references to tasks, so an anonymous create_task() result can be garbage
# collected mid-run and silently kill a scrape job.
_background_tasks: set[asyncio.Task] = set()
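
# ScraperService.run_job is not shown in this file. From how the router uses
# it, it presumably scrapes each URL, writes one scrape_results row per URL,
# and flips scrape_jobs.status from 'pending' to 'done' (or records a message
# in scrape_jobs.error on failure). Treat that contract as inferred from the
# endpoints below, not as documented behavior.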


class ScrapeRequest(BaseModel):
    urls: List[str]


class ScrapeJobOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime


class ScrapeResultOut(BaseModel):
    url: str
    model_name: str
    prices: dict
    model_info: Optional[dict] = None
    rate_limits: Optional[dict] = None
    tool_prices: Optional[list] = None
    scraped_at: datetime


class ScrapeJobDetailOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime
    results: Optional[List[ScrapeResultOut]] = None
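
# For reference, a schema sketch inferred from the queries below. The real
# migration may differ; in particular the column types (text[], jsonb,
# timestamptz) and the uuid default are assumptions, not taken from this file:
#
#   CREATE TABLE scrape_jobs (
#       id         uuid PRIMARY KEY DEFAULT gen_random_uuid(),
#       urls       text[] NOT NULL,
#       status     text NOT NULL,   -- 'pending' on insert, 'done' when finished
#       error      text,
#       created_at timestamptz NOT NULL DEFAULT now()
#   );
#
#   CREATE TABLE scrape_results (
#       job_id      uuid REFERENCES scrape_jobs (id),
#       url         text NOT NULL,
#       model_name  text NOT NULL,
#       prices      jsonb NOT NULL,
#       model_info  jsonb,
#       rate_limits jsonb,
#       tool_prices jsonb,
#       scraped_at  timestamptz NOT NULL
#   );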


@router.post("/scrape", response_model=ScrapeJobOut, status_code=202)
async def create_scrape_job(body: ScrapeRequest) -> ScrapeJobOut:
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO scrape_jobs (urls, status)
            VALUES ($1, 'pending')
            RETURNING id, status, error, created_at
            """,
            body.urls,
        )
    job_id = str(row["id"])
    # Run the scrape in the background, keeping a reference to the task so it
    # cannot be garbage-collected before it completes.
    task = asyncio.create_task(_scraper.run_job(job_id, body.urls, pool))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return ScrapeJobOut(
        job_id=job_id,
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
    )


@router.get("/scrape", response_model=List[ScrapeJobOut])
async def list_scrape_jobs() -> List[ScrapeJobOut]:
    pool = get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, status, error, created_at FROM scrape_jobs ORDER BY created_at DESC"
        )
    return [
        ScrapeJobOut(
            job_id=str(r["id"]),
            status=r["status"],
            error=r["error"],
            created_at=r["created_at"],
        )
        for r in rows
    ]


def _as_json(value):
    """Decode a JSONB column that asyncpg may hand back as raw text.

    Without a registered JSON type codec, asyncpg returns jsonb values as
    strings; with one, they arrive as dicts/lists already. Handle both.
    """
    if value is None:
        return None
    return value if isinstance(value, (dict, list)) else json.loads(value)


@router.get("/scrape/{job_id}", response_model=ScrapeJobDetailOut)
async def get_scrape_job(job_id: str) -> ScrapeJobDetailOut:
    # Reject malformed ids up front; otherwise the uuid cast fails inside
    # asyncpg and the client sees a 500 instead of a 404.
    try:
        uuid.UUID(job_id)
    except ValueError:
        raise HTTPException(status_code=404, detail="Scrape job not found")
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, status, error, created_at FROM scrape_jobs WHERE id = $1",
            job_id,
        )
        if row is None:
            raise HTTPException(status_code=404, detail="Scrape job not found")
        results: Optional[List[ScrapeResultOut]] = None
        if row["status"] == "done":
            result_rows = await conn.fetch(
                "SELECT url, model_name, prices, model_info, rate_limits, tool_prices, scraped_at"
                " FROM scrape_results WHERE job_id = $1 ORDER BY scraped_at ASC",
                job_id,
            )
            results = [
                ScrapeResultOut(
                    url=r["url"],
                    model_name=r["model_name"],
                    prices=_as_json(r["prices"]) or {},
                    model_info=_as_json(r["model_info"]),
                    rate_limits=_as_json(r["rate_limits"]),
                    tool_prices=_as_json(r["tool_prices"]),
                    scraped_at=r["scraped_at"],
                )
                for r in result_rows
            ]
    return ScrapeJobDetailOut(
        job_id=str(row["id"]),
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
        results=results,
    )
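

# Example client flow (a sketch, not part of this module): assuming the app
# mounts this router at the root and serves on localhost:8000, a job can be
# created and polled with httpx like so. The example URL is a placeholder.
#
#   import time
#   import httpx
#
#   job = httpx.post(
#       "http://localhost:8000/scrape",
#       json={"urls": ["https://example.com/pricing"]},
#   ).json()  # 202 -> {"job_id": ..., "status": "pending", ...}
#
#   while True:
#       detail = httpx.get(f"http://localhost:8000/scrape/{job['job_id']}").json()
#       if detail["status"] == "done" or detail["error"]:
#           break
#       time.sleep(1)
#
#   print(detail["results"])  # populated only once status == "done"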