#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Redis TTL不同步Bug单元测试 验证问题:store_file_info函数只更新meta的TTL,不更新content的TTL, 导致两个键的过期时间不同步,造成间歇性缓存失败。 """ import asyncio import json import time import pytest from typing import Dict, Any # 导入被测试的模块 import sys from pathlib import Path root_dir = Path(__file__).parent.parent sys.path.append(str(root_dir)) from foundation.utils.redis_utils import store_file_info, get_file_info from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory class TestRedisTTLSync: """Redis TTL同步问题测试类""" @pytest.fixture async def redis_store(self): """Redis连接fixture""" redis_store = await RedisConnectionFactory.get_redis_store() yield redis_store # 测试后清理 await RedisConnectionFactory.close_all() @pytest.fixture async def test_file_data(self): """测试文件数据fixture""" file_id = "test_ttl_bug_file_123" file_info = { "file_name": "测试文档.pdf", "file_size": 1024, "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50, "callback_task_id": "original_task_id", "upload_time": int(time.time()) } return file_id, file_info async def test_ttl_sync_bug_reproduction(self, redis_store, test_file_data): """ 测试1: 复现TTL不同步Bug 1. 存储完整文件信息 2. 更新callback_task_id(触发TTL不同步) 3. 等待原始content过期 4. 验证获取文件内容失败 """ file_id, file_info = test_file_data print("\n=== 开始测试TTL不同步Bug复现 ===") # 步骤1: 存储完整文件信息,TTL=5秒(便于测试) print("1. 存储完整文件信息...") success = await store_file_info(file_id, file_info, expire_seconds=5) assert success, "存储文件信息失败" # 验证初始状态 print("2. 验证初始状态...") retrieved_info = await get_file_info(file_id, include_content=True) assert retrieved_info is not None, "初始获取文件信息失败" assert retrieved_info['file_content'] is not None, "初始文件内容为空" print(f" [OK] 文件内容长度: {len(retrieved_info['file_content'])}") # 检查键的TTL meta_ttl = await redis_store.ttl(f"meta:{file_id}") content_ttl = await redis_store.ttl(f"content:{file_id}") print(f" [OK] 初始TTL - meta: {meta_ttl}s, content: {content_ttl}s") assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该基本同步" # 步骤3: 等待3秒,让TTL减少一些 print("3. 等待3秒...") await asyncio.sleep(3) # 步骤4: 更新callback_task_id(触发Bug) print("4. 更新callback_task_id(触发TTL不同步Bug)...") update_success = await store_file_info( file_id, {'callback_task_id': 'updated_task_id'}, expire_seconds=10 # 设置更长的TTL ) assert update_success, "更新callback_task_id失败" # 步骤5: 检查TTL不同步问题 print("5. 检查TTL不同步问题...") new_meta_ttl = await redis_store.ttl(f"meta:{file_id}") new_content_ttl = await redis_store.ttl(f"content:{file_id}") print(f" [OK] 更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s") # 验证TTL不同步:meta的TTL应该重置为10秒,content应该继续倒计时(约2秒) assert new_meta_ttl > 8, f"meta的TTL应该被重置为约10秒,实际为{new_meta_ttl}" assert new_content_ttl < 5, f"content的TTL应该继续倒计时,实际为{new_content_ttl}" assert abs(new_meta_ttl - new_content_ttl) > 3, "TTL应该明显不同步" print(" [PASS] TTL不同步Bug已复现!") # 步骤6: 等待content过期 print("6. 等待content过期...") await asyncio.sleep(new_content_ttl + 1) # 步骤7: 验证获取文件内容失败 print("7. 验证content过期后获取失败...") final_info = await get_file_info(file_id, include_content=True) # 根据当前bug逻辑,这里应该返回None(因为content缺失) assert final_info is None, f"content过期后应该返回None,实际返回: {final_info}" print(" [PASS] Bug验证成功:content过期后无法获取文件信息") # 清理测试数据 await redis_store.delete(f"meta:{file_id}", f"content:{file_id}") async def test_only_meta_retrieval_works(self, redis_store, test_file_data): """ 测试2: 验证不包含content的获取仍然正常工作 即使content过期,meta信息应该仍然可以获取 """ file_id, file_info = test_file_data print("\n=== 测试meta单独获取功能 ===") # 存储文件信息 await store_file_info(file_id, file_info, expire_seconds=3) # 更新callback_task_id await store_file_info(file_id, {'callback_task_id': 'updated'}, expire_seconds=10) # 等待content过期 await asyncio.sleep(4) # 测试不包含content的获取 meta_only_info = await get_file_info(file_id, include_content=False) assert meta_only_info is not None, "meta信息应该可以正常获取" assert meta_only_info['callback_task_id'] == 'updated', "callback_task_id应该被正确更新" assert 'file_content' not in meta_only_info, "不应该包含file_content" print(" [PASS] meta信息单独获取正常") # 清理 await redis_store.delete(f"meta:{file_id}") async def test_current_bug_detection_in_logs(self, redis_store, test_file_data): """ 测试3: 模拟日志中观察到的实际场景 复现日志中的时间序列: - 11:41:53 文件信息存储 - 11:42:01 获取失败(8秒后) """ file_id, file_info = test_file_data print("\n=== 模拟日志中的实际场景 ===") # 步骤1: 模拟文件上传(11:41:53) print("1. 模拟文件上传(11:41:53)...") upload_time = time.time() await store_file_info(file_id, file_info, expire_seconds=10) # 10秒TTL # 步骤2: 模拟一些处理时间 await asyncio.sleep(2) # 步骤3: 模拟callback_task_id更新(可能发生在中间某个时间点) print("3. 模拟callback_task_id更新...") await store_file_info(file_id, {'callback_task_id': 'updated_task'}, expire_seconds=15) # 步骤4: 等待到11:42:01(约8秒后) elapsed_time = time.time() - upload_time wait_time = 8 - elapsed_time if wait_time > 0: print(f"4. 等待到11:42:01(还需要{wait_time:.1f}秒)...") await asyncio.sleep(wait_time) # 步骤5: 验证获取失败(如日志所示) print("5. 验证11:42:01时的获取失败...") failed_info = await get_file_info(file_id, include_content=True) assert failed_info is None, "模拟11:42:01时应该获取失败" print(" [PASS] 成功复现日志中的失败场景") # 清理 await redis_store.delete(f"meta:{file_id}") async def run_ttl_bug_tests(): """运行TTL Bug测试""" print("[TEST] 开始Redis TTL不同步Bug测试") test_instance = TestRedisTTLSync() try: # 获取redis连接 redis_store = await RedisConnectionFactory.get_redis_store() # 准备测试数据 file_id = "test_ttl_bug_file_123" file_info = { "file_name": "测试文档.pdf", "file_size": 1024, "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50, "callback_task_id": "original_task_id", "upload_time": int(time.time()) } test_file_data = (file_id, file_info) # 运行测试 await test_instance.test_ttl_sync_bug_reproduction(redis_store, test_file_data) await test_instance.test_only_meta_retrieval_works(redis_store, test_file_data) await test_instance.test_current_bug_detection_in_logs(redis_store, test_file_data) print("\n[PASS] 所有TTL Bug测试通过!") except Exception as e: print(f"\n[FAIL] 测试失败: {e}") import traceback traceback.print_exc() finally: await RedisConnectionFactory.close_all() if __name__ == "__main__": # 直接运行测试 asyncio.run(run_ttl_bug_tests())