async_mysql_conn_pool.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. import asyncio
  2. import aiomysql
  3. from contextlib import asynccontextmanager
  4. from typing import Dict,Optional, AsyncGenerator
  5. from foundation.observability.logger.loggering import server_logger
  6. from foundation.utils.common import handler_err
  7. from foundation.infrastructure.config import config_handler
  8. # 异步数据库连接池
  9. class AsyncMySQLPool:
  10. _instance = None
  11. def __new__(cls, *args, **kwargs):
  12. if not cls._instance:
  13. cls._instance = super().__new__(cls)
  14. return cls._instance
  15. def __init__(self):
  16. if not hasattr(self, '_pool'):
  17. self._pool = None
  18. self._initialized = False
  19. async def initialize(self, max_retries=3, retry_delay=2):
  20. """初始化连接池,支持重试"""
  21. last_error = None
  22. for attempt in range(1, max_retries + 1):
  23. try:
  24. server_logger.info(f"尝试初始化MySQL连接池 (第{attempt}/{max_retries}次)...")
  25. self._pool = await aiomysql.create_pool(
  26. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  27. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  28. user=config_handler.get("mysql", "MYSQL_USER"),
  29. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  30. db=config_handler.get("mysql", "MYSQL_DB"),
  31. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  32. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  33. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT"),
  34. connect_timeout=int(config_handler.get("mysql", "MYSQL_CONNECT_TIMEOUT", "30"))
  35. )
  36. self._initialized = True
  37. server_logger.info("异步MySQL连接池初始化成功")
  38. return
  39. except Exception as e:
  40. last_error = e
  41. server_logger.warning(f"连接池初始化失败 (第{attempt}次): {e}")
  42. if attempt < max_retries:
  43. server_logger.info(f"{retry_delay}秒后重试...")
  44. await asyncio.sleep(retry_delay)
  45. server_logger.error(f"连接池初始化失败,已重试{max_retries}次: {last_error}")
  46. raise last_error
  47. async def close(self):
  48. """关闭连接池"""
  49. if self._pool:
  50. self._pool.close()
  51. await self._pool.wait_closed()
  52. server_logger.info("异步MySQL连接池已关闭")
  53. @asynccontextmanager
  54. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  55. """获取数据库连接的上下文管理器"""
  56. if not self._initialized:
  57. # 如果没有初始化,使用默认配置初始化
  58. await self.initialize()
  59. async with self._pool.acquire() as conn:
  60. try:
  61. yield conn
  62. except Exception as e:
  63. server_logger.error(f"数据库连接操作失败: {e}")
  64. raise
  65. @asynccontextmanager
  66. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  67. """获取游标的上下文管理器"""
  68. if connection:
  69. # 使用提供的连接
  70. async with connection.cursor(aiomysql.DictCursor) as cursor:
  71. try:
  72. yield cursor
  73. except Exception as e:
  74. server_logger.error(f"游标操作失败: {e}")
  75. raise
  76. else:
  77. # 创建新连接
  78. async with self.get_connection() as conn:
  79. async with conn.cursor(aiomysql.DictCursor) as cursor:
  80. try:
  81. yield cursor
  82. except Exception as e:
  83. server_logger.error(f"游标操作失败: {e}")
  84. raise
  85. # 全局数据库连接池实例
  86. #async_db_pool = AsyncMySQLPool()