| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- #!/usr/bin/env python3
- """
- SSE进度推送集成测试脚本
- 模拟文件上传后建立SSE连接的完整流程
- """
- import requests
- import json
- import time
- import threading
- import os
- import signal
- import sys
- from typing import Dict, Any
- # 全局标志用于控制程序退出
- should_exit = False
- def signal_handler(signum, frame):
- """信号处理器,处理Ctrl+C"""
- global should_exit
- print(f"\n\n⚡ 接收到信号 {signum},正在退出...")
- should_exit = True
- sys.exit(0)
- # 注册信号处理器
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
- class ProgressTracker:
- def __init__(self):
- self.progress_events = []
- self.completed = False
- self.error = None
- def handle_sse_event(self, event_data: Dict[str, Any]):
- """处理SSE事件"""
- self.progress_events.append(event_data)
- event_type = event_data.get("type", "unknown")
- data = event_data.get("data", {})
- print(f"\n📡 SSE事件 [{event_type}]:")
- print(f" 时间: {data.get('timestamp', 'N/A')}")
- print(f" 进度: {data.get('current', 0)}%")
- print(f" 阶段: {data.get('stage_name', 'N/A')}")
- print(f" 状态: {data.get('status', 'N/A')}")
- print(f" 消息: {data.get('message', 'N/A')}")
- if event_type == "completed":
- self.completed = True
- print(f"\n✅ 任务完成: {data.get('task_status', 'N/A')}")
- elif event_type == "error":
- self.error = data.get("message", "未知错误")
- print(f"\n❌ 任务错误: {self.error}")
- def test_file_upload():
- """测试文件上传"""
- upload_url = "http://127.0.0.1:8035/sgsc/file_upload"
- file_path = "D:/wx_work/sichuan_luqiao/项目背景资料/路桥桥梁工程施工方案 7 份/罗成依达大桥拱座竖桩专项施工方案.pdf"
- try:
- with open(file_path, 'rb') as f:
- files = {
- 'file': f
- }
- data = {
- 'callback_url': 'https://client.example.com/callback?task_id=ocr-12345',
- 'project_plan_type': 'bridge_up_part',
- 'user': 'user-001'
- }
- print("📤 开始上传文件...")
- response = requests.post(upload_url, files=files, data=data)
- response.raise_for_status()
- result = response.json()
- print(f"✅ 文件上传成功")
- print(f" 文件ID: {result['data']['id']}")
- print(f" 文件名: {result['data']['name']}")
- print(f" 回调任务ID: {result['data']['callback_task_id']}")
- return result['data']['callback_task_id']
- except Exception as e:
- print(f"❌ 文件上传失败: {e}")
- return None
- def test_sse_connection(callback_task_id: str, tracker: ProgressTracker):
- """测试SSE连接"""
- sse_url = f"http://127.0.0.1:8035/sgsc/sse/progress/{callback_task_id}?user=user-001"
- try:
- print(f"🔗 建立SSE连接: {sse_url}")
- print("=" * 60)
- print("📡 SSE原始响应:")
- print("-" * 60)
- # 建立SSE连接
- response = requests.get(sse_url, stream=True)
- response.raise_for_status()
- for line in response.iter_lines():
- if should_exit:
- print("\n👋 用户请求退出,停止监听SSE")
- break
- if line:
- # 直接打印原始响应
- line_str = line.decode('utf-8')
- print(f"原始响应: {line_str}")
- # 解析SSE事件格式(可选)
- if line_str.startswith('data: '):
- try:
- event_data = json.loads(line_str[6:])
- tracker.handle_sse_event({
- "type": "data",
- "data": event_data
- })
- # 如果任务完成,退出监听
- if tracker.completed:
- print("-" * 60)
- print("📡 任务完成,结束监听")
- break
- except json.JSONDecodeError:
- # JSON解析失败也继续监听
- pass
- except Exception as e:
- print(f"❌ SSE连接失败: {e}")
- def main():
- """主测试流程"""
- print("🚀 开始SSE进度推送集成测试")
- print("💡 提示: 按 Ctrl+C 可以随时退出")
- print("=" * 60)
- # 第一步:上传文件
- callback_task_id = test_file_upload()
- if not callback_task_id or should_exit:
- print("❌ 文件上传失败,测试终止")
- return
- print(f"\n⏳ 等待2秒后建立SSE连接...")
- for i in range(2, 0, -1):
- if should_exit:
- print("\n👋 用户请求退出,测试终止")
- return
- print(f" {i}秒...")
- time.sleep(1)
- # 第二步:建立SSE连接监听进度
- tracker = ProgressTracker()
- # 在主线程中运行SSE监听
- test_sse_connection(callback_task_id, tracker)
- # 输出测试结果
- if not should_exit:
- print("\n" + "=" * 60)
- print("📊 测试结果汇总:")
- print(f" 收到事件数量: {len(tracker.progress_events)}")
- print(f" 任务是否完成: {'✅' if tracker.completed else '❌'}")
- if tracker.error:
- print(f" 错误信息: {tracker.error}")
- if tracker.completed:
- print("🎉 SSE实时推送测试成功!")
- else:
- print("⚠️ 任务可能未在预期时间内完成")
- if __name__ == "__main__":
- main()
|