#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Redis TTL不同步Bug修复验证测试 验证修复后的store_file_info函数能够同步更新meta和content的TTL """ import asyncio import json import time import sys from pathlib import Path root_dir = Path(__file__).parent.parent sys.path.append(str(root_dir)) from foundation.utils.redis_utils import store_file_info, get_file_info from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory async def test_ttl_sync_fix(): """测试TTL同步修复效果""" print("[TEST] 开始验证TTL同步修复效果") try: redis_store = await RedisConnectionFactory.get_redis_store() file_id = "test_ttl_fix_file_456" file_info = { "file_name": "修复测试文档.pdf", "file_size": 2048, "file_content": "这是用于验证修复效果的测试文件内容。" * 100, "callback_task_id": "original_task", "upload_time": int(time.time()) } print("\n=== 步骤1: 存储完整文件信息 ===") # 存储文件信息,TTL=8秒 success = await store_file_info(file_id, file_info, expire_seconds=8) assert success, "存储文件信息失败" # 验证初始TTL同步 meta_ttl = await redis_store.ttl(f"meta:{file_id}") content_ttl = await redis_store.ttl(f"content:{file_id}") print(f"初始TTL - meta: {meta_ttl}s, content: {content_ttl}s") assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该同步" print("\n=== 步骤2: 等待2秒后更新callback_task_id ===") # 等待2秒,让TTL减少 await asyncio.sleep(2) # 更新callback_task_id,触发TTL同步修复逻辑 update_success = await store_file_info( file_id, {'callback_task_id': 'updated_task'}, expire_seconds=15 # 设置更长的TTL ) assert update_success, "更新callback_task_id失败" print("\n=== 步骤3: 验证TTL同步修复效果 ===") # 检查TTL是否同步更新 new_meta_ttl = await redis_store.ttl(f"meta:{file_id}") new_content_ttl = await redis_store.ttl(f"content:{file_id}") print(f"更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s") # 验证修复效果:两个TTL都应该被重置为15秒左右 assert new_meta_ttl > 12, f"meta的TTL应该被重置为约15秒,实际为{new_meta_ttl}" assert new_content_ttl > 12, f"content的TTL也应该被重置为约15秒,实际为{new_content_ttl}" assert abs(new_meta_ttl - new_content_ttl) <= 2, "修复后TTL应该保持同步" print("[PASS] TTL同步修复验证成功!") print("\n=== 步骤4: 验证文件内容获取正常 ===") # 等待一段时间后验证文件内容仍然可以获取 await asyncio.sleep(3) retrieved_info = await get_file_info(file_id, include_content=True) assert retrieved_info is not None, "修复后应该能正常获取文件信息" assert retrieved_info['file_content'] is not None, "修复后文件内容应该存在" assert retrieved_info['callback_task_id'] == 'updated_task', "callback_task_id应该被正确更新" print(f"[PASS] 文件信息获取正常,内容长度: {len(retrieved_info['file_content'])}") print("\n=== 步骤5: 验证长期稳定性 ===") # 再等待一段时间,验证TTL继续同步 await asyncio.sleep(3) final_meta_ttl = await redis_store.ttl(f"meta:{file_id}") final_content_ttl = await redis_store.ttl(f"content:{file_id}") print(f"最终TTL - meta: {final_meta_ttl}s, content: {final_content_ttl}s") # TTL差值应该仍然很小 assert abs(final_meta_ttl - final_content_ttl) <= 2, "TTL应该保持同步" # 文件内容应该仍然可以获取 final_info = await get_file_info(file_id, include_content=True) assert final_info is not None, "最终应该能获取文件信息" assert final_info['file_content'] is not None, "最终文件内容应该存在" print("[PASS] 长期稳定性验证成功!") print("\n=== 步骤6: 对比修复前后行为 ===") # 清理测试文件 await redis_store.delete(f"meta:{file_id}", f"content:{file_id}") # 模拟修复前的错误行为(如果需要对比) print("修复前行为: meta和content TTL不同步,导致间歇性获取失败") print("修复后行为: meta和content TTL保持同步,确保稳定获取") print("[PASS] 修复效果验证完成!") except Exception as e: print(f"[FAIL] 测试失败: {e}") import traceback traceback.print_exc() finally: await RedisConnectionFactory.close_all() async def test_edge_cases(): """测试边界情况""" print("\n[TEST] 开始测试边界情况") try: redis_store = await RedisConnectionFactory.get_redis_store() # 测试1: 只有meta没有content的情况 file_id_1 = "test_edge_meta_only" await redis_store.setex(f"meta:{file_id_1}", 10, json.dumps({"test": "meta_only"})) success = await store_file_info(file_id_1, {'callback_task_id': 'edge_test'}, expire_seconds=15) assert success, "只有meta的情况下更新应该成功" meta_ttl = await redis_store.ttl(f"meta:{file_id_1}") assert meta_ttl > 12, "meta的TTL应该被更新" print("[PASS] 只有meta的边界情况测试通过") await redis_store.delete(f"meta:{file_id_1}") # 测试2: 新文件存储的正常行为 file_id_2 = "test_edge_new_file" new_file_info = { "file_name": "新文件.pdf", "file_content": "新文件内容", "callback_task_id": "new_task" } success = await store_file_info(file_id_2, new_file_info, expire_seconds=10) assert success, "新文件存储应该成功" meta_ttl = await redis_store.ttl(f"meta:{file_id_2}") content_ttl = await redis_store.ttl(f"content:{file_id_2}") assert abs(meta_ttl - content_ttl) <= 1, "新文件的TTL应该同步" print("[PASS] 新文件存储的边界情况测试通过") await redis_store.delete(f"meta:{file_id_2}", f"content:{file_id_2}") print("[PASS] 所有边界情况测试通过!") except Exception as e: print(f"[FAIL] 边界情况测试失败: {e}") import traceback traceback.print_exc() finally: await RedisConnectionFactory.close_all() async def run_fix_validation_tests(): """运行修复验证测试""" print("=" * 60) print("Redis TTL不同步Bug修复验证测试") print("=" * 60) await test_ttl_sync_fix() await test_edge_cases() print("\n" + "=" * 60) print("[SUCCESS] 所有修复验证测试通过!") print("=" * 60) if __name__ == "__main__": asyncio.run(run_fix_validation_tests())