async_mysql_conn_pool.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import aiomysql
  2. from contextlib import asynccontextmanager
  3. from typing import Dict,Optional, AsyncGenerator
  4. from foundation.logger.loggering import server_logger
  5. from foundation.utils.common import handler_err
  6. from foundation.base.config import config_handler
  7. # 异步数据库连接池
  8. class AsyncMySQLPool:
  9. _instance = None
  10. def __new__(cls, *args, **kwargs):
  11. if not cls._instance:
  12. cls._instance = super().__new__(cls)
  13. return cls._instance
  14. def __init__(self):
  15. if not hasattr(self, '_pool'):
  16. self._pool = None
  17. self._initialized = False
  18. async def initialize(self):
  19. """初始化连接池"""
  20. try:
  21. self._pool = await aiomysql.create_pool(
  22. host=config_handler.get("mysql", "MYSQL_HOST" , "localhost"),
  23. port=int(config_handler.get("mysql", "MYSQL_PORT" , "3306")),
  24. user=config_handler.get("mysql", "MYSQL_USER"),
  25. password=config_handler.get("mysql", "MYSQL_PASSWORD"),
  26. db=config_handler.get("mysql", "MYSQL_DB"),
  27. minsize=int(config_handler.get("mysql", "MYSQL_MIN_SIZE" , "1")),
  28. maxsize=int(config_handler.get("mysql", "MYSQL_MAX_SIZE" , "2")),
  29. autocommit=config_handler.get("mysql", "MYSQL_AUTO_COMMIT")
  30. )
  31. self._initialized = True
  32. server_logger.info("异步MySQL连接池初始化成功")
  33. except Exception as e:
  34. server_logger.error(f"连接池初始化失败: {e}")
  35. raise
  36. async def close(self):
  37. """关闭连接池"""
  38. if self._pool:
  39. self._pool.close()
  40. await self._pool.wait_closed()
  41. server_logger.info("异步MySQL连接池已关闭")
  42. @asynccontextmanager
  43. async def get_connection(self) -> AsyncGenerator[aiomysql.Connection, None]:
  44. """获取数据库连接的上下文管理器"""
  45. if not self._initialized:
  46. # 如果没有初始化,使用默认配置初始化
  47. await self.initialize()
  48. async with self._pool.acquire() as conn:
  49. try:
  50. yield conn
  51. except Exception as e:
  52. server_logger.error(f"数据库连接操作失败: {e}")
  53. raise
  54. @asynccontextmanager
  55. async def get_cursor(self, connection: Optional[aiomysql.Connection] = None) -> AsyncGenerator[aiomysql.Cursor, None]:
  56. """获取游标的上下文管理器"""
  57. if connection:
  58. # 使用提供的连接
  59. async with connection.cursor(aiomysql.DictCursor) as cursor:
  60. try:
  61. yield cursor
  62. except Exception as e:
  63. server_logger.error(f"游标操作失败: {e}")
  64. raise
  65. else:
  66. # 创建新连接
  67. async with self.get_connection() as conn:
  68. async with conn.cursor(aiomysql.DictCursor) as cursor:
  69. try:
  70. yield cursor
  71. except Exception as e:
  72. server_logger.error(f"游标操作失败: {e}")
  73. raise
  74. # 全局数据库连接池实例
  75. #async_db_pool = AsyncMySQLPool()