system_trace_id_test.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """
  2. 系统Trace ID测试
  3. 验证trace_id在异步并发和队列中的正确传播
  4. """
  5. import os
  6. import sys
  7. # Add the parent directory (LQAgentPlatform) to sys.path so we can import foundation
  8. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  9. import asyncio
  10. import time
  11. from foundation.infrastructure.tracing import TraceContext, auto_trace
  12. from foundation.observability.logger.loggering import review_logger as logger
  13. class TraceIDTest:
  14. """Trace ID测试类"""
  15. @staticmethod
  16. async def test_basic_context():
  17. """测试基础上下文功能"""
  18. print("\n=== 测试1: 基础上下文功能 ===")
  19. # 设置trace_id
  20. trace_id = TraceContext.generate_trace_id()
  21. TraceContext.set_trace_id(trace_id)
  22. logger.info("测试基础日志,应该包含trace_id")
  23. logger.info(f"手动设置的trace_id: {trace_id}")
  24. logger.info(f"自动获取的trace_id: {TraceContext.get_trace_id()}")
  25. assert TraceContext.get_trace_id() == trace_id, "trace_id设置失败"
  26. print("[PASS] 基础上下文功能测试通过")
  27. @staticmethod
  28. async def test_async_propagation():
  29. """测试异步并发传播"""
  30. print("\n=== 测试2: 异步并发传播 ===")
  31. # 设置主trace_id
  32. main_trace = "main-async-test"
  33. TraceContext.set_trace_id(main_trace)
  34. logger.info("主异步任务开始")
  35. async def concurrent_task(task_id: int):
  36. """并发任务"""
  37. current_trace = TraceContext.get_trace_id()
  38. logger.info(f"并发任务 {task_id} 获取到的trace_id: {current_trace}")
  39. # 在异步任务中修改trace_id,应该不影响其他任务
  40. new_trace = f"{main_trace}-subtask-{task_id}"
  41. TraceContext.set_trace_id(new_trace)
  42. await asyncio.sleep(0.1)
  43. logger.info(f"并发任务 {task_id} 修改后的trace_id: {new_trace}")
  44. return current_trace
  45. # 创建并发任务
  46. tasks = [concurrent_task(i) for i in range(3)]
  47. results = await asyncio.gather(*tasks)
  48. # 验证所有任务都继承到了主trace_id
  49. for i, result in enumerate(results):
  50. assert result == main_trace, f"任务 {i} 没有继承主trace_id"
  51. # 验证主任务trace_id不受影响
  52. assert TraceContext.get_trace_id() == main_trace, "主trace_id被并发任务污染"
  53. logger.info("主异步任务完成")
  54. print("[PASS] 异步并发传播测试通过")
  55. @staticmethod
  56. @auto_trace('callback_task_id')
  57. async def test_decorator_auto_trace(callback_task_id: str):
  58. """测试装饰器自动trace"""
  59. print(f"\n=== 测试3: 装饰器自动trace ===")
  60. # 不需要手动设置trace_id,装饰器会自动处理
  61. current_trace = TraceContext.get_trace_id()
  62. logger.info("装饰器自动设置的日志")
  63. assert current_trace == callback_task_id, "装饰器没有正确设置trace_id"
  64. # 测试装饰器在异步并发中的表现
  65. async def nested_task():
  66. nested_trace = TraceContext.get_trace_id()
  67. logger.info("嵌套异步任务")
  68. return nested_trace
  69. nested_result = await nested_task()
  70. assert nested_result == callback_task_id, "嵌套任务没有继承装饰器设置的trace_id"
  71. print(f"[PASS] 装饰器自动trace测试通过,trace_id: {callback_task_id}")
  72. @staticmethod
  73. async def test_context_manager():
  74. """测试上下文管理器"""
  75. print("\n=== 测试4: 上下文管理器 ===")
  76. original_trace = TraceContext.get_trace_id()
  77. logger.info(f"原始trace_id: {original_trace}")
  78. # 使用上下文管理器临时设置trace_id
  79. temp_trace = "temporary-trace"
  80. with TraceContext.with_trace_context(temp_trace) as ctx:
  81. logger.info("上下文管理器内的日志")
  82. current_trace = TraceContext.get_trace_id()
  83. assert current_trace == temp_trace, "上下文管理器没有正确设置trace_id"
  84. # 退出上下文后应该恢复原始trace_id
  85. restored_trace = TraceContext.get_trace_id()
  86. logger.info(f"恢复后的trace_id: {restored_trace}")
  87. assert restored_trace == original_trace, "上下文管理器没有正确恢复trace_id"
  88. print("[PASS] 上下文管理器测试通过")
  89. @staticmethod
  90. def test_celery_task_simulation():
  91. """测试Celery任务trace_id模拟"""
  92. print("\n=== 测试5: Celery任务trace_id模拟 ===")
  93. # 模拟提交Celery任务前的trace_id设置
  94. submit_trace = "celery-submit-test"
  95. TraceContext.set_trace_id(submit_trace)
  96. logger.info("准备提交Celery任务")
  97. # 模拟Celery任务执行
  98. def simulate_celery_task_execution(file_info: dict, _system_trace_id=None):
  99. """模拟Celery任务执行"""
  100. if _system_trace_id:
  101. TraceContext.set_trace_id(_system_trace_id)
  102. current_trace = TraceContext.get_trace_id()
  103. logger.info("Celery任务执行中")
  104. logger.info(f"文件ID: {file_info.get('file_id')}")
  105. return current_trace
  106. # 提交任务(模拟)
  107. file_info = {'file_id': 'test-file-123'}
  108. extracted_trace = TraceContext.get_trace_id()
  109. # 执行任务
  110. task_trace = simulate_celery_task_execution(
  111. file_info,
  112. _system_trace_id=extracted_trace
  113. )
  114. assert task_trace == submit_trace, "Celery任务没有正确获取到trace_id"
  115. print("[PASS] Celery任务trace_id模拟测试通过")
  116. async def run_all_tests():
  117. """运行所有测试"""
  118. print("开始运行系统Trace ID测试...\n")
  119. try:
  120. # 测试1: 基础上下文功能
  121. await TraceIDTest.test_basic_context()
  122. # 测试2: 异步并发传播
  123. await TraceIDTest.test_async_propagation()
  124. # 测试3: 装饰器自动trace
  125. await TraceIDTest.test_decorator_auto_trace("decorator-test-123")
  126. # 测试4: 上下文管理器
  127. await TraceIDTest.test_context_manager()
  128. # 测试5: Celery任务模拟
  129. TraceIDTest.test_celery_task_simulation()
  130. print("\n[SUCCESS] 所有测试通过!系统Trace ID机制工作正常")
  131. return True
  132. except Exception as e:
  133. print(f"\n[FAIL] 测试失败: {str(e)}")
  134. import traceback
  135. traceback.print_exc()
  136. return False
  137. if __name__ == "__main__":
  138. # 运行测试
  139. success = asyncio.run(run_all_tests())
  140. exit(0 if success else 1)