| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- """
- 系统Trace ID测试
- 验证trace_id在异步并发和队列中的正确传播
- """
- import os
- import sys
- # Add the parent directory (LQAgentPlatform) to sys.path so we can import foundation
- project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(project_root)
- import asyncio
- import time
- from foundation.infrastructure.tracing import TraceContext, auto_trace
- from foundation.observability.logger.loggering import server_logger as logger
- class TraceIDTest:
- """Trace ID测试类"""
- @staticmethod
- async def test_basic_context():
- """测试基础上下文功能"""
- print("\n=== 测试1: 基础上下文功能 ===")
- # 设置trace_id
- trace_id = TraceContext.generate_trace_id()
- TraceContext.set_trace_id(trace_id)
- logger.info("测试基础日志,应该包含trace_id")
- logger.info(f"手动设置的trace_id: {trace_id}")
- logger.info(f"自动获取的trace_id: {TraceContext.get_trace_id()}")
- assert TraceContext.get_trace_id() == trace_id, "trace_id设置失败"
- print("[PASS] 基础上下文功能测试通过")
- @staticmethod
- async def test_async_propagation():
- """测试异步并发传播"""
- print("\n=== 测试2: 异步并发传播 ===")
- # 设置主trace_id
- main_trace = "main-async-test"
- TraceContext.set_trace_id(main_trace)
- logger.info("主异步任务开始")
- async def concurrent_task(task_id: int):
- """并发任务"""
- current_trace = TraceContext.get_trace_id()
- logger.info(f"并发任务 {task_id} 获取到的trace_id: {current_trace}")
- # 在异步任务中修改trace_id,应该不影响其他任务
- new_trace = f"{main_trace}-subtask-{task_id}"
- TraceContext.set_trace_id(new_trace)
- await asyncio.sleep(0.1)
- logger.info(f"并发任务 {task_id} 修改后的trace_id: {new_trace}")
- return current_trace
- # 创建并发任务
- tasks = [concurrent_task(i) for i in range(3)]
- results = await asyncio.gather(*tasks)
- # 验证所有任务都继承到了主trace_id
- for i, result in enumerate(results):
- assert result == main_trace, f"任务 {i} 没有继承主trace_id"
- # 验证主任务trace_id不受影响
- assert TraceContext.get_trace_id() == main_trace, "主trace_id被并发任务污染"
- logger.info("主异步任务完成")
- print("[PASS] 异步并发传播测试通过")
- @staticmethod
- @auto_trace('callback_task_id')
- async def test_decorator_auto_trace(callback_task_id: str):
- """测试装饰器自动trace"""
- print(f"\n=== 测试3: 装饰器自动trace ===")
- # 不需要手动设置trace_id,装饰器会自动处理
- current_trace = TraceContext.get_trace_id()
- logger.info("装饰器自动设置的日志")
- assert current_trace == callback_task_id, "装饰器没有正确设置trace_id"
- # 测试装饰器在异步并发中的表现
- async def nested_task():
- nested_trace = TraceContext.get_trace_id()
- logger.info("嵌套异步任务")
- return nested_trace
- nested_result = await nested_task()
- assert nested_result == callback_task_id, "嵌套任务没有继承装饰器设置的trace_id"
- print(f"[PASS] 装饰器自动trace测试通过,trace_id: {callback_task_id}")
- @staticmethod
- async def test_context_manager():
- """测试上下文管理器"""
- print("\n=== 测试4: 上下文管理器 ===")
- original_trace = TraceContext.get_trace_id()
- logger.info(f"原始trace_id: {original_trace}")
- # 使用上下文管理器临时设置trace_id
- temp_trace = "temporary-trace"
- with TraceContext.with_trace_context(temp_trace) as ctx:
- logger.info("上下文管理器内的日志")
- current_trace = TraceContext.get_trace_id()
- assert current_trace == temp_trace, "上下文管理器没有正确设置trace_id"
- # 退出上下文后应该恢复原始trace_id
- restored_trace = TraceContext.get_trace_id()
- logger.info(f"恢复后的trace_id: {restored_trace}")
- assert restored_trace == original_trace, "上下文管理器没有正确恢复trace_id"
- print("[PASS] 上下文管理器测试通过")
- @staticmethod
- def test_celery_task_simulation():
- """测试Celery任务trace_id模拟"""
- print("\n=== 测试5: Celery任务trace_id模拟 ===")
- # 模拟提交Celery任务前的trace_id设置
- submit_trace = "celery-submit-test"
- TraceContext.set_trace_id(submit_trace)
- logger.info("准备提交Celery任务")
- # 模拟Celery任务执行
- def simulate_celery_task_execution(file_info: dict, _system_trace_id=None):
- """模拟Celery任务执行"""
- if _system_trace_id:
- TraceContext.set_trace_id(_system_trace_id)
- current_trace = TraceContext.get_trace_id()
- logger.info("Celery任务执行中")
- logger.info(f"文件ID: {file_info.get('file_id')}")
- return current_trace
- # 提交任务(模拟)
- file_info = {'file_id': 'test-file-123'}
- extracted_trace = TraceContext.get_trace_id()
- # 执行任务
- task_trace = simulate_celery_task_execution(
- file_info,
- _system_trace_id=extracted_trace
- )
- assert task_trace == submit_trace, "Celery任务没有正确获取到trace_id"
- print("[PASS] Celery任务trace_id模拟测试通过")
- async def run_all_tests():
- """运行所有测试"""
- print("开始运行系统Trace ID测试...\n")
- try:
- # 测试1: 基础上下文功能
- await TraceIDTest.test_basic_context()
- # 测试2: 异步并发传播
- await TraceIDTest.test_async_propagation()
- # 测试3: 装饰器自动trace
- await TraceIDTest.test_decorator_auto_trace("decorator-test-123")
- # 测试4: 上下文管理器
- await TraceIDTest.test_context_manager()
- # 测试5: Celery任务模拟
- TraceIDTest.test_celery_task_simulation()
- print("\n[SUCCESS] 所有测试通过!系统Trace ID机制工作正常")
- return True
- except Exception as e:
- print(f"\n[FAIL] 测试失败: {str(e)}")
- import traceback
- traceback.print_exc()
- return False
- if __name__ == "__main__":
- # 运行测试
- success = asyncio.run(run_all_tests())
- exit(0 if success else 1)
|