redis_sentinel_test_2.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import redis
  4. from redis.sentinel import Sentinel, SentinelConnectionPool
  5. from redis.connection import Connection, ConnectionPool
  6. import time
  7. import sys
  8. # ============================================
  9. # 关键配置:地址映射表
  10. # ============================================
  11. SENTINEL_EXTERNAL_IP='192.168.92.96'
  12. # Sentinel 端口(所有节点共用)
  13. SENTINEL_EXTERNAL_PORT = 30768 # Sentinel 服务的 NodePort
  14. # Redis 数据端口 - 每个节点需要独立的 NodePort
  15. # 这样可以根据 master 主机名动态连接到对应的端口
  16. REDIS_DATA_MAPPING = {
  17. # 内部主机名 -> (外部 IP, 外部端口)
  18. 'redis-node-0.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32259),
  19. 'redis-node-1.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32260),
  20. 'redis-node-2.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32261),
  21. }
  22. # 旧的映射表(备用)
  23. ADDRESS_MAPPING = REDIS_DATA_MAPPING
  24. NODE_PORT_MAPPING = REDIS_DATA_MAPPING
  25. EXTERNAL_MAPPING = REDIS_DATA_MAPPING
  26. # ============================================
  27. # 自定义 Connection 类:拦截并转换地址
  28. # ============================================
  29. class ExternalRedisConnection(Connection):
  30. """自定义连接类,自动转换内部地址到外部地址"""
  31. def __init__(self, host='localhost', port=6379, **kwargs):
  32. # 转换地址
  33. original_host = host
  34. original_port = port
  35. if host in EXTERNAL_MAPPING:
  36. new_host, new_port = EXTERNAL_MAPPING[host]
  37. print(f" [地址转换] {original_host}:{original_port} -> {new_host}:{new_port}")
  38. host = new_host
  39. port = new_port
  40. else:
  41. print(f" [地址保持] {host}:{port} (未在映射表中)")
  42. super().__init__(host=host, port=port, **kwargs)
  43. class ExternalConnectionPool(ConnectionPool):
  44. """自定义连接池,使用地址转换连接"""
  45. connection_class = ExternalRedisConnection
  46. class ExternalSentinelManagedConnectionPool(SentinelConnectionPool):
  47. """Sentinel 管理的连接池,支持地址转换"""
  48. connection_class = ExternalRedisConnection
  49. def __init__(self, service_name, sentinel_manager, **kwargs):
  50. # 保存参数
  51. self.service_name = service_name
  52. self.sentinel_manager = sentinel_manager
  53. self._kwargs = kwargs
  54. super().__init__(service_name, sentinel_manager, **kwargs)
  55. def get_master_address(self):
  56. """获取 Master 地址并转换"""
  57. master_addr = super().get_master_address()
  58. if master_addr:
  59. host, port = master_addr
  60. if host in EXTERNAL_MAPPING:
  61. new_addr = EXTERNAL_MAPPING[host]
  62. print(f" [Master 转换] {host}:{port} -> {new_addr[0]}:{new_addr[1]}")
  63. return new_addr
  64. return master_addr
  65. def rotate_slaves(self):
  66. """轮询 Slave 地址并转换"""
  67. slaves = super().rotate_slaves()
  68. converted_slaves = []
  69. for host, port in slaves:
  70. if host in EXTERNAL_MAPPING:
  71. new_host, new_port = EXTERNAL_MAPPING[host]
  72. print(f" [Slave 转换] {host}:{port} -> {new_host}:{new_port}")
  73. converted_slaves.append((new_host, new_port))
  74. else:
  75. converted_slaves.append((host, port))
  76. return converted_slaves or [self.get_master_address()]
  77. # ============================================
  78. # Redis 配置
  79. # ============================================
  80. REDIS_CONFIG = {
  81. 'sentinels': [
  82. (SENTINEL_EXTERNAL_IP, SENTINEL_EXTERNAL_PORT), # 哨兵 NodePort
  83. ],
  84. 'master_name': 'lqmaster',
  85. 'password': 'Lq123456!',
  86. 'socket_timeout': 5,
  87. 'socket_connect_timeout': 5
  88. }
  89. def get_master_external_address(sentinel):
  90. """获取 master 的外部可访问地址"""
  91. master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name'])
  92. print(f"Sentinel 返回的 Master: {master_host}:{master_port}")
  93. # 转换到外部地址
  94. if master_host in REDIS_DATA_MAPPING:
  95. ext_host, ext_port = REDIS_DATA_MAPPING[master_host]
  96. print(f"转换后的外部地址:{ext_host}:{ext_port}")
  97. return ext_host, ext_port
  98. else:
  99. print(f"警告:{master_host} 不在映射表中")
  100. # 尝试从主机名提取节点名
  101. if 'redis-node-0' in master_host:
  102. return REDIS_DATA_MAPPING.get('redis-node-0.redis-headless.redis.svc.cluster.local', (None, None))
  103. elif 'redis-node-1' in master_host:
  104. return REDIS_DATA_MAPPING.get('redis-node-1.redis-headless.redis.svc.cluster.local', (None, None))
  105. elif 'redis-node-2' in master_host:
  106. return REDIS_DATA_MAPPING.get('redis-node-2.redis-headless.redis.svc.cluster.local', (None, None))
  107. return None, None
  108. def simple_test_with_conversion():
  109. """简化测试:动态获取 master 地址并连接"""
  110. print("\n" + "="*50)
  111. print("简化测试:动态主从切换支持")
  112. print("="*50)
  113. try:
  114. # 1. 连接 Sentinel
  115. sentinel = Sentinel(
  116. REDIS_CONFIG['sentinels'],
  117. sentinel_kwargs={
  118. 'password': REDIS_CONFIG['password'],
  119. 'socket_timeout': REDIS_CONFIG['socket_timeout']
  120. }
  121. )
  122. # 2. 获取 master 的外部地址
  123. ext_host, ext_port = get_master_external_address(sentinel)
  124. if not ext_host or not ext_port:
  125. print("无法获取 master 外部地址!")
  126. return None
  127. # 3. 打印所有节点信息
  128. print("\nSentinel 管理的所有节点:")
  129. try:
  130. slaves = sentinel.discover_slaves(REDIS_CONFIG['master_name'])
  131. for slave in slaves:
  132. print(f" 从节点:{slave[0]}:{slave[1]}")
  133. except Exception as e:
  134. print(f" 无法获取从节点列表:{e}")
  135. # 4. 直接连接到 master(使用转换后的地址)
  136. print(f"\n直接连接 Master: {ext_host}:{ext_port}")
  137. master_client = redis.Redis(
  138. host=ext_host,
  139. port=ext_port,
  140. password=REDIS_CONFIG['password'],
  141. socket_timeout=REDIS_CONFIG['socket_timeout'],
  142. decode_responses=False
  143. )
  144. # 5. 测试连接
  145. ping_result = master_client.ping()
  146. print(f"✓ 连接成功!PING: {ping_result}")
  147. # 6. 读写测试
  148. test_key = b"test:external:conversion"
  149. test_value = f"timestamp_{int(time.time())}".encode()
  150. master_client.set(test_key, test_value)
  151. print(f"✓ 写入成功:{test_key.decode()} = {test_value.decode()}")
  152. read_value = master_client.get(test_key)
  153. print(f"✓ 读取成功:{test_key.decode()} = {read_value.decode() if read_value else None}")
  154. # 7. 清理
  155. master_client.delete(test_key)
  156. print(f"✓ 清理成功")
  157. return master_client
  158. except Exception as e:
  159. print(f"✗ 测试失败:{e}")
  160. import traceback
  161. traceback.print_exc()
  162. return None
  163. def test_with_proxy_command():
  164. """使用 redis-py 的代理功能(另一种方案)"""
  165. print("\n" + "="*50)
  166. print("方案 2:使用连接参数转换")
  167. print("="*50)
  168. # 这种方案在创建连接时动态替换地址
  169. # 适用于需要保持 Sentinel 高可用特性的场景
  170. try:
  171. sentinel = Sentinel(REDIS_CONFIG['sentinels'])
  172. # 获取当前 Master
  173. master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name'])
  174. print(f"原始 Master: {master_host}:{master_port}")
  175. # 转换
  176. if master_host in EXTERNAL_MAPPING:
  177. new_host, new_port = EXTERNAL_MAPPING[master_host]
  178. else:
  179. new_host, new_port = master_host, master_port
  180. # 创建直连客户端(不通过 Sentinel 管理,适合短期连接)
  181. client = redis.Redis(
  182. host=new_host,
  183. port=new_port,
  184. password=REDIS_CONFIG['password'],
  185. decode_responses=True
  186. )
  187. # 测试
  188. client.set("test:proxy", "works")
  189. value = client.get("test:proxy")
  190. print(f"✓ 代理测试成功:{value}")
  191. client.delete("test:proxy")
  192. return client
  193. except Exception as e:
  194. print(f"✗ 失败:{e}")
  195. return None
  196. def main():
  197. """主函数"""
  198. print("\n")
  199. print("="*60)
  200. print("Redis Sentinel 外部访问测试(地址转换版)")
  201. print("="*60)
  202. print(f"哨兵地址:{REDIS_CONFIG['sentinels']}")
  203. print(f"Master 名称:{REDIS_CONFIG['master_name']}")
  204. print(f"地址映射表:")
  205. for internal, external in EXTERNAL_MAPPING.items():
  206. print(f" {internal} -> {external[0]}:{external[1]}")
  207. print("="*60)
  208. # 方案 1:简化测试(推荐)
  209. client = simple_test_with_conversion()
  210. if not client:
  211. print("\n简化测试失败,尝试其他方案...")
  212. # 可以在这里添加其他测试方案
  213. print("\n" + "="*60)
  214. print("测试完成!")
  215. print("="*60)
  216. if __name__ == "__main__":
  217. main()