| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Redis TTL不同步Bug单元测试
- 验证问题:store_file_info函数只更新meta的TTL,不更新content的TTL,
- 导致两个键的过期时间不同步,造成间歇性缓存失败。
- """
- import asyncio
- import json
- import time
- import pytest
- from typing import Dict, Any
- # 导入被测试的模块
- import sys
- from pathlib import Path
- root_dir = Path(__file__).parent.parent
- sys.path.append(str(root_dir))
- from foundation.utils.redis_utils import store_file_info, get_file_info
- from foundation.infrastructure.cache.redis_connection import RedisConnectionFactory
- class TestRedisTTLSync:
- """Redis TTL同步问题测试类"""
- @pytest.fixture
- async def redis_store(self):
- """Redis连接fixture"""
- redis_store = await RedisConnectionFactory.get_redis_store()
- yield redis_store
- # 测试后清理
- await RedisConnectionFactory.close_all()
- @pytest.fixture
- async def test_file_data(self):
- """测试文件数据fixture"""
- file_id = "test_ttl_bug_file_123"
- file_info = {
- "file_name": "测试文档.pdf",
- "file_size": 1024,
- "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50,
- "callback_task_id": "original_task_id",
- "upload_time": int(time.time())
- }
- return file_id, file_info
- async def test_ttl_sync_bug_reproduction(self, redis_store, test_file_data):
- """
- 测试1: 复现TTL不同步Bug
- 1. 存储完整文件信息
- 2. 更新callback_task_id(触发TTL不同步)
- 3. 等待原始content过期
- 4. 验证获取文件内容失败
- """
- file_id, file_info = test_file_data
- print("\n=== 开始测试TTL不同步Bug复现 ===")
- # 步骤1: 存储完整文件信息,TTL=5秒(便于测试)
- print("1. 存储完整文件信息...")
- success = await store_file_info(file_id, file_info, expire_seconds=5)
- assert success, "存储文件信息失败"
- # 验证初始状态
- print("2. 验证初始状态...")
- retrieved_info = await get_file_info(file_id, include_content=True)
- assert retrieved_info is not None, "初始获取文件信息失败"
- assert retrieved_info['file_content'] is not None, "初始文件内容为空"
- print(f" [OK] 文件内容长度: {len(retrieved_info['file_content'])}")
- # 检查键的TTL
- meta_ttl = await redis_store.ttl(f"meta:{file_id}")
- content_ttl = await redis_store.ttl(f"content:{file_id}")
- print(f" [OK] 初始TTL - meta: {meta_ttl}s, content: {content_ttl}s")
- assert abs(meta_ttl - content_ttl) <= 1, "初始TTL应该基本同步"
- # 步骤3: 等待3秒,让TTL减少一些
- print("3. 等待3秒...")
- await asyncio.sleep(3)
- # 步骤4: 更新callback_task_id(触发Bug)
- print("4. 更新callback_task_id(触发TTL不同步Bug)...")
- update_success = await store_file_info(
- file_id,
- {'callback_task_id': 'updated_task_id'},
- expire_seconds=10 # 设置更长的TTL
- )
- assert update_success, "更新callback_task_id失败"
- # 步骤5: 检查TTL不同步问题
- print("5. 检查TTL不同步问题...")
- new_meta_ttl = await redis_store.ttl(f"meta:{file_id}")
- new_content_ttl = await redis_store.ttl(f"content:{file_id}")
- print(f" [OK] 更新后TTL - meta: {new_meta_ttl}s, content: {new_content_ttl}s")
- # 验证TTL不同步:meta的TTL应该重置为10秒,content应该继续倒计时(约2秒)
- assert new_meta_ttl > 8, f"meta的TTL应该被重置为约10秒,实际为{new_meta_ttl}"
- assert new_content_ttl < 5, f"content的TTL应该继续倒计时,实际为{new_content_ttl}"
- assert abs(new_meta_ttl - new_content_ttl) > 3, "TTL应该明显不同步"
- print(" [PASS] TTL不同步Bug已复现!")
- # 步骤6: 等待content过期
- print("6. 等待content过期...")
- await asyncio.sleep(new_content_ttl + 1)
- # 步骤7: 验证获取文件内容失败
- print("7. 验证content过期后获取失败...")
- final_info = await get_file_info(file_id, include_content=True)
- # 根据当前bug逻辑,这里应该返回None(因为content缺失)
- assert final_info is None, f"content过期后应该返回None,实际返回: {final_info}"
- print(" [PASS] Bug验证成功:content过期后无法获取文件信息")
- # 清理测试数据
- await redis_store.delete(f"meta:{file_id}", f"content:{file_id}")
- async def test_only_meta_retrieval_works(self, redis_store, test_file_data):
- """
- 测试2: 验证不包含content的获取仍然正常工作
- 即使content过期,meta信息应该仍然可以获取
- """
- file_id, file_info = test_file_data
- print("\n=== 测试meta单独获取功能 ===")
- # 存储文件信息
- await store_file_info(file_id, file_info, expire_seconds=3)
- # 更新callback_task_id
- await store_file_info(file_id, {'callback_task_id': 'updated'}, expire_seconds=10)
- # 等待content过期
- await asyncio.sleep(4)
- # 测试不包含content的获取
- meta_only_info = await get_file_info(file_id, include_content=False)
- assert meta_only_info is not None, "meta信息应该可以正常获取"
- assert meta_only_info['callback_task_id'] == 'updated', "callback_task_id应该被正确更新"
- assert 'file_content' not in meta_only_info, "不应该包含file_content"
- print(" [PASS] meta信息单独获取正常")
- # 清理
- await redis_store.delete(f"meta:{file_id}")
- async def test_current_bug_detection_in_logs(self, redis_store, test_file_data):
- """
- 测试3: 模拟日志中观察到的实际场景
- 复现日志中的时间序列:
- - 11:41:53 文件信息存储
- - 11:42:01 获取失败(8秒后)
- """
- file_id, file_info = test_file_data
- print("\n=== 模拟日志中的实际场景 ===")
- # 步骤1: 模拟文件上传(11:41:53)
- print("1. 模拟文件上传(11:41:53)...")
- upload_time = time.time()
- await store_file_info(file_id, file_info, expire_seconds=10) # 10秒TTL
- # 步骤2: 模拟一些处理时间
- await asyncio.sleep(2)
- # 步骤3: 模拟callback_task_id更新(可能发生在中间某个时间点)
- print("3. 模拟callback_task_id更新...")
- await store_file_info(file_id, {'callback_task_id': 'updated_task'}, expire_seconds=15)
- # 步骤4: 等待到11:42:01(约8秒后)
- elapsed_time = time.time() - upload_time
- wait_time = 8 - elapsed_time
- if wait_time > 0:
- print(f"4. 等待到11:42:01(还需要{wait_time:.1f}秒)...")
- await asyncio.sleep(wait_time)
- # 步骤5: 验证获取失败(如日志所示)
- print("5. 验证11:42:01时的获取失败...")
- failed_info = await get_file_info(file_id, include_content=True)
- assert failed_info is None, "模拟11:42:01时应该获取失败"
- print(" [PASS] 成功复现日志中的失败场景")
- # 清理
- await redis_store.delete(f"meta:{file_id}")
- async def run_ttl_bug_tests():
- """运行TTL Bug测试"""
- print("[TEST] 开始Redis TTL不同步Bug测试")
- test_instance = TestRedisTTLSync()
- try:
- # 获取redis连接
- redis_store = await RedisConnectionFactory.get_redis_store()
- # 准备测试数据
- file_id = "test_ttl_bug_file_123"
- file_info = {
- "file_name": "测试文档.pdf",
- "file_size": 1024,
- "file_content": "这是一个测试文件内容,用于验证TTL不同步问题。" * 50,
- "callback_task_id": "original_task_id",
- "upload_time": int(time.time())
- }
- test_file_data = (file_id, file_info)
- # 运行测试
- await test_instance.test_ttl_sync_bug_reproduction(redis_store, test_file_data)
- await test_instance.test_only_meta_retrieval_works(redis_store, test_file_data)
- await test_instance.test_current_bug_detection_in_logs(redis_store, test_file_data)
- print("\n[PASS] 所有TTL Bug测试通过!")
- except Exception as e:
- print(f"\n[FAIL] 测试失败: {e}")
- import traceback
- traceback.print_exc()
- finally:
- await RedisConnectionFactory.close_all()
- if __name__ == "__main__":
- # 直接运行测试
- asyncio.run(run_ttl_bug_tests())
|