async_mysql_conn_pool.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import asyncio
  2. import aiomysql
  3. from contextlib import asynccontextmanager
  4. from typing import Dict,Optional, AsyncGenerator
  5. from foundation.observability.logger.loggering import server_logger
  6. from foundation.utils.common import handler_err
  7. from foundation.infrastructure.config import config_handler
  8. # 异步数据库连接池
  9. class AsyncMySQLPool:
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. if not cls._instance:
  13. cls._instance = super().__new__(cls)
  14. return cls._instance
  15. def __init__(self):
  16. if not hasattr(self, '_pool'):
  17. self._pool = None
  18. self._initialized = False
  19. async def initialize(self, max_retries=3, retry_delay=2):
  20. """初始化连接池,支持重试。已初始化且连接池健康时直接返回。"""
  21. if self._initialized and self._pool and not self._pool._closed:
  22. return
  23. # 关闭旧池(如果有)
  24. if self._pool and not self._pool._closed:
  25. server_logger.info("关闭旧连接池...")
  26. self._pool.close()
  27. await self._pool.wait_closed()
  28. self._pool = None
  29. self._initialized = False
  30. last_error = None
  31. for attempt in range(1, max_retries + 1):
  32. try:
  33. server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...")
  34. self._pool = await aiomysql.create_pool(
  35. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  36. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  37. user=config_handler.get("mysql", "MYSQL_USER"),
  38. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  39. db=config_handler.get("mysql", "MYSQL_DB"),
  40. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  41. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  42. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
  43. connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30")),
  44. pool_recycle=1800
  45. )
  46. self._initialized = True
  47. server_logger.info("异步MySQL连接池初始化成功")
  48. return
  49. except Exception as e:
  50. last_error = e
  51. server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
  52. if attempt < max_retries:
  53. server_logger.info(f"{retry_delay}秒后重试...")
  54. await asyncio.sleep(retry_delay)
  55. server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}")
  56. raise last_error
  57. async def close(self):
  58. """关闭连接池"""
  59. if self._pool:
  60. self._pool.close()
  61. await self._pool.wait_closed()
  62. server_logger.info("异步MySQL连接池已关闭")
  63. @asynccontextmanager
  64. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  65. """获取数据库连接的上下文管理器,带连接健康检查"""
  66. if not self._initialized or not self._pool or self._pool._closed:
  67. await self.initialize()
  68. async with self._pool.acquire() as conn:
  69. try:
  70. await conn.ping()
  71. except Exception:
  72. server_logger.warning("连接已失效,尝试重新初始化连接池")
  73. await self.initialize()
  74. async with self._pool.acquire() as new_conn:
  75. try:
  76. yield new_conn
  77. except Exception as e:
  78. server_logger.error(f"数据库连接操作失败: {e}")
  79. raise
  80. return
  81. try:
  82. yield conn
  83. except Exception as e:
  84. server_logger.error(f"数据库连接操作失败: {e}")
  85. raise
  86. @asynccontextmanager
  87. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  88. """获取游标的上下文管理器"""
  89. if connection:
  90. # 使用提供的连接
  91. async with connection.cursor(aiomysql.DictCursor) as cursor:
  92. try:
  93. yield cursor
  94. except Exception as e:
  95. server_logger.error(f"游标操作失败: {e}")
  96. raise
  97. else:
  98. # 创建新连接
  99. async with self.get_connection() as conn:
  100. async with conn.cursor(aiomysql.DictCursor) as cursor:
  101. try:
  102. yield cursor
  103. except Exception as e:
  104. server_logger.error(f"游标操作失败: {e}")
  105. raise
  106. # 全局数据库连接池实例
  107. #async_db_pool = AsyncMySQLPool()