# async_mysql_conn_pool.py (5.4 KB)
  1. import asyncio
  2. import aiomysql
  3. from contextlib import asynccontextmanager
  4. from typing import Dict,Optional, AsyncGenerator
  5. from foundation.observability.logger.loggering import server_logger
  6. from foundation.utils.common import handler_err
  7. from foundation.infrastructure.config import config_handler
# Asynchronous database connection pool
  9. class AsyncMySQLPool:
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. if not cls._instance:
  13. cls._instance = super().__new__(cls)
  14. return cls._instance
  15. def __init__(self):
  16. if not hasattr(self, '_pool'):
  17. self._pool = None
  18. self._initialized = False
  19. async def initialize(self, max_retries=3, retry_delay=2):
  20. """初始化连接池,支持重试。已初始化且连接池健康时直接返回。"""
  21. if self._initialized and self._pool and not self._pool._closed:
  22. if self._pool._loop and not self._pool._loop.is_closed():
  23. return
  24. server_logger.info("事件循环已关闭,丢弃旧连接池引用...")
  25. self._pool = None
  26. self._initialized = False
  27. # 关闭旧池(如果有,且事件循环仍存活)
  28. if self._pool and not self._pool._closed:
  29. if self._pool._loop and self._pool._loop.is_closed():
  30. server_logger.info("旧连接池的事件循环已关闭,丢弃引用...")
  31. self._pool = None
  32. self._initialized = False
  33. else:
  34. server_logger.info("关闭旧连接池...")
  35. self._pool.close()
  36. await self._pool.wait_closed()
  37. self._pool = None
  38. self._initialized = False
  39. last_error = None
  40. for attempt in range(1, max_retries + 1):
  41. try:
  42. server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...")
  43. self._pool = await aiomysql.create_pool(
  44. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  45. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  46. user=config_handler.get("mysql", "MYSQL_USER"),
  47. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  48. db=config_handler.get("mysql", "MYSQL_DB"),
  49. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  50. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  51. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
  52. connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")),
  53. pool_recycle=1800
  54. )
  55. self._initialized = True
  56. server_logger.info("异步MySQL连接池初始化成功")
  57. return
  58. except Exception as e:
  59. last_error = e
  60. server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
  61. if attempt < max_retries:
  62. server_logger.info(f"{retry_delay}秒后重试...")
  63. await asyncio.sleep(retry_delay)
  64. server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}")
  65. raise last_error
  66. async def close(self):
  67. """关闭连接池"""
  68. if self._pool:
  69. self._pool.close()
  70. await self._pool.wait_closed()
  71. server_logger.info("异步MySQL连接池已关闭")
  72. @asynccontextmanager
  73. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  74. """获取数据库连接的上下文管理器,带连接健康检查"""
  75. if not self._initialized or not self._pool or self._pool._closed:
  76. await self.initialize()
  77. async with self._pool.acquire() as conn:
  78. try:
  79. await conn.ping()
  80. except Exception:
  81. server_logger.warning("连接已失效,尝试重新初始化连接池")
  82. await self.initialize()
  83. async with self._pool.acquire() as new_conn:
  84. try:
  85. yield new_conn
  86. except Exception as e:
  87. server_logger.error(f"数据库连接操作失败: {e}")
  88. raise
  89. return
  90. try:
  91. yield conn
  92. except Exception as e:
  93. server_logger.error(f"数据库连接操作失败: {e}")
  94. raise
  95. @asynccontextmanager
  96. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  97. """获取游标的上下文管理器"""
  98. if connection:
  99. # 使用提供的连接
  100. async with connection.cursor(aiomysql.DictCursor) as cursor:
  101. try:
  102. yield cursor
  103. except Exception as e:
  104. server_logger.error(f"游标操作失败: {e}")
  105. raise
  106. else:
  107. # 创建新连接
  108. async with self.get_connection() as conn:
  109. async with conn.cursor(aiomysql.DictCursor) as cursor:
  110. try:
  111. yield cursor
  112. except Exception as e:
  113. server_logger.error(f"游标操作失败: {e}")
  114. raise
# Global database connection pool instance
# async_db_pool = AsyncMySQLPool()