async_mysql_conn_pool.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import aiomysql
  2. from contextlib import asynccontextmanager
  3. from typing import Dict,Optional, AsyncGenerator
  4. def _get_mysql_logger():
  5. try:
  6. from foundation.observability.logger.loggering import server_logger
  7. return server_logger
  8. except ImportError:
  9. import logging
  10. return logging.getLogger(__name__)
  11. from foundation.utils.common import handler_err
  12. from foundation.infrastructure.config.config import config_handler
  13. # 异步数据库连接池
  14. class AsyncMySQLPool:
  15. _instance = None
  16. def __new__(cls, *args, **kwargs):
  17. if not cls._instance:
  18. cls._instance = super().__new__(cls)
  19. return cls._instance
  20. def __init__(self):
  21. if not hasattr(self, '_pool'):
  22. self._pool = None
  23. self._initialized = False
  24. async def initialize(self):
  25. """初始化连接池"""
  26. try:
  27. self._pool = await aiomysql.create_pool(
  28. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  29. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  30. user=config_handler.get("mysql", "MYSQL_USER"),
  31. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  32. db=config_handler.get("mysql", "MYSQL_DB"),
  33. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  34. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  35. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
  36. )
  37. self._initialized = True
  38. _get_mysql_logger().info("异步MySQL连接池初始化成功")
  39. except Exception as e:
  40. _get_mysql_logger().error(f"连接池初始化失败: {e}")
  41. raise
  42. async def close(self):
  43. """关闭连接池"""
  44. if self._pool:
  45. self._pool.close()
  46. await self._pool.wait_closed()
  47. _get_mysql_logger().info("异步MySQL连接池已关闭")
  48. @asynccontextmanager
  49. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  50. """获取数据库连接的上下文管理器"""
  51. if not self._initialized:
  52. # 如果没有初始化,使用默认配置初始化
  53. await self.initialize()
  54. async with self._pool.acquire() as conn:
  55. try:
  56. yield conn
  57. except Exception as e:
  58. _get_mysql_logger().error(f"数据库连接操作失败: {e}")
  59. raise
  60. @asynccontextmanager
  61. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  62. """获取游标的上下文管理器"""
  63. if connection:
  64. # 使用提供的连接
  65. async with connection.cursor(aiomysql.DictCursor) as cursor:
  66. try:
  67. yield cursor
  68. except Exception as e:
  69. _get_mysql_logger().error(f"游标操作失败: {e}")
  70. raise
  71. else:
  72. # 创建新连接
  73. async with self.get_connection() as conn:
  74. async with conn.cursor(aiomysql.DictCursor) as cursor:
  75. try:
  76. yield cursor
  77. except Exception as e:
  78. _get_mysql_logger().error(f"游标操作失败: {e}")
  79. raise
  80. # 全局数据库连接池实例
  81. #async_db_pool = AsyncMySQLPool()