| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- import aiomysql
- from contextlib import asynccontextmanager
- from typing import Dict,Optional, AsyncGenerator
- from foundation.observability.logger.loggering import server_logger
- from foundation.utils.common import handler_err
- from foundation.infrastructure.config import config_handler
- # 异步数据库连接池
class AsyncMySQLPool:
    """Process-wide singleton wrapper around an ``aiomysql`` connection pool.

    Every ``AsyncMySQLPool()`` call returns the same instance. The underlying
    pool is created lazily on the first ``get_connection()`` (or by an explicit
    ``await initialize()``) from the ``mysql`` section of ``config_handler``,
    and can be torn down with ``await close()``; a later ``get_connection()``
    then builds a fresh pool.
    """

    _instance = None

    # Strings (stripped, case-insensitive) that must be read as boolean False.
    # Without this, a textual config value such as "false" would be truthy.
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off", "none"})

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Set up the empty state once; later constructions leave it alone."""
        # __init__ runs on every AsyncMySQLPool() call, so only initialize the
        # attributes when the singleton has not been set up yet.
        if not hasattr(self, '_pool'):
            self._pool = None
            self._initialized = False
            self._init_lock = None

    @staticmethod
    def _to_bool(value, default=False):
        """Coerce a config value to ``bool``.

        ``None`` yields *default*; real booleans pass through; strings are
        False only when they spell an explicit false-like value (see
        ``_FALSE_STRINGS``); anything else falls back to ordinary truthiness.
        """
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() not in AsyncMySQLPool._FALSE_STRINGS
        return bool(value)

    async def initialize(self):
        """Create the connection pool from configuration (idempotent).

        Concurrent callers are serialized so that only one pool is ever
        created; calling this again while a pool is live is a no-op instead
        of silently replacing (and leaking) the existing pool.

        Raises:
            Exception: whatever ``aiomysql.create_pool`` or the config
                conversion raises; the error is logged and re-raised.
        """
        # Local import: keeps this block self-contained without touching the
        # file-level import section.
        import asyncio

        # Create the lock lazily, inside a running coroutine, so it is never
        # constructed at module-import time outside the event loop.
        lock = getattr(self, '_init_lock', None)
        if lock is None:
            lock = self._init_lock = asyncio.Lock()

        async with lock:
            if self._initialized:
                # Another coroutine finished initialization while we waited.
                return
            try:
                self._pool = await aiomysql.create_pool(
                    host=config_handler.get("mysql", "MYSQL_HOST", "localhost"),
                    port=int(config_handler.get("mysql", "MYSQL_PORT", "3306")),
                    user=config_handler.get("mysql", "MYSQL_USER"),
                    password=config_handler.get("mysql", "MYSQL_PASSWORD"),
                    db=config_handler.get("mysql", "MYSQL_DB"),
                    minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE", "1")),
                    maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE", "2")),
                    # NOTE(review): the config value may arrive as text
                    # (e.g. "false"); normalize it so it is not truthy.
                    autocommit=self._to_bool(
                        config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
                    ),
                )
                self._initialized = True
                server_logger.info("异步MySQL连接池初始化成功")
            except Exception as e:
                server_logger.error(f"连接池初始化失败: {e}")
                raise

    async def close(self):
        """Close the pool, wait for its connections, and reset the state.

        Does nothing when no pool exists. After a successful call the
        instance is back in its uninitialized state, so the next
        ``get_connection()`` creates a new pool rather than using a closed one.
        """
        pool = self._pool
        if pool is None:
            return
        # Detach before awaiting so no new caller picks up the closing pool.
        self._pool = None
        self._initialized = False
        pool.close()
        await pool.wait_closed()
        server_logger.info("异步MySQL连接池已关闭")

    @asynccontextmanager
    async def get_connection(self) -> "AsyncGenerator[aiomysql.Connection, None]":
        """Async context manager yielding a pooled connection.

        Initializes the pool on first use. Exceptions raised inside the
        ``async with`` body are logged and re-raised; the connection is
        released back to the pool by ``pool.acquire()``'s own context manager.
        """
        if not self._initialized:
            # Lazy initialization from configuration.
            await self.initialize()

        async with self._pool.acquire() as conn:
            try:
                yield conn
            except Exception as e:
                server_logger.error(f"数据库连接操作失败: {e}")
                raise

    @staticmethod
    @asynccontextmanager
    async def _dict_cursor(conn: "aiomysql.Connection") -> "AsyncGenerator[aiomysql.Cursor, None]":
        """Yield a ``DictCursor`` on *conn*, logging and re-raising body errors."""
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            try:
                yield cursor
            except Exception as e:
                server_logger.error(f"游标操作失败: {e}")
                raise

    @asynccontextmanager
    async def get_cursor(self, connection: "Optional[aiomysql.Connection]" = None) -> "AsyncGenerator[aiomysql.Cursor, None]":
        """Async context manager yielding a ``DictCursor``.

        Args:
            connection: an existing connection to open the cursor on. When
                omitted, a connection is borrowed from the pool for the
                lifetime of the cursor.

        Exceptions raised inside the ``async with`` body are logged and
        re-raised.
        """
        if connection is not None:
            # Caller owns the connection; only the cursor is managed here.
            async with self._dict_cursor(connection) as cursor:
                yield cursor
            return

        # No connection supplied: borrow one from the pool.
        async with self.get_connection() as conn:
            async with self._dict_cursor(conn) as cursor:
                yield cursor
- # 全局数据库连接池实例
- #async_db_pool = AsyncMySQLPool()
|