from __future__ import annotations

import asyncio
import json
from datetime import datetime
from typing import List, Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from app.db import get_pool
from app.services.scraper import ScraperService

router = APIRouter(tags=["scrape"])
_scraper = ScraperService()
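# Assumed backing schema, inferred from the queries in this module. The
# authoritative DDL lives elsewhere in the repo; the column types below
# (notably the uuid id) are a sketch, not a source of truth:
#
#   CREATE TABLE scrape_jobs (
#       id         uuid PRIMARY KEY DEFAULT gen_random_uuid(),
#       urls       text[] NOT NULL,
#       status     text NOT NULL,   -- e.g. 'pending' -> 'done'
#       error      text,
#       created_at timestamptz NOT NULL DEFAULT now()
#   );
#
#   CREATE TABLE scrape_results (
#       job_id      uuid NOT NULL REFERENCES scrape_jobs (id),
#       url         text NOT NULL,
#       model_name  text NOT NULL,
#       prices      jsonb NOT NULL,
#       model_info  jsonb,
#       rate_limits jsonb,
#       tool_prices jsonb,
#       scraped_at  timestamptz NOT NULL
#   );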
class ScrapeRequest(BaseModel):
    urls: List[str]


class ScrapeJobOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime


class ScrapeResultOut(BaseModel):
    # Note: on Pydantic v2 the `model_` prefix on model_name/model_info
    # collides with the protected namespace and emits a warning; if that
    # applies, set `model_config = ConfigDict(protected_namespaces=())`.
    url: str
    model_name: str
    prices: dict
    model_info: Optional[dict] = None
    rate_limits: Optional[dict] = None
    tool_prices: Optional[list] = None
    scraped_at: datetime


class ScrapeJobDetailOut(BaseModel):
    job_id: str
    status: str
    error: Optional[str] = None
    created_at: datetime
    results: Optional[List[ScrapeResultOut]] = None
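# Illustrative JSON for GET /scrape/{job_id} after completion (all values
# are made up; the field layout mirrors the models above):
#
#   {
#     "job_id": "3f2b...",
#     "status": "done",
#     "error": null,
#     "created_at": "2024-01-01T00:00:00Z",
#     "results": [
#       {"url": "https://example.com/pricing",
#        "model_name": "example-model",
#        "prices": {"input": 1.0, "output": 2.0},
#        "model_info": null, "rate_limits": null, "tool_prices": null,
#        "scraped_at": "2024-01-01T00:01:00Z"}
#     ]
#   }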
# Hold strong references to fire-and-forget tasks so the event loop's
# weak reference does not let them be garbage-collected mid-run.
_background_tasks: set[asyncio.Task] = set()


@router.post("/scrape", response_model=ScrapeJobOut, status_code=202)
async def create_scrape_job(body: ScrapeRequest) -> ScrapeJobOut:
    """Persist a new scrape job, then run it in the background."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO scrape_jobs (urls, status)
            VALUES ($1, 'pending')
            RETURNING id, status, error, created_at
            """,
            body.urls,
        )
    job_id = str(row["id"])
    # The scrape runs outside the request/response cycle; callers poll
    # GET /scrape/{job_id} until the job leaves 'pending'.
    task = asyncio.create_task(_scraper.run_job(job_id, body.urls, pool))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return ScrapeJobOut(
        job_id=job_id,
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
    )
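# Client-side sketch for kicking off a job (hypothetical host/port, and
# assumes the router is mounted at the app root; requires the third-party
# `httpx` package):
#
#   import httpx
#
#   resp = httpx.post(
#       "http://localhost:8000/scrape",
#       json={"urls": ["https://example.com/pricing"]},
#   )
#   resp.raise_for_status()          # expect 202 Accepted
#   job_id = resp.json()["job_id"]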
@router.get("/scrape", response_model=List[ScrapeJobOut])
async def list_scrape_jobs() -> List[ScrapeJobOut]:
    """Return all scrape jobs, newest first."""
    pool = get_pool()
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, status, error, created_at"
            " FROM scrape_jobs ORDER BY created_at DESC"
        )
    return [
        ScrapeJobOut(
            job_id=str(r["id"]),
            status=r["status"],
            error=r["error"],
            created_at=r["created_at"],
        )
        for r in rows
    ]
@router.get("/scrape/{job_id}", response_model=ScrapeJobDetailOut)
async def get_scrape_job(job_id: str) -> ScrapeJobDetailOut:
    """Return a single job, including its results once the job is done."""
    pool = get_pool()
    async with pool.acquire() as conn:
        # job_id arrives as path text; this assumes the id column accepts
        # string input (true for uuid columns under asyncpg).
        row = await conn.fetchrow(
            "SELECT id, status, error, created_at FROM scrape_jobs WHERE id = $1",
            job_id,
        )
        if row is None:
            raise HTTPException(status_code=404, detail="Scrape job not found")

        results: Optional[List[ScrapeResultOut]] = None
        if row["status"] == "done":
            result_rows = await conn.fetch(
                "SELECT url, model_name, prices, model_info, rate_limits,"
                " tool_prices, scraped_at FROM scrape_results"
                " WHERE job_id = $1 ORDER BY scraped_at ASC",
                job_id,
            )

            def _j(v):
                # Decode a JSON column: asyncpg already returns dict/list
                # when a jsonb codec is registered, otherwise raw text.
                if v is None:
                    return None
                return v if isinstance(v, (dict, list)) else json.loads(v)

            results = [
                ScrapeResultOut(
                    url=r["url"],
                    model_name=r["model_name"],
                    prices=_j(r["prices"]) or {},
                    model_info=_j(r["model_info"]),
                    rate_limits=_j(r["rate_limits"]),
                    tool_prices=_j(r["tool_prices"]),
                    scraped_at=r["scraped_at"],
                )
                for r in result_rows
            ]

    return ScrapeJobDetailOut(
        job_id=str(row["id"]),
        status=row["status"],
        error=row["error"],
        created_at=row["created_at"],
        results=results,
    )
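# Polling sketch, continuing the client example above (hypothetical
# host/port; treating any status other than 'done' with no error as
# still-running is an assumption about ScraperService's state names):
#
#   import time
#
#   while True:
#       job = httpx.get(f"http://localhost:8000/scrape/{job_id}").json()
#       if job["status"] == "done" or job["error"]:
#           break
#       time.sleep(2)
#   for result in job.get("results") or []:
#       print(result["url"], result["prices"])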