| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- import asyncio
- import aiomysql
- from contextlib import asynccontextmanager
- from typing import Dict,Optional, AsyncGenerator
- from foundation.observability.logger.loggering import server_logger
- from foundation.utils.common import handler_err
- from foundation.infrastructure.config import config_handler
- # 异步数据库连接池
- class AsyncMySQLPool:
- _instance = None
- def __new__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance = super().__new__(cls)
- return cls._instance
- def __init__(self):
- if not hasattr(self, '_pool'):
- self._pool = None
- self._initialized = False
- async def initialize(self, max_retries=3, retry_delay=2):
- """初始化连接池,支持重试"""
- last_error = None
- for attempt in range(1, max_retries + 1):
- try:
- server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...")
- self._pool = await aiomysql.create_pool(
- host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
- port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
- user=config_handler.get("mysql", "MYSQL_USER"),
- password=config_handler.get("mysql", "MYSQL_PASSWORD"),
- db=config_handler.get("mysql", "MYSQL_DB"),
- minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
- maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
- autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
- connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30"))
- )
- self._initialized = True
- server_logger.info("异步MySQL连接池初始化成功")
- return
- except Exception as e:
- last_error = e
- server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
- if attempt < max_retries:
- server_logger.info(f"{retry_delay}秒后重试...")
- await asyncio.sleep(retry_delay)
- server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}")
- raise last_error
-
- async def close(self):
- """关闭连接池"""
- if self._pool:
- self._pool.close()
- await self._pool.wait_closed()
- server_logger.info("异步MySQL连接池已关闭")
-
- @asynccontextmanager
- async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
- """获取数据库连接的上下文管理器"""
- if not self._initialized:
- # 如果没有初始化,使用默认配置初始化
- await self.initialize()
-
- async with self._pool.acquire() as conn:
- try:
- yield conn
- except Exception as e:
- server_logger.error(f"数据库连接操作失败: {e}")
- raise
-
- @asynccontextmanager
- async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
- """获取游标的上下文管理器"""
- if connection:
- # 使用提供的连接
- async with connection.cursor(aiomysql.DictCursor) as cursor:
- try:
- yield cursor
- except Exception as e:
- server_logger.error(f"游标操作失败: {e}")
- raise
- else:
- # 创建新连接
- async with self.get_connection() as conn:
- async with conn.cursor(aiomysql.DictCursor) as cursor:
- try:
- yield cursor
- except Exception as e:
- server_logger.error(f"游标操作失败: {e}")
- raise
- # 全局数据库连接池实例
- #async_db_pool = AsyncMySQLPool()
|