import asyncio import aiomysql from contextlib import asynccontextmanager from typing import Dict,Optional, AsyncGenerator from foundation.observability.logger.loggering import server_logger from foundation.utils.common import handler_err from foundation.infrastructure.config import config_handler # 异步数据库连接池 class AsyncMySQLPool: _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if not hasattr(self, '_pool'): self._pool = None self._initialized = False async def initialize(self, max_retries=3, retry_delay=2): """初始化连接池,支持重试。已初始化且连接池健康时直接返回。""" if self._initialized and self._pool and not self._pool._closed: if self._pool._loop and not self._pool._loop.is_closed(): return server_logger.info("事件循环已关闭,丢弃旧连接池引用...") self._pool = None self._initialized = False # 关闭旧池(如果有,且事件循环仍存活) if self._pool and not self._pool._closed: if self._pool._loop and self._pool._loop.is_closed(): server_logger.info("旧连接池的事件循环已关闭,丢弃引用...") self._pool = None self._initialized = False else: server_logger.info("关闭旧连接池...") self._pool.close() await self._pool.wait_closed() self._pool = None self._initialized = False last_error = None for attempt in range(1, max_retries + 1): try: server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...") self._pool = await aiomysql.create_pool( host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"), port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")), user=config_handler.get("mysql", "MYSQL_USER"), password=config_handler.get("mysql", "MYSQL_PASSWORD"), db=config_handler.get("mysql", "MYSQL_DB"), minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")), maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")), autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"), connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")), pool_recycle=1800 ) self._initialized = True server_logger.info("异步MySQL连接池初始化成功") return except Exception as e: last_error = e server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}") if attempt < max_retries: server_logger.info(f"{retry_delay}秒后重试...") await asyncio.sleep(retry_delay) server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}") raise last_error async def close(self): """关闭连接池""" if self._pool: self._pool.close() await self._pool.wait_closed() server_logger.info("异步MySQL连接池已关闭") @asynccontextmanager async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]: """获取数据库连接的上下文管理器,带连接健康检查""" if not self._initialized or not self._pool or self._pool._closed: await self.initialize() async with self._pool.acquire() as conn: try: await conn.ping() except Exception: server_logger.warning("连接已失效,尝试重新初始化连接池") await self.initialize() async with self._pool.acquire() as new_conn: try: yield new_conn except Exception as e: server_logger.error(f"数据库连接操作失败: {e}") raise return try: yield conn except Exception as e: server_logger.error(f"数据库连接操作失败: {e}") raise @asynccontextmanager async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]: """获取游标的上下文管理器""" if connection: # 使用提供的连接 async with connection.cursor(aiomysql.DictCursor) as cursor: try: yield cursor except Exception as e: server_logger.error(f"游标操作失败: {e}") raise else: # 创建新连接 async with self.get_connection() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: try: yield cursor except Exception as e: server_logger.error(f"游标操作失败: {e}") raise # 全局数据库连接池实例 #async_db_pool = AsyncMySQLPool()