"""Process-wide asynchronous MySQL connection pool built on aiomysql."""

import asyncio
from contextlib import asynccontextmanager
from typing import Dict, Optional, AsyncGenerator

import aiomysql

from foundation.observability.logger.loggering import server_logger
from foundation.utils.common import handler_err
from foundation.infrastructure.config import config_handler

# String spellings (compared case-insensitively, after stripping whitespace)
# that are interpreted as "disabled" when a boolean option arrives as text.
_FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})


def _coerce_bool(value):
    """Convert a configuration value to a bool, leaving ``None`` untouched.

    Real booleans and ``None`` are returned as-is. Strings are False only
    when they match one of ``_FALSE_STRINGS``; any other string is True,
    which is what plain truthiness would already give. Every other type
    falls back to ``bool(value)``.

    NOTE(review): this assumes ``config_handler.get`` may hand back text
    (the sibling options are wrapped in ``int(...)``) -- confirm against the
    config handler. Without the conversion, a textual "false" is truthy.
    """
    if value is None or isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.strip().lower() not in _FALSE_STRINGS
    return bool(value)


# Asynchronous database connection pool
class AsyncMySQLPool:
    """Singleton wrapper around an ``aiomysql`` connection pool.

    ``__new__`` always returns the same instance, so every
    ``AsyncMySQLPool()`` call shares one underlying pool. The pool is created
    by ``initialize()``, either explicitly or on the first
    ``get_connection()`` call, and released by ``close()``.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every AsyncMySQLPool() call even though __new__
        # returns the shared instance; only set the state up the first time.
        if not hasattr(self, '_pool'):
            self._pool = None
            self._initialized = False
            # Created lazily inside initialize() so the lock is constructed
            # while an event loop is running.
            self._init_lock = None

    async def initialize(self):
        """Create the connection pool from the ``mysql`` config section.

        The call is idempotent: if the pool already exists it returns
        immediately, and concurrent callers are serialized so only one pool
        is ever created.

        Raises:
            Exception: whatever pool creation or config parsing raised; the
                error is logged and re-raised unchanged.
        """
        if self._init_lock is None:
            # No await between the check and the assignment, so two
            # coroutines on the same loop cannot both create a lock.
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            if self._initialized:
                # Another caller finished initialization while we waited.
                return
            try:
                self._pool = await aiomysql.create_pool(
                    host=config_handler.get("mysql", "MYSQL_HOST", "localhost"),
                    port=int(config_handler.get("mysql", "MYSQL_PORT", "3306")),
                    user=config_handler.get("mysql", "MYSQL_USER"),
                    password=config_handler.get("mysql", "MYSQL_PASSWORD"),
                    db=config_handler.get("mysql", "MYSQL_DB"),
                    minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE", "1")),
                    maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE", "2")),
                    autocommit=_coerce_bool(
                        config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
                    ),
                )
                self._initialized = True
                server_logger.info("异步MySQL连接池初始化成功")
            except Exception as e:
                server_logger.error(f"连接池初始化失败: {e}")
                raise

    async def close(self):
        """Close the pool and reset state so it can be initialized again.

        Does nothing when no pool exists.
        """
        pool = self._pool
        if pool is None:
            return
        # Reset before awaiting: a get_connection() call that arrives while
        # the old pool is shutting down then builds a fresh pool instead of
        # acquiring from a closed one.
        self._pool = None
        self._initialized = False
        pool.close()
        await pool.wait_closed()
        server_logger.info("异步MySQL连接池已关闭")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
        """Async context manager yielding a connection acquired from the pool.

        Initializes the pool first if that has not happened yet. The
        connection is handed back to the pool when the ``async with`` block
        exits. Exceptions raised inside the block are logged and re-raised.
        """
        if not self._initialized:
            # Not initialized yet: build the pool from configuration.
            await self.initialize()

        async with self._pool.acquire() as conn:
            try:
                yield conn
            except Exception as e:
                server_logger.error(f"数据库连接操作失败: {e}")
                raise

    @asynccontextmanager
    async def _dict_cursor(self, connection):
        """Yield a ``DictCursor`` on *connection*, logging errors from the body."""
        async with connection.cursor(aiomysql.DictCursor) as cursor:
            try:
                yield cursor
            except Exception as e:
                server_logger.error(f"游标操作失败: {e}")
                raise

    @asynccontextmanager
    async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
        """Async context manager yielding an ``aiomysql.DictCursor``.

        Args:
            connection: an existing connection to open the cursor on. When
                omitted, a connection is acquired from the pool for the
                lifetime of the cursor.

        Exceptions raised inside the ``async with`` block are logged and
        re-raised.
        """
        if connection is not None:
            # Use the caller-supplied connection.
            async with self._dict_cursor(connection) as cursor:
                yield cursor
        else:
            # Acquire a connection from the pool for this cursor.
            async with self.get_connection() as conn:
                async with self._dict_cursor(conn) as cursor:
                    yield cursor


# Global database connection pool instance
#async_db_pool = AsyncMySQLPool()