db.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from __future__ import annotations
  2. import logging
  3. from typing import Optional
  4. import asyncpg
  5. from app.config import settings
  6. logger = logging.getLogger(__name__)
  7. _pool: Optional[asyncpg.Pool] = None
  8. async def init_pool() -> None:
  9. """Create the global asyncpg connection pool."""
  10. global _pool
  11. logger.info(
  12. "Connecting to database %s@%s:%s/%s ...",
  13. settings.db_user,
  14. settings.db_host,
  15. settings.db_port,
  16. settings.db_name,
  17. )
  18. try:
  19. _pool = await asyncpg.create_pool(
  20. host=settings.db_host,
  21. port=settings.db_port,
  22. user=settings.db_user,
  23. password=settings.db_password,
  24. database=settings.db_name,
  25. server_settings={"search_path": "crawl"},
  26. )
  27. logger.info("Database connection pool created successfully.")
  28. except Exception as exc:
  29. logger.error("Failed to connect to database: %s", exc)
  30. raise
  31. async def close_pool() -> None:
  32. """Close the global asyncpg connection pool."""
  33. global _pool
  34. if _pool is not None:
  35. await _pool.close()
  36. _pool = None
  37. logger.info("Database connection pool closed.")
  38. def get_pool() -> asyncpg.Pool:
  39. """Return the global connection pool. Must be called after init_pool()."""
  40. if _pool is None:
  41. raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
  42. return _pool