| 12345678910111213141516171819202122232425262728293031323334353637 |
- from __future__ import annotations
- from typing import Optional
- import asyncpg
- from app.config import settings
- _pool: Optional[asyncpg.Pool] = None
- async def init_pool() -> None:
- """Create the global asyncpg connection pool."""
- global _pool
- _pool = await asyncpg.create_pool(
- host=settings.db_host,
- port=settings.db_port,
- user=settings.db_user,
- password=settings.db_password,
- database=settings.db_name,
- server_settings={"search_path": "crawl"},
- )
- async def close_pool() -> None:
- """Close the global asyncpg connection pool."""
- global _pool
- if _pool is not None:
- await _pool.close()
- _pool = None
- def get_pool() -> asyncpg.Pool:
- """Return the global connection pool. Must be called after init_pool()."""
- if _pool is None:
- raise RuntimeError("Database pool is not initialised. Call init_pool() first.")
- return _pool
|