async_mysql_conn_pool.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import asyncio
  2. import aiomysql
  3. from contextlib import asynccontextmanager
  4. from typing import Dict,Optional, AsyncGenerator
  5. from foundation.observability.logger.loggering import server_logger
  6. from foundation.utils.common import handler_err
  7. from foundation.infrastructure.config import config_handler
  8. # 异步数据库连接池
  9. class AsyncMySQLPool:
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. if not cls._instance:
  13. cls._instance = super().__new__(cls)
  14. return cls._instance
  15. def __init__(self):
  16. if not hasattr(self, '_pool'):
  17. self._pool = None
  18. self._initialized = False
  19. async def initialize(self, max_retries=3, retry_delay=2):
  20. """初始化连接池,支持重试。已初始化且连接池健康时直接返回。"""
  21. if self._initialized and self._pool and not self._pool._closed:
  22. if self._pool._loop and not self._pool._loop.is_closed():
  23. return
  24. server_logger.info("事件循环已关闭,重新初始化连接池...")
  25. self._pool.close()
  26. await self._pool.wait_closed()
  27. self._pool = None
  28. self._initialized = False
  29. # 关闭旧池(如果有)
  30. if self._pool and not self._pool._closed:
  31. server_logger.info("关闭旧连接池...")
  32. self._pool.close()
  33. await self._pool.wait_closed()
  34. self._pool = None
  35. self._initialized = False
  36. last_error = None
  37. for attempt in range(1, max_retries + 1):
  38. try:
  39. server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...")
  40. self._pool = await aiomysql.create_pool(
  41. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  42. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  43. user=config_handler.get("mysql", "MYSQL_USER"),
  44. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  45. db=config_handler.get("mysql", "MYSQL_DB"),
  46. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  47. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  48. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
  49. connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")),
  50. pool_recycle=1800
  51. )
  52. self._initialized = True
  53. server_logger.info("异步MySQL连接池初始化成功")
  54. return
  55. except Exception as e:
  56. last_error = e
  57. server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
  58. if attempt < max_retries:
  59. server_logger.info(f"{retry_delay}秒后重试...")
  60. await asyncio.sleep(retry_delay)
  61. server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}")
  62. raise last_error
  63. async def close(self):
  64. """关闭连接池"""
  65. if self._pool:
  66. self._pool.close()
  67. await self._pool.wait_closed()
  68. server_logger.info("异步MySQL连接池已关闭")
  69. @asynccontextmanager
  70. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  71. """获取数据库连接的上下文管理器,带连接健康检查"""
  72. if not self._initialized or not self._pool or self._pool._closed:
  73. await self.initialize()
  74. async with self._pool.acquire() as conn:
  75. try:
  76. await conn.ping()
  77. except Exception:
  78. server_logger.warning("连接已失效,尝试重新初始化连接池")
  79. await self.initialize()
  80. async with self._pool.acquire() as new_conn:
  81. try:
  82. yield new_conn
  83. except Exception as e:
  84. server_logger.error(f"数据库连接操作失败: {e}")
  85. raise
  86. return
  87. try:
  88. yield conn
  89. except Exception as e:
  90. server_logger.error(f"数据库连接操作失败: {e}")
  91. raise
  92. @asynccontextmanager
  93. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  94. """获取游标的上下文管理器"""
  95. if connection:
  96. # 使用提供的连接
  97. async with connection.cursor(aiomysql.DictCursor) as cursor:
  98. try:
  99. yield cursor
  100. except Exception as e:
  101. server_logger.error(f"游标操作失败: {e}")
  102. raise
  103. else:
  104. # 创建新连接
  105. async with self.get_connection() as conn:
  106. async with conn.cursor(aiomysql.DictCursor) as cursor:
  107. try:
  108. yield cursor
  109. except Exception as e:
  110. server_logger.error(f"游标操作失败: {e}")
  111. raise
  112. # 全局数据库连接池实例
  113. #async_db_pool = AsyncMySQLPool()