#!/usr/bin/env python3 # -*- coding: utf-8 -*- import redis from redis.sentinel import Sentinel, SentinelConnectionPool from redis.connection import Connection, ConnectionPool import time import sys # ============================================ # 关键配置:地址映射表 # ============================================ SENTINEL_EXTERNAL_IP='192.168.92.96' # Sentinel 端口(所有节点共用) SENTINEL_EXTERNAL_PORT = 30768 # Sentinel 服务的 NodePort # Redis 数据端口 - 每个节点需要独立的 NodePort # 这样可以根据 master 主机名动态连接到对应的端口 REDIS_DATA_MAPPING = { # 内部主机名 -> (外部 IP, 外部端口) 'redis-node-0.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32259), 'redis-node-1.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32260), 'redis-node-2.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32261), } # 旧的映射表(备用) ADDRESS_MAPPING = REDIS_DATA_MAPPING NODE_PORT_MAPPING = REDIS_DATA_MAPPING EXTERNAL_MAPPING = REDIS_DATA_MAPPING # ============================================ # 自定义 Connection 类:拦截并转换地址 # ============================================ class ExternalRedisConnection(Connection): """自定义连接类,自动转换内部地址到外部地址""" def __init__(self, host='localhost', port=6379, **kwargs): # 转换地址 original_host = host original_port = port if host in EXTERNAL_MAPPING: new_host, new_port = EXTERNAL_MAPPING[host] print(f" [地址转换] {original_host}:{original_port} -> {new_host}:{new_port}") host = new_host port = new_port else: print(f" [地址保持] {host}:{port} (未在映射表中)") super().__init__(host=host, port=port, **kwargs) class ExternalConnectionPool(ConnectionPool): """自定义连接池,使用地址转换连接""" connection_class = ExternalRedisConnection class ExternalSentinelManagedConnectionPool(SentinelConnectionPool): """Sentinel 管理的连接池,支持地址转换""" connection_class = ExternalRedisConnection def __init__(self, service_name, sentinel_manager, **kwargs): # 保存参数 self.service_name = service_name self.sentinel_manager = sentinel_manager self._kwargs = kwargs super().__init__(service_name, sentinel_manager, **kwargs) def get_master_address(self): """获取 Master 地址并转换""" master_addr = super().get_master_address() if master_addr: host, port = master_addr if host in EXTERNAL_MAPPING: new_addr = EXTERNAL_MAPPING[host] print(f" [Master 转换] {host}:{port} -> {new_addr[0]}:{new_addr[1]}") return new_addr return master_addr def rotate_slaves(self): """轮询 Slave 地址并转换""" slaves = super().rotate_slaves() converted_slaves = [] for host, port in slaves: if host in EXTERNAL_MAPPING: new_host, new_port = EXTERNAL_MAPPING[host] print(f" [Slave 转换] {host}:{port} -> {new_host}:{new_port}") converted_slaves.append((new_host, new_port)) else: converted_slaves.append((host, port)) return converted_slaves or [self.get_master_address()] # ============================================ # Redis 配置 # ============================================ REDIS_CONFIG = { 'sentinels': [ (SENTINEL_EXTERNAL_IP, SENTINEL_EXTERNAL_PORT), # 哨兵 NodePort ], 'master_name': 'lqmaster', 'password': 'Lq123456!', 'socket_timeout': 5, 'socket_connect_timeout': 5 } def get_master_external_address(sentinel): """获取 master 的外部可访问地址""" master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name']) print(f"Sentinel 返回的 Master: {master_host}:{master_port}") # 转换到外部地址 if master_host in REDIS_DATA_MAPPING: ext_host, ext_port = REDIS_DATA_MAPPING[master_host] print(f"转换后的外部地址:{ext_host}:{ext_port}") return ext_host, ext_port else: print(f"警告:{master_host} 不在映射表中") # 尝试从主机名提取节点名 if 'redis-node-0' in master_host: return REDIS_DATA_MAPPING.get('redis-node-0.redis-headless.redis.svc.cluster.local', (None, None)) elif 'redis-node-1' in master_host: return REDIS_DATA_MAPPING.get('redis-node-1.redis-headless.redis.svc.cluster.local', (None, None)) elif 'redis-node-2' in master_host: return REDIS_DATA_MAPPING.get('redis-node-2.redis-headless.redis.svc.cluster.local', (None, None)) return None, None def simple_test_with_conversion(): """简化测试:动态获取 master 地址并连接""" print("\n" + "="*50) print("简化测试:动态主从切换支持") print("="*50) try: # 1. 连接 Sentinel sentinel = Sentinel( REDIS_CONFIG['sentinels'], sentinel_kwargs={ 'password': REDIS_CONFIG['password'], 'socket_timeout': REDIS_CONFIG['socket_timeout'] } ) # 2. 获取 master 的外部地址 ext_host, ext_port = get_master_external_address(sentinel) if not ext_host or not ext_port: print("无法获取 master 外部地址!") return None # 3. 打印所有节点信息 print("\nSentinel 管理的所有节点:") try: slaves = sentinel.discover_slaves(REDIS_CONFIG['master_name']) for slave in slaves: print(f" 从节点:{slave[0]}:{slave[1]}") except Exception as e: print(f" 无法获取从节点列表:{e}") # 4. 直接连接到 master(使用转换后的地址) print(f"\n直接连接 Master: {ext_host}:{ext_port}") master_client = redis.Redis( host=ext_host, port=ext_port, password=REDIS_CONFIG['password'], socket_timeout=REDIS_CONFIG['socket_timeout'], decode_responses=False ) # 5. 测试连接 ping_result = master_client.ping() print(f"✓ 连接成功!PING: {ping_result}") # 6. 读写测试 test_key = b"test:external:conversion" test_value = f"timestamp_{int(time.time())}".encode() master_client.set(test_key, test_value) print(f"✓ 写入成功:{test_key.decode()} = {test_value.decode()}") read_value = master_client.get(test_key) print(f"✓ 读取成功:{test_key.decode()} = {read_value.decode() if read_value else None}") # 7. 清理 master_client.delete(test_key) print(f"✓ 清理成功") return master_client except Exception as e: print(f"✗ 测试失败:{e}") import traceback traceback.print_exc() return None def test_with_proxy_command(): """使用 redis-py 的代理功能(另一种方案)""" print("\n" + "="*50) print("方案 2:使用连接参数转换") print("="*50) # 这种方案在创建连接时动态替换地址 # 适用于需要保持 Sentinel 高可用特性的场景 try: sentinel = Sentinel(REDIS_CONFIG['sentinels']) # 获取当前 Master master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name']) print(f"原始 Master: {master_host}:{master_port}") # 转换 if master_host in EXTERNAL_MAPPING: new_host, new_port = EXTERNAL_MAPPING[master_host] else: new_host, new_port = master_host, master_port # 创建直连客户端(不通过 Sentinel 管理,适合短期连接) client = redis.Redis( host=new_host, port=new_port, password=REDIS_CONFIG['password'], decode_responses=True ) # 测试 client.set("test:proxy", "works") value = client.get("test:proxy") print(f"✓ 代理测试成功:{value}") client.delete("test:proxy") return client except Exception as e: print(f"✗ 失败:{e}") return None def main(): """主函数""" print("\n") print("="*60) print("Redis Sentinel 外部访问测试(地址转换版)") print("="*60) print(f"哨兵地址:{REDIS_CONFIG['sentinels']}") print(f"Master 名称:{REDIS_CONFIG['master_name']}") print(f"地址映射表:") for internal, external in EXTERNAL_MAPPING.items(): print(f" {internal} -> {external[0]}:{external[1]}") print("="*60) # 方案 1:简化测试(推荐) client = simple_test_with_conversion() if not client: print("\n简化测试失败,尝试其他方案...") # 可以在这里添加其他测试方案 print("\n" + "="*60) print("测试完成!") print("="*60) if __name__ == "__main__": main()