| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- from __future__ import annotations
- import logging
- from typing import Optional
- import asyncpg
- from app.config import settings
# Module-level logger, named after this module via __name__.
- logger = logging.getLogger(__name__)
# Shared asyncpg pool for the whole module: None until init_pool() assigns it,
# and reset to None again by close_pool(). Read it through get_pool().
- _pool: Optional[asyncpg.Pool] = None
async def init_pool() -> None:
    """Create the global asyncpg connection pool.

    Connection parameters (host, port, user, password, database) are read
    from ``settings``; the pool is created with
    ``server_settings={"search_path": "crawl"}``.

    Calling this while a pool is already installed is a no-op: the existing
    pool is kept instead of being overwritten, which would leak it unclosed.

    Raises:
        Exception: Whatever ``asyncpg.create_pool`` raised. The error is
            logged together with its traceback and re-raised unchanged.
    """
    global _pool

    if _pool is not None:
        # Overwriting the reference would orphan a live pool without closing it.
        logger.warning("Database connection pool already initialised; skipping.")
        return

    logger.info(
        "Connecting to database %s@%s:%s/%s ...",
        settings.db_user,
        settings.db_host,
        settings.db_port,
        settings.db_name,
    )
    # Keep the try body down to the single call that can actually fail.
    try:
        new_pool = await asyncpg.create_pool(
            host=settings.db_host,
            port=settings.db_port,
            user=settings.db_user,
            password=settings.db_password,
            database=settings.db_name,
            server_settings={"search_path": "crawl"},
        )
    except Exception as exc:
        # Same message as before; logger.exception additionally records the
        # traceback so the root cause is visible in the logs.
        logger.exception("Failed to connect to database: %s", exc)
        raise

    # Publish the pool only once it has been created successfully.
    _pool = new_pool
    logger.info("Database connection pool created successfully.")
async def close_pool() -> None:
    """Close the global asyncpg connection pool, if one exists.

    Does nothing when no pool is installed. Otherwise the global reference
    is cleared *before* the pool is closed, so that while ``close()`` is
    awaited (or if it raises) ``get_pool()`` reports "not initialised"
    instead of handing out a closing pool, and a concurrent ``close_pool()``
    call cannot close the same pool twice.

    Raises:
        Exception: Whatever ``Pool.close()`` raised, propagated unchanged.
    """
    global _pool

    pool = _pool
    if pool is None:
        return

    # Detach first; the await below yields control to other coroutines.
    _pool = None
    await pool.close()
    logger.info("Database connection pool closed.")
def get_pool() -> asyncpg.Pool:
    """Hand back the module-wide connection pool.

    Only valid once ``init_pool()`` has installed a pool.

    Raises:
        RuntimeError: If no pool is currently installed.
    """
    current = _pool
    if current is not None:
        return current
    raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
|