import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncGenerator, Dict, Optional

import aiomysql

from foundation.observability.logger.loggering import server_logger
from foundation.utils.common import handler_err
from foundation.infrastructure.config import config_handler

# Textual spellings that must be read as "autocommit disabled".
_FALSY_TEXT = frozenset({"", "0", "false", "no", "off"})


def _coerce_autocommit(value):
    """Turn a raw config value into the flag handed to aiomysql.

    ``None`` and real booleans are passed through untouched so the previous
    behaviour for those inputs is preserved.  Strings are compared
    case-insensitively against a small set of "false" spellings, because plain
    truthiness would treat a textual ``"false"`` or ``"0"`` as enabled.
    Any other type falls back to ``bool(value)``.
    """
    if value is None or isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() not in _FALSY_TEXT
    return bool(value)


# Asynchronous database connection pool
class AsyncMySQLPool:
    """Process-wide singleton wrapper around an ``aiomysql`` connection pool.

    Every ``AsyncMySQLPool()`` call returns the same object.  The underlying
    pool is created by :meth:`initialize`, either explicitly or on first use
    from :meth:`get_connection`, and released by :meth:`close`.
    """

    _instance: Optional["AsyncMySQLPool"] = None

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every AsyncMySQLPool() call; only set the state up
        # the first time so an existing pool is never discarded.
        if not hasattr(self, '_pool'):
            self._pool = None
            self._initialized = False
            # Created lazily inside initialize() so it is built while an
            # event loop is running.
            self._init_lock = None

    @staticmethod
    def _pool_settings() -> dict:
        """Read the ``mysql`` config section into ``aiomysql.create_pool`` kwargs."""
        return {
            "host": config_handler.get("mysql", "MYSQL_HOST", "localhost"),
            "port": int(config_handler.get("mysql", "MYSQL_PORT", "3306")),
            "user": config_handler.get("mysql", "MYSQL_USER"),
            "password": config_handler.get("mysql", "MYSQL_PASSWORD"),
            "db": config_handler.get("mysql", "MYSQL_DB"),
            "minsize": int(config_handler.get("mysql", "MYSQL_MIN_SIZE", "1")),
            "maxsize": int(config_handler.get("mysql", "MYSQL_MAX_SIZE", "2")),
            "autocommit": _coerce_autocommit(
                config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
            ),
            "connect_timeout": int(
                config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")
            ),
        }

    async def initialize(self, max_retries=3, retry_delay=2):
        """Create the connection pool, retrying on failure.

        The call is idempotent: if a pool already exists it returns
        immediately, and concurrent callers are serialized so only one pool
        is ever created.

        Args:
            max_retries: Maximum number of attempts.  Values below 1 are
                treated as 1 so that a failure always has a real exception
                to report.
            retry_delay: Seconds to sleep between attempts.

        Raises:
            Exception: The error from the last failed attempt, once all
                attempts are exhausted.
        """
        if self._init_lock is None:
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            # Another coroutine may have finished initializing while this one
            # was waiting for the lock.
            if self._initialized and self._pool is not None:
                return

            attempts = max(1, max_retries)
            last_error = None
            for attempt in range(1, attempts + 1):
                try:
                    server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{attempts}次)...")
                    self._pool = await aiomysql.create_pool(**self._pool_settings())
                except Exception as e:
                    last_error = e
                    server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
                    if attempt < attempts:
                        server_logger.info(f"{retry_delay}秒后重试...")
                        await asyncio.sleep(retry_delay)
                else:
                    self._initialized = True
                    server_logger.info("异步MySQL连接池初始化成功")
                    return

            server_logger.error(f"连接池初始化失败,已重试{attempts}次: {last_error}")
            raise last_error

    async def close(self):
        """Close the pool and reset state so it can be initialized again."""
        pool = self._pool
        # Reset first: even if closing fails, later calls must not try to
        # acquire from a pool that is closed or half-closed.
        self._pool = None
        self._initialized = False
        if pool:
            pool.close()
            await pool.wait_closed()
            server_logger.info("异步MySQL连接池已关闭")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
        """Yield a pooled connection, releasing it back to the pool on exit.

        If the pool has not been initialized yet it is initialized with the
        default retry settings.  Exceptions raised inside the ``async with``
        body are logged and re-raised.
        """
        if not self._initialized or self._pool is None:
            # Not initialized yet: initialize with the default configuration.
            await self.initialize()

        async with self._pool.acquire() as conn:
            try:
                yield conn
            except Exception as e:
                server_logger.error(f"数据库连接操作失败: {e}")
                raise

    @asynccontextmanager
    async def get_cursor(
        self, connection: Optional[aiomysql.Connection] = None
    ) -> AsyncGenerator[aiomysql.Cursor, None]:
        """Yield a ``DictCursor``, closing it on exit.

        Args:
            connection: Connection to open the cursor on.  When omitted, a
                connection is acquired via :meth:`get_connection` and released
                when the context exits.

        Exceptions raised inside the ``async with`` body are logged and
        re-raised.
        """
        async with AsyncExitStack() as stack:
            if not connection:
                # No connection supplied: borrow one for the cursor's lifetime.
                connection = await stack.enter_async_context(self.get_connection())

            async with connection.cursor(aiomysql.DictCursor) as cursor:
                try:
                    yield cursor
                except Exception as e:
                    server_logger.error(f"游标操作失败: {e}")
                    raise


# Global database connection pool instance
# async_db_pool = AsyncMySQLPool()