| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Redis TTL不同步Bug修复验证测试
- 验证修复后的store_file_info函数能够同步更新meta和content的TTL
- """
- import asyncio
- import json
- import time
- import sys
- from pathlib import Path
- root_dir = Path(__file__).parent.parent
- sys.path.append(str(root_dir))
- from foundation.utils.redis_utils import store_file_info, get_file_info
- from foundation.base.redis_connection import RedisConnectionFactory
- async def test_ttl_sync_fix():
- """测试TTL同步修复效果"""
- print("[TEST] 开始验证TTL同步修复效果")
- try:
- redis_store = await RedisConnectionFactory.get_redis_store()
- file_id = "test_ttl_fix_file_456"
- file_info = {
- "file_name": "修复测试文档.pdf",
- "file_size": 2048,
- "file_content": "这是用于验证修复效果的测试文件内容。" * 100,
- "callback_task_id": "original_task",
- "upload_time": int(time.time())
- }
- print("\n=== 步骤1: 存储完整文件信息 ===")
- # 存储文件信息,TTL=8秒
- success = await store_file_info(file_id, file_info, expire_seconds=8)
- assert success, "存储文件信息失败"
- # 验证初始TTL同步
- meta_ttl = await redis_store.ttl(f"meta:{file_id}")
- content_ttl = await redis_store.ttl(f"content:{file_id}")
- print(f"初始TTL - meta: {meta_ttl}s, content: {content_ttl}s")
- assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该同步"
- print("\n=== 步骤2: 等待2秒后更新callback_task_id ===")
- # 等待2秒,让TTL减少
- await asyncio.sleep(2)
- # 更新callback_task_id,触发TTL同步修复逻辑
- update_success = await store_file_info(
- file_id,
- {'callback_task_id': 'updated_task'},
- expire_seconds=15 # 设置更长的TTL
- )
- assert update_success, "更新callback_task_id失败"
- print("\n=== 步骤3: 验证TTL同步修复效果 ===")
- # 检查TTL是否同步更新
- new_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
- new_content_ttl = await redis_store.ttl(f"content:{file_id}")
- print(f"更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s")
- # 验证修复效果:两个TTL都应该被重置为15秒左右
- assert new_meta_ttl > 12, f"meta的TTL应该被重置为约15秒,实际为{new_meta_ttl}"
- assert new_content_ttl > 12, f"content的TTL也应该被重置为约15秒,实际为{new_content_ttl}"
- assert abs(new_meta_ttl - new_content_ttl) <= 2, "修复后TTL应该保持同步"
- print("[PASS] TTL同步修复验证成功!")
- print("\n=== 步骤4: 验证文件内容获取正常 ===")
- # 等待一段时间后验证文件内容仍然可以获取
- await asyncio.sleep(3)
- retrieved_info = await get_file_info(file_id, include_content=True)
- assert retrieved_info is not None, "修复后应该能正常获取文件信息"
- assert retrieved_info['file_content'] is not None, "修复后文件内容应该存在"
- assert retrieved_info['callback_task_id'] == 'updated_task', "callback_task_id应该被正确更新"
- print(f"[PASS] 文件信息获取正常,内容长度: {len(retrieved_info['file_content'])}")
- print("\n=== 步骤5: 验证长期稳定性 ===")
- # 再等待一段时间,验证TTL继续同步
- await asyncio.sleep(3)
- final_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
- final_content_ttl = await redis_store.ttl(f"content:{file_id}")
- print(f"最终TTL - meta: {final_meta_ttl}s, content: {final_content_ttl}s")
- # TTL差值应该仍然很小
- assert abs(final_meta_ttl - final_content_ttl) <= 2, "TTL应该保持同步"
- # 文件内容应该仍然可以获取
- final_info = await get_file_info(file_id, include_content=True)
- assert final_info is not None, "最终应该能获取文件信息"
- assert final_info['file_content'] is not None, "最终文件内容应该存在"
- print("[PASS] 长期稳定性验证成功!")
- print("\n=== 步骤6: 对比修复前后行为 ===")
- # 清理测试文件
- await redis_store.delete(f"meta:{file_id}", f"content:{file_id}")
- # 模拟修复前的错误行为(如果需要对比)
- print("修复前行为: meta和content TTL不同步,导致间歇性获取失败")
- print("修复后行为: meta和content TTL保持同步,确保稳定获取")
- print("[PASS] 修复效果验证完成!")
- except Exception as e:
- print(f"[FAIL] 测试失败: {e}")
- import traceback
- traceback.print_exc()
- finally:
- await RedisConnectionFactory.close_all()
- async def test_edge_cases():
- """测试边界情况"""
- print("\n[TEST] 开始测试边界情况")
- try:
- redis_store = await RedisConnectionFactory.get_redis_store()
- # 测试1: 只有meta没有content的情况
- file_id_1 = "test_edge_meta_only"
- await redis_store.setex(f"meta:{file_id_1}", 10, json.dumps({"test": "meta_only"}))
- success = await store_file_info(file_id_1, {'callback_task_id': 'edge_test'}, expire_seconds=15)
- assert success, "只有meta的情况下更新应该成功"
- meta_ttl = await redis_store.ttl(f"meta:{file_id_1}")
- assert meta_ttl > 12, "meta的TTL应该被更新"
- print("[PASS] 只有meta的边界情况测试通过")
- await redis_store.delete(f"meta:{file_id_1}")
- # 测试2: 新文件存储的正常行为
- file_id_2 = "test_edge_new_file"
- new_file_info = {
- "file_name": "新文件.pdf",
- "file_content": "新文件内容",
- "callback_task_id": "new_task"
- }
- success = await store_file_info(file_id_2, new_file_info, expire_seconds=10)
- assert success, "新文件存储应该成功"
- meta_ttl = await redis_store.ttl(f"meta:{file_id_2}")
- content_ttl = await redis_store.ttl(f"content:{file_id_2}")
- assert abs(meta_ttl - content_ttl) <= 1, "新文件的TTL应该同步"
- print("[PASS] 新文件存储的边界情况测试通过")
- await redis_store.delete(f"meta:{file_id_2}", f"content:{file_id_2}")
- print("[PASS] 所有边界情况测试通过!")
- except Exception as e:
- print(f"[FAIL] 边界情况测试失败: {e}")
- import traceback
- traceback.print_exc()
- finally:
- await RedisConnectionFactory.close_all()
- async def run_fix_validation_tests():
- """运行修复验证测试"""
- print("=" * 60)
- print("Redis TTL不同步Bug修复验证测试")
- print("=" * 60)
- await test_ttl_sync_fix()
- await test_edge_cases()
- print("\n" + "=" * 60)
- print("[SUCCESS] 所有修复验证测试通过!")
- print("=" * 60)
- if __name__ == "__main__":
- asyncio.run(run_fix_validation_tests())
|