system_trace_id_test.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. """
  2. 系统Trace ID测试
  3. 验证trace_id在异步并发和队列中的正确传播
  4. """
  5. import os
  6. import sys
  7. # Add the parent directory (LQAgentPlatform) to sys.path so we can import foundation
  8. project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  9. sys.path.append(project_root)
  10. import asyncio
  11. import time
  12. from foundation.trace.trace_context import TraceContext, auto_trace
  13. from foundation.logger.loggering import server_logger as logger
  14. class TraceIDTest:
  15. """Trace ID测试类"""
  16. @staticmethod
  17. async def test_basic_context():
  18. """测试基础上下文功能"""
  19. print("\n=== 测试1: 基础上下文功能 ===")
  20. # 设置trace_id
  21. trace_id = TraceContext.generate_trace_id()
  22. TraceContext.set_trace_id(trace_id)
  23. logger.info("测试基础日志,应该包含trace_id")
  24. logger.info(f"手动设置的trace_id: {trace_id}")
  25. logger.info(f"自动获取的trace_id: {TraceContext.get_trace_id()}")
  26. assert TraceContext.get_trace_id() == trace_id, "trace_id设置失败"
  27. print("[PASS] 基础上下文功能测试通过")
  28. @staticmethod
  29. async def test_async_propagation():
  30. """测试异步并发传播"""
  31. print("\n=== 测试2: 异步并发传播 ===")
  32. # 设置主trace_id
  33. main_trace = "main-async-test"
  34. TraceContext.set_trace_id(main_trace)
  35. logger.info("主异步任务开始")
  36. async def concurrent_task(task_id: int):
  37. """并发任务"""
  38. current_trace = TraceContext.get_trace_id()
  39. logger.info(f"并发任务 {task_id} 获取到的trace_id: {current_trace}")
  40. # 在异步任务中修改trace_id,应该不影响其他任务
  41. new_trace = f"{main_trace}-subtask-{task_id}"
  42. TraceContext.set_trace_id(new_trace)
  43. await asyncio.sleep(0.1)
  44. logger.info(f"并发任务 {task_id} 修改后的trace_id: {new_trace}")
  45. return current_trace
  46. # 创建并发任务
  47. tasks = [concurrent_task(i) for i in range(3)]
  48. results = await asyncio.gather(*tasks)
  49. # 验证所有任务都继承到了主trace_id
  50. for i, result in enumerate(results):
  51. assert result == main_trace, f"任务 {i} 没有继承主trace_id"
  52. # 验证主任务trace_id不受影响
  53. assert TraceContext.get_trace_id() == main_trace, "主trace_id被并发任务污染"
  54. logger.info("主异步任务完成")
  55. print("[PASS] 异步并发传播测试通过")
  56. @staticmethod
  57. @auto_trace('callback_task_id')
  58. async def test_decorator_auto_trace(callback_task_id: str):
  59. """测试装饰器自动trace"""
  60. print(f"\n=== 测试3: 装饰器自动trace ===")
  61. # 不需要手动设置trace_id,装饰器会自动处理
  62. current_trace = TraceContext.get_trace_id()
  63. logger.info("装饰器自动设置的日志")
  64. assert current_trace == callback_task_id, "装饰器没有正确设置trace_id"
  65. # 测试装饰器在异步并发中的表现
  66. async def nested_task():
  67. nested_trace = TraceContext.get_trace_id()
  68. logger.info("嵌套异步任务")
  69. return nested_trace
  70. nested_result = await nested_task()
  71. assert nested_result == callback_task_id, "嵌套任务没有继承装饰器设置的trace_id"
  72. print(f"[PASS] 装饰器自动trace测试通过,trace_id: {callback_task_id}")
  73. @staticmethod
  74. async def test_context_manager():
  75. """测试上下文管理器"""
  76. print("\n=== 测试4: 上下文管理器 ===")
  77. original_trace = TraceContext.get_trace_id()
  78. logger.info(f"原始trace_id: {original_trace}")
  79. # 使用上下文管理器临时设置trace_id
  80. temp_trace = "temporary-trace"
  81. with TraceContext.with_trace_context(temp_trace) as ctx:
  82. logger.info("上下文管理器内的日志")
  83. current_trace = TraceContext.get_trace_id()
  84. assert current_trace == temp_trace, "上下文管理器没有正确设置trace_id"
  85. # 退出上下文后应该恢复原始trace_id
  86. restored_trace = TraceContext.get_trace_id()
  87. logger.info(f"恢复后的trace_id: {restored_trace}")
  88. assert restored_trace == original_trace, "上下文管理器没有正确恢复trace_id"
  89. print("[PASS] 上下文管理器测试通过")
  90. @staticmethod
  91. def test_celery_task_simulation():
  92. """测试Celery任务trace_id模拟"""
  93. print("\n=== 测试5: Celery任务trace_id模拟 ===")
  94. # 模拟提交Celery任务前的trace_id设置
  95. submit_trace = "celery-submit-test"
  96. TraceContext.set_trace_id(submit_trace)
  97. logger.info("准备提交Celery任务")
  98. # 模拟Celery任务执行
  99. def simulate_celery_task_execution(file_info: dict, _system_trace_id=None):
  100. """模拟Celery任务执行"""
  101. if _system_trace_id:
  102. TraceContext.set_trace_id(_system_trace_id)
  103. current_trace = TraceContext.get_trace_id()
  104. logger.info("Celery任务执行中")
  105. logger.info(f"文件ID: {file_info.get('file_id')}")
  106. return current_trace
  107. # 提交任务(模拟)
  108. file_info = {'file_id': 'test-file-123'}
  109. extracted_trace = TraceContext.get_trace_id()
  110. # 执行任务
  111. task_trace = simulate_celery_task_execution(
  112. file_info,
  113. _system_trace_id=extracted_trace
  114. )
  115. assert task_trace == submit_trace, "Celery任务没有正确获取到trace_id"
  116. print("[PASS] Celery任务trace_id模拟测试通过")
  117. async def run_all_tests():
  118. """运行所有测试"""
  119. print("开始运行系统Trace ID测试...\n")
  120. try:
  121. # 测试1: 基础上下文功能
  122. await TraceIDTest.test_basic_context()
  123. # 测试2: 异步并发传播
  124. await TraceIDTest.test_async_propagation()
  125. # 测试3: 装饰器自动trace
  126. await TraceIDTest.test_decorator_auto_trace("decorator-test-123")
  127. # 测试4: 上下文管理器
  128. await TraceIDTest.test_context_manager()
  129. # 测试5: Celery任务模拟
  130. TraceIDTest.test_celery_task_simulation()
  131. print("\n[SUCCESS] 所有测试通过!系统Trace ID机制工作正常")
  132. return True
  133. except Exception as e:
  134. print(f"\n[FAIL] 测试失败: {str(e)}")
  135. import traceback
  136. traceback.print_exc()
  137. return False
  138. if __name__ == "__main__":
  139. # 运行测试
  140. success = asyncio.run(run_all_tests())
  141. exit(0 if success else 1)