db.py 934 B

12345678910111213141516171819202122232425262728293031323334353637
  1. from __future__ import annotations
  2. from typing import Optional
  3. import asyncpg
  4. from app.config import settings
# Module-wide asyncpg pool. It is None until init_pool() assigns it, and
# close_pool() resets it to None again after closing the pool.
_pool: Optional[asyncpg.Pool] = None
  6. async def init_pool() -> None:
  7. """Create the global asyncpg connection pool."""
  8. global _pool
  9. _pool = await asyncpg.create_pool(
  10. host=settings.db_host,
  11. port=settings.db_port,
  12. user=settings.db_user,
  13. password=settings.db_password,
  14. database=settings.db_name,
  15. server_settings={"search_path": "crawl"},
  16. )
  17. async def close_pool() -> None:
  18. """Close the global asyncpg connection pool."""
  19. global _pool
  20. if _pool is not None:
  21. await _pool.close()
  22. _pool = None
  23. def get_pool() -> asyncpg.Pool:
  24. """Return the global connection pool. Must be called after init_pool()."""
  25. if _pool is None:
  26. raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
  27. return _pool