| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import redis
- from redis.sentinel import Sentinel, SentinelConnectionPool
- from redis.connection import Connection, ConnectionPool
- import time
- import sys
- # ============================================
- # 关键配置:地址映射表
- # ============================================
- SENTINEL_EXTERNAL_IP='192.168.92.96'
- # Sentinel 端口(所有节点共用)
- SENTINEL_EXTERNAL_PORT = 30768 # Sentinel 服务的 NodePort
- # Redis 数据端口 - 每个节点需要独立的 NodePort
- # 这样可以根据 master 主机名动态连接到对应的端口
- REDIS_DATA_MAPPING = {
- # 内部主机名 -> (外部 IP, 外部端口)
- 'redis-node-0.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32259),
- 'redis-node-1.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32260),
- 'redis-node-2.redis-headless.redis.svc.cluster.local': ('192.168.92.96', 32261),
- }
- # 旧的映射表(备用)
- ADDRESS_MAPPING = REDIS_DATA_MAPPING
- NODE_PORT_MAPPING = REDIS_DATA_MAPPING
- EXTERNAL_MAPPING = REDIS_DATA_MAPPING
- # ============================================
- # 自定义 Connection 类:拦截并转换地址
- # ============================================
- class ExternalRedisConnection(Connection):
- """自定义连接类,自动转换内部地址到外部地址"""
- def __init__(self, host='localhost', port=6379, **kwargs):
- # 转换地址
- original_host = host
- original_port = port
- if host in EXTERNAL_MAPPING:
- new_host, new_port = EXTERNAL_MAPPING[host]
- print(f" [地址转换] {original_host}:{original_port} -> {new_host}:{new_port}")
- host = new_host
- port = new_port
- else:
- print(f" [地址保持] {host}:{port} (未在映射表中)")
- super().__init__(host=host, port=port, **kwargs)
- class ExternalConnectionPool(ConnectionPool):
- """自定义连接池,使用地址转换连接"""
- connection_class = ExternalRedisConnection
- class ExternalSentinelManagedConnectionPool(SentinelConnectionPool):
- """Sentinel 管理的连接池,支持地址转换"""
- connection_class = ExternalRedisConnection
- def __init__(self, service_name, sentinel_manager, **kwargs):
- # 保存参数
- self.service_name = service_name
- self.sentinel_manager = sentinel_manager
- self._kwargs = kwargs
- super().__init__(service_name, sentinel_manager, **kwargs)
- def get_master_address(self):
- """获取 Master 地址并转换"""
- master_addr = super().get_master_address()
- if master_addr:
- host, port = master_addr
- if host in EXTERNAL_MAPPING:
- new_addr = EXTERNAL_MAPPING[host]
- print(f" [Master 转换] {host}:{port} -> {new_addr[0]}:{new_addr[1]}")
- return new_addr
- return master_addr
- def rotate_slaves(self):
- """轮询 Slave 地址并转换"""
- slaves = super().rotate_slaves()
- converted_slaves = []
- for host, port in slaves:
- if host in EXTERNAL_MAPPING:
- new_host, new_port = EXTERNAL_MAPPING[host]
- print(f" [Slave 转换] {host}:{port} -> {new_host}:{new_port}")
- converted_slaves.append((new_host, new_port))
- else:
- converted_slaves.append((host, port))
- return converted_slaves or [self.get_master_address()]
- # ============================================
- # Redis 配置
- # ============================================
- REDIS_CONFIG = {
- 'sentinels': [
- (SENTINEL_EXTERNAL_IP, SENTINEL_EXTERNAL_PORT), # 哨兵 NodePort
- ],
- 'master_name': 'lqmaster',
- 'password': 'Lq123456!',
- 'socket_timeout': 5,
- 'socket_connect_timeout': 5
- }
- def get_master_external_address(sentinel):
- """获取 master 的外部可访问地址"""
- master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name'])
- print(f"Sentinel 返回的 Master: {master_host}:{master_port}")
- # 转换到外部地址
- if master_host in REDIS_DATA_MAPPING:
- ext_host, ext_port = REDIS_DATA_MAPPING[master_host]
- print(f"转换后的外部地址:{ext_host}:{ext_port}")
- return ext_host, ext_port
- else:
- print(f"警告:{master_host} 不在映射表中")
- # 尝试从主机名提取节点名
- if 'redis-node-0' in master_host:
- return REDIS_DATA_MAPPING.get('redis-node-0.redis-headless.redis.svc.cluster.local', (None, None))
- elif 'redis-node-1' in master_host:
- return REDIS_DATA_MAPPING.get('redis-node-1.redis-headless.redis.svc.cluster.local', (None, None))
- elif 'redis-node-2' in master_host:
- return REDIS_DATA_MAPPING.get('redis-node-2.redis-headless.redis.svc.cluster.local', (None, None))
- return None, None
- def simple_test_with_conversion():
- """简化测试:动态获取 master 地址并连接"""
- print("\n" + "="*50)
- print("简化测试:动态主从切换支持")
- print("="*50)
- try:
- # 1. 连接 Sentinel
- sentinel = Sentinel(
- REDIS_CONFIG['sentinels'],
- sentinel_kwargs={
- 'password': REDIS_CONFIG['password'],
- 'socket_timeout': REDIS_CONFIG['socket_timeout']
- }
- )
- # 2. 获取 master 的外部地址
- ext_host, ext_port = get_master_external_address(sentinel)
- if not ext_host or not ext_port:
- print("无法获取 master 外部地址!")
- return None
- # 3. 打印所有节点信息
- print("\nSentinel 管理的所有节点:")
- try:
- slaves = sentinel.discover_slaves(REDIS_CONFIG['master_name'])
- for slave in slaves:
- print(f" 从节点:{slave[0]}:{slave[1]}")
- except Exception as e:
- print(f" 无法获取从节点列表:{e}")
- # 4. 直接连接到 master(使用转换后的地址)
- print(f"\n直接连接 Master: {ext_host}:{ext_port}")
- master_client = redis.Redis(
- host=ext_host,
- port=ext_port,
- password=REDIS_CONFIG['password'],
- socket_timeout=REDIS_CONFIG['socket_timeout'],
- decode_responses=False
- )
- # 5. 测试连接
- ping_result = master_client.ping()
- print(f"✓ 连接成功!PING: {ping_result}")
- # 6. 读写测试
- test_key = b"test:external:conversion"
- test_value = f"timestamp_{int(time.time())}".encode()
- master_client.set(test_key, test_value)
- print(f"✓ 写入成功:{test_key.decode()} = {test_value.decode()}")
- read_value = master_client.get(test_key)
- print(f"✓ 读取成功:{test_key.decode()} = {read_value.decode() if read_value else None}")
- # 7. 清理
- master_client.delete(test_key)
- print(f"✓ 清理成功")
- return master_client
- except Exception as e:
- print(f"✗ 测试失败:{e}")
- import traceback
- traceback.print_exc()
- return None
- def test_with_proxy_command():
- """使用 redis-py 的代理功能(另一种方案)"""
- print("\n" + "="*50)
- print("方案 2:使用连接参数转换")
- print("="*50)
- # 这种方案在创建连接时动态替换地址
- # 适用于需要保持 Sentinel 高可用特性的场景
- try:
- sentinel = Sentinel(REDIS_CONFIG['sentinels'])
- # 获取当前 Master
- master_host, master_port = sentinel.discover_master(REDIS_CONFIG['master_name'])
- print(f"原始 Master: {master_host}:{master_port}")
- # 转换
- if master_host in EXTERNAL_MAPPING:
- new_host, new_port = EXTERNAL_MAPPING[master_host]
- else:
- new_host, new_port = master_host, master_port
- # 创建直连客户端(不通过 Sentinel 管理,适合短期连接)
- client = redis.Redis(
- host=new_host,
- port=new_port,
- password=REDIS_CONFIG['password'],
- decode_responses=True
- )
- # 测试
- client.set("test:proxy", "works")
- value = client.get("test:proxy")
- print(f"✓ 代理测试成功:{value}")
- client.delete("test:proxy")
- return client
- except Exception as e:
- print(f"✗ 失败:{e}")
- return None
- def main():
- """主函数"""
- print("\n")
- print("="*60)
- print("Redis Sentinel 外部访问测试(地址转换版)")
- print("="*60)
- print(f"哨兵地址:{REDIS_CONFIG['sentinels']}")
- print(f"Master 名称:{REDIS_CONFIG['master_name']}")
- print(f"地址映射表:")
- for internal, external in EXTERNAL_MAPPING.items():
- print(f" {internal} -> {external[0]}:{external[1]}")
- print("="*60)
- # 方案 1:简化测试(推荐)
- client = simple_test_with_conversion()
- if not client:
- print("\n简化测试失败,尝试其他方案...")
- # 可以在这里添加其他测试方案
- print("\n" + "="*60)
- print("测试完成!")
- print("="*60)
- if __name__ == "__main__":
- main()
|