"""Process-wide asyncpg connection pool: creation, teardown and access."""

from __future__ import annotations

import logging
from typing import Optional

import asyncpg

from app.config import settings

logger = logging.getLogger(__name__)

# The single shared pool for this module. It is None until init_pool()
# succeeds, and is reset to None by close_pool().
_pool: Optional[asyncpg.Pool] = None


async def init_pool() -> None:
    """Create the global asyncpg connection pool.

    Connection parameters (host, port, user, password, database name) are
    read from ``settings``. Every connection is opened with the
    ``search_path`` server setting set to ``"crawl"``.

    Calling this while a pool already exists is a no-op (a warning is
    logged); previously a second call replaced the module-level reference
    without closing the old pool, leaking its connections.

    Raises:
        Exception: whatever ``asyncpg.create_pool`` raised; the failure is
            logged with its traceback and then re-raised unchanged.
    """
    global _pool

    if _pool is not None:
        # Do not overwrite a live pool: the old one would never be closed.
        logger.warning(
            "Database connection pool is already initialised; "
            "ignoring repeated init_pool() call."
        )
        return

    # The password is deliberately left out of the log line.
    logger.info(
        "Connecting to database %s@%s:%s/%s ...",
        settings.db_user,
        settings.db_host,
        settings.db_port,
        settings.db_name,
    )

    try:
        _pool = await asyncpg.create_pool(
            host=settings.db_host,
            port=settings.db_port,
            user=settings.db_user,
            password=settings.db_password,
            database=settings.db_name,
            server_settings={"search_path": "crawl"},
        )
    except Exception as exc:
        # logger.exception logs at ERROR level and attaches the traceback,
        # which a plain logger.error call would drop.
        logger.exception("Failed to connect to database: %s", exc)
        raise

    logger.info("Database connection pool created successfully.")


async def close_pool() -> None:
    """Close the global asyncpg connection pool.

    Does nothing when no pool has been created. The module-level reference
    is cleared *before* the pool is closed, so get_pool() never hands out a
    pool that is closing, and a failure inside ``close()`` cannot leave a
    stale reference behind. Any exception from ``close()`` propagates.
    """
    global _pool

    if _pool is None:
        return

    # Detach first, then close the detached object.
    pool, _pool = _pool, None
    await pool.close()
    logger.info("Database connection pool closed.")


def get_pool() -> asyncpg.Pool:
    """Return the global connection pool.

    Must be called after init_pool().

    Raises:
        RuntimeError: if the pool has not been created, or has been closed.
    """
    if _pool is None:
        raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
    return _pool