test_sse_integration.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #!/usr/bin/env python3
  2. """
  3. SSE进度推送集成测试脚本
  4. 模拟文件上传后建立SSE连接的完整流程
  5. """
  6. import requests
  7. import json
  8. import time
  9. import threading
  10. import os
  11. import signal
  12. import sys
  13. from typing import Dict, Any
  14. # 全局标志用于控制程序退出
  15. should_exit = False
  16. def signal_handler(signum, frame):
  17. """信号处理器,处理Ctrl+C"""
  18. global should_exit
  19. print(f"\n\n⚡ 接收到信号 {signum},正在退出...")
  20. should_exit = True
  21. sys.exit(0)
  22. # 注册信号处理器
  23. signal.signal(signal.SIGINT, signal_handler)
  24. signal.signal(signal.SIGTERM, signal_handler)
  25. class ProgressTracker:
  26. def __init__(self):
  27. self.progress_events = []
  28. self.completed = False
  29. self.error = None
  30. def handle_sse_event(self, event_data: Dict[str, Any]):
  31. """处理SSE事件"""
  32. self.progress_events.append(event_data)
  33. event_type = event_data.get("type", "unknown")
  34. data = event_data.get("data", {})
  35. print(f"\n📡 SSE事件 [{event_type}]:")
  36. print(f" 时间: {data.get('timestamp', 'N/A')}")
  37. print(f" 进度: {data.get('current', 0)}%")
  38. print(f" 阶段: {data.get('stage_name', 'N/A')}")
  39. print(f" 状态: {data.get('status', 'N/A')}")
  40. print(f" 消息: {data.get('message', 'N/A')}")
  41. if event_type == "completed":
  42. self.completed = True
  43. print(f"\n✅ 任务完成: {data.get('task_status', 'N/A')}")
  44. elif event_type == "error":
  45. self.error = data.get("message", "未知错误")
  46. print(f"\n❌ 任务错误: {self.error}")
  47. def test_file_upload():
  48. """测试文件上传"""
  49. upload_url = "http://127.0.0.1:8035/sgsc/file_upload"
  50. file_path = "D:/wx_work/sichuan_luqiao/项目背景资料/路桥桥梁工程施工方案 7 份/罗成依达大桥拱座竖桩专项施工方案.pdf"
  51. try:
  52. with open(file_path, 'rb') as f:
  53. files = {
  54. 'file': f
  55. }
  56. data = {
  57. 'callback_url': 'https://client.example.com/callback?task_id=ocr-12345',
  58. 'project_plan_type': 'bridge_up_part',
  59. 'user': 'user-001'
  60. }
  61. print("📤 开始上传文件...")
  62. response = requests.post(upload_url, files=files, data=data)
  63. response.raise_for_status()
  64. result = response.json()
  65. print(f"✅ 文件上传成功")
  66. print(f" 文件ID: {result['data']['id']}")
  67. print(f" 文件名: {result['data']['name']}")
  68. print(f" 回调任务ID: {result['data']['callback_task_id']}")
  69. return result['data']['callback_task_id']
  70. except Exception as e:
  71. print(f"❌ 文件上传失败: {e}")
  72. return None
  73. def test_sse_connection(callback_task_id: str, tracker: ProgressTracker):
  74. """测试SSE连接"""
  75. sse_url = f"http://127.0.0.1:8035/sgsc/sse/progress/{callback_task_id}?user=user-001"
  76. try:
  77. print(f"🔗 建立SSE连接: {sse_url}")
  78. print("=" * 60)
  79. print("📡 SSE原始响应:")
  80. print("-" * 60)
  81. # 建立SSE连接
  82. response = requests.get(sse_url, stream=True)
  83. response.raise_for_status()
  84. for line in response.iter_lines():
  85. if should_exit:
  86. print("\n👋 用户请求退出,停止监听SSE")
  87. break
  88. if line:
  89. # 直接打印原始响应
  90. line_str = line.decode('utf-8')
  91. print(f"原始响应: {line_str}")
  92. # 解析SSE事件格式(可选)
  93. if line_str.startswith('data: '):
  94. try:
  95. event_data = json.loads(line_str[6:])
  96. tracker.handle_sse_event({
  97. "type": "data",
  98. "data": event_data
  99. })
  100. # 如果任务完成,退出监听
  101. if tracker.completed:
  102. print("-" * 60)
  103. print("📡 任务完成,结束监听")
  104. break
  105. except json.JSONDecodeError:
  106. # JSON解析失败也继续监听
  107. pass
  108. except Exception as e:
  109. print(f"❌ SSE连接失败: {e}")
  110. def main():
  111. """主测试流程"""
  112. print("🚀 开始SSE进度推送集成测试")
  113. print("💡 提示: 按 Ctrl+C 可以随时退出")
  114. print("=" * 60)
  115. # 第一步:上传文件
  116. callback_task_id = test_file_upload()
  117. if not callback_task_id or should_exit:
  118. print("❌ 文件上传失败,测试终止")
  119. return
  120. print(f"\n⏳ 等待2秒后建立SSE连接...")
  121. for i in range(2, 0, -1):
  122. if should_exit:
  123. print("\n👋 用户请求退出,测试终止")
  124. return
  125. print(f" {i}秒...")
  126. time.sleep(1)
  127. # 第二步:建立SSE连接监听进度
  128. tracker = ProgressTracker()
  129. # 在主线程中运行SSE监听
  130. test_sse_connection(callback_task_id, tracker)
  131. # 输出测试结果
  132. if not should_exit:
  133. print("\n" + "=" * 60)
  134. print("📊 测试结果汇总:")
  135. print(f" 收到事件数量: {len(tracker.progress_events)}")
  136. print(f" 任务是否完成: {'✅' if tracker.completed else '❌'}")
  137. if tracker.error:
  138. print(f" 错误信息: {tracker.error}")
  139. if tracker.completed:
  140. print("🎉 SSE实时推送测试成功!")
  141. else:
  142. print("⚠️ 任务可能未在预期时间内完成")
  143. if __name__ == "__main__":
  144. main()