scrape.py

"""FastAPI routes for creating and inspecting scrape jobs."""

from __future__ import annotations

import asyncio
import json
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.db import get_pool
from app.services.scraper import ScraperService

router = APIRouter(tags=["scrape"])
_scraper = ScraperService()

# Keep references to fire-and-forget scrape tasks so they are not
# garbage-collected before they finish (see the asyncio.create_task docs).
_background_tasks: set[asyncio.Task] = set()
class ScrapeRequest(BaseModel):
    urls: List[str]


class ScrapeJobOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime


class ScrapeResultOut(BaseModel):
    url: str
    # NOTE: under Pydantic v2 the "model_" prefix triggers a protected-namespace
    # warning; it is harmless here but can be silenced via model_config.
    model_name: str
    prices: dict
    scraped_at: datetime


class ScrapeJobDetailOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime
    results: Optional[List[ScrapeResultOut]] = None
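
# The handlers below assume storage roughly shaped like this (an inferred
# sketch from the queries in this file, not the project's actual schema;
# column types are guesses):
#
#   CREATE TABLE scrape_jobs (
#       id          uuid        PRIMARY KEY DEFAULT gen_random_uuid(),
#       urls        text[]      NOT NULL,
#       status      text        NOT NULL,   -- e.g. 'pending' -> 'done' / a failure state
#       error       text,
#       created_at  timestamptz NOT NULL DEFAULT now()
#   );
#
#   CREATE TABLE scrape_results (
#       job_id      uuid        NOT NULL REFERENCES scrape_jobs (id),
#       url         text        NOT NULL,
#       model_name  text        NOT NULL,
#       prices      jsonb       NOT NULL,
#       scraped_at  timestamptz NOT NULL
#   );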
@router.post("/scrape", response_model=ScrapeJobOut, status_code=202)
async def create_scrape_job(body: ScrapeRequest) -> ScrapeJobOut:
    """Persist a new job, kick off scraping in the background, return 202."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO scrape_jobs (urls, status)
            VALUES ($1, 'pending')
            RETURNING id, status, error, created_at
            """,
            body.urls,
        )
    job_id = str(row["id"])

    # Run the scrape outside the request/response cycle; hold a reference so
    # the task cannot be garbage-collected mid-run.
    task = asyncio.create_task(_scraper.run_job(job_id, body.urls, pool))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return ScrapeJobOut(
        job_id=job_id,
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
    )
@router.get("/scrape", response_model=List[ScrapeJobOut])
async def list_scrape_jobs() -> List[ScrapeJobOut]:
    pool = get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, status, error, created_at FROM scrape_jobs ORDER BY created_at DESC"
        )
    return [
        ScrapeJobOut(
            job_id=str(r["id"]),
            status=r["status"],
            error=r["error"],
            created_at=r["created_at"],
        )
        for r in rows
    ]
@router.get("/scrape/{job_id}", response_model=ScrapeJobDetailOut)
async def get_scrape_job(job_id: str) -> ScrapeJobDetailOut:
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT id, status, error, created_at FROM scrape_jobs WHERE id = $1",
            job_id,
        )
        if row is None:
            raise HTTPException(status_code=404, detail="Scrape job not found")

        # Results are only attached once the job has finished.
        results: Optional[List[ScrapeResultOut]] = None
        if row["status"] == "done":
            result_rows = await conn.fetch(
                "SELECT url, model_name, prices, scraped_at FROM scrape_results"
                " WHERE job_id = $1 ORDER BY scraped_at ASC",
                job_id,
            )
            results = [
                ScrapeResultOut(
                    url=r["url"],
                    model_name=r["model_name"],
                    # asyncpg hands json/jsonb back as text unless a codec is
                    # registered, so decode the string form when needed.
                    prices=r["prices"] if isinstance(r["prices"], dict) else json.loads(r["prices"]),
                    scraped_at=r["scraped_at"],
                )
                for r in result_rows
            ]

    return ScrapeJobDetailOut(
        job_id=str(row["id"]),
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
        results=results,
    )
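
# Wiring and usage sketch (assumption: routers are mounted onto a FastAPI app
# elsewhere, e.g. in an app/main.py; the module path and "/api" prefix below
# are illustrative guesses, not taken from this file):
#
#   from fastapi import FastAPI
#   from app.routers.scrape import router as scrape_router
#
#   app = FastAPI()
#   app.include_router(scrape_router, prefix="/api")
#
# Typical flow against the running app:
#   POST /api/scrape  {"urls": ["https://example.com/pricing"]}  -> 202 with job_id
#   GET  /api/scrape                                             -> jobs, newest first
#   GET  /api/scrape/<job_id>                                    -> status, plus results once 'done'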