""" 系统Trace ID测试 验证trace_id在异步并发和队列中的正确传播 """ import os import sys # Add the parent directory (LQAgentPlatform) to sys.path so we can import foundation project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(project_root) import asyncio import time from foundation.infrastructure.tracing import TraceContext, auto_trace from foundation.observability.logger.loggering import server_logger as logger class TraceIDTest: """Trace ID测试类""" @staticmethod async def test_basic_context(): """测试基础上下文功能""" print("\n=== 测试1: 基础上下文功能 ===") # 设置trace_id trace_id = TraceContext.generate_trace_id() TraceContext.set_trace_id(trace_id) logger.info("测试基础日志,应该包含trace_id") logger.info(f"手动设置的trace_id: {trace_id}") logger.info(f"自动获取的trace_id: {TraceContext.get_trace_id()}") assert TraceContext.get_trace_id() == trace_id, "trace_id设置失败" print("[PASS] 基础上下文功能测试通过") @staticmethod async def test_async_propagation(): """测试异步并发传播""" print("\n=== 测试2: 异步并发传播 ===") # 设置主trace_id main_trace = "main-async-test" TraceContext.set_trace_id(main_trace) logger.info("主异步任务开始") async def concurrent_task(task_id: int): """并发任务""" current_trace = TraceContext.get_trace_id() logger.info(f"并发任务 {task_id} 获取到的trace_id: {current_trace}") # 在异步任务中修改trace_id,应该不影响其他任务 new_trace = f"{main_trace}-subtask-{task_id}" TraceContext.set_trace_id(new_trace) await asyncio.sleep(0.1) logger.info(f"并发任务 {task_id} 修改后的trace_id: {new_trace}") return current_trace # 创建并发任务 tasks = [concurrent_task(i) for i in range(3)] results = await asyncio.gather(*tasks) # 验证所有任务都继承到了主trace_id for i, result in enumerate(results): assert result == main_trace, f"任务 {i} 没有继承主trace_id" # 验证主任务trace_id不受影响 assert TraceContext.get_trace_id() == main_trace, "主trace_id被并发任务污染" logger.info("主异步任务完成") print("[PASS] 异步并发传播测试通过") @staticmethod @auto_trace('callback_task_id') async def test_decorator_auto_trace(callback_task_id: str): """测试装饰器自动trace""" print(f"\n=== 测试3: 装饰器自动trace ===") # 不需要手动设置trace_id,装饰器会自动处理 current_trace = TraceContext.get_trace_id() logger.info("装饰器自动设置的日志") assert current_trace == callback_task_id, "装饰器没有正确设置trace_id" # 测试装饰器在异步并发中的表现 async def nested_task(): nested_trace = TraceContext.get_trace_id() logger.info("嵌套异步任务") return nested_trace nested_result = await nested_task() assert nested_result == callback_task_id, "嵌套任务没有继承装饰器设置的trace_id" print(f"[PASS] 装饰器自动trace测试通过,trace_id: {callback_task_id}") @staticmethod async def test_context_manager(): """测试上下文管理器""" print("\n=== 测试4: 上下文管理器 ===") original_trace = TraceContext.get_trace_id() logger.info(f"原始trace_id: {original_trace}") # 使用上下文管理器临时设置trace_id temp_trace = "temporary-trace" with TraceContext.with_trace_context(temp_trace) as ctx: logger.info("上下文管理器内的日志") current_trace = TraceContext.get_trace_id() assert current_trace == temp_trace, "上下文管理器没有正确设置trace_id" # 退出上下文后应该恢复原始trace_id restored_trace = TraceContext.get_trace_id() logger.info(f"恢复后的trace_id: {restored_trace}") assert restored_trace == original_trace, "上下文管理器没有正确恢复trace_id" print("[PASS] 上下文管理器测试通过") @staticmethod def test_celery_task_simulation(): """测试Celery任务trace_id模拟""" print("\n=== 测试5: Celery任务trace_id模拟 ===") # 模拟提交Celery任务前的trace_id设置 submit_trace = "celery-submit-test" TraceContext.set_trace_id(submit_trace) logger.info("准备提交Celery任务") # 模拟Celery任务执行 def simulate_celery_task_execution(file_info: dict, _system_trace_id=None): """模拟Celery任务执行""" if _system_trace_id: TraceContext.set_trace_id(_system_trace_id) current_trace = TraceContext.get_trace_id() logger.info("Celery任务执行中") logger.info(f"文件ID: {file_info.get('file_id')}") return current_trace # 提交任务(模拟) file_info = {'file_id': 'test-file-123'} extracted_trace = TraceContext.get_trace_id() # 执行任务 task_trace = simulate_celery_task_execution( file_info, _system_trace_id=extracted_trace ) assert task_trace == submit_trace, "Celery任务没有正确获取到trace_id" print("[PASS] Celery任务trace_id模拟测试通过") async def run_all_tests(): """运行所有测试""" print("开始运行系统Trace ID测试...\n") try: # 测试1: 基础上下文功能 await TraceIDTest.test_basic_context() # 测试2: 异步并发传播 await TraceIDTest.test_async_propagation() # 测试3: 装饰器自动trace await TraceIDTest.test_decorator_auto_trace("decorator-test-123") # 测试4: 上下文管理器 await TraceIDTest.test_context_manager() # 测试5: Celery任务模拟 TraceIDTest.test_celery_task_simulation() print("\n[SUCCESS] 所有测试通过!系统Trace ID机制工作正常") return True except Exception as e: print(f"\n[FAIL] 测试失败: {str(e)}") import traceback traceback.print_exc() return False if __name__ == "__main__": # 运行测试 success = asyncio.run(run_all_tests()) exit(0 if success else 1)