#!/usr/bin/env python3 """ SSE进度推送集成测试脚本 模拟文件上传后建立SSE连接的完整流程 """ import requests import json import time import threading import os import signal import sys from typing import Dict, Any # 全局标志用于控制程序退出 should_exit = False def signal_handler(signum, frame): """信号处理器,处理Ctrl+C""" global should_exit print(f"\n\n⚡ 接收到信号 {signum},正在退出...") should_exit = True sys.exit(0) # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) class ProgressTracker: def __init__(self): self.progress_events = [] self.completed = False self.error = None def handle_sse_event(self, event_data: Dict[str, Any]): """处理SSE事件""" self.progress_events.append(event_data) event_type = event_data.get("type", "unknown") data = event_data.get("data", {}) print(f"\n📡 SSE事件 [{event_type}]:") print(f" 时间: {data.get('timestamp', 'N/A')}") print(f" 进度: {data.get('current', 0)}%") print(f" 阶段: {data.get('stage_name', 'N/A')}") print(f" 状态: {data.get('status', 'N/A')}") print(f" 消息: {data.get('message', 'N/A')}") if event_type == "completed": self.completed = True print(f"\n✅ 任务完成: {data.get('task_status', 'N/A')}") elif event_type == "error": self.error = data.get("message", "未知错误") print(f"\n❌ 任务错误: {self.error}") def test_file_upload(): """测试文件上传""" upload_url = "http://127.0.0.1:8035/sgsc/file_upload" file_path = "D:/wx_work/sichuan_luqiao/项目背景资料/路桥桥梁工程施工方案 7 份/罗成依达大桥拱座竖桩专项施工方案.pdf" try: with open(file_path, 'rb') as f: files = { 'file': f } data = { 'callback_url': 'https://client.example.com/callback?task_id=ocr-12345', 'project_plan_type': 'bridge_up_part', 'user': 'user-001' } print("📤 开始上传文件...") response = requests.post(upload_url, files=files, data=data) response.raise_for_status() result = response.json() print(f"✅ 文件上传成功") print(f" 文件ID: {result['data']['id']}") print(f" 文件名: {result['data']['name']}") print(f" 回调任务ID: {result['data']['callback_task_id']}") return result['data']['callback_task_id'] except Exception as e: print(f"❌ 文件上传失败: {e}") return None def test_sse_connection(callback_task_id: str, tracker: ProgressTracker): """测试SSE连接""" sse_url = f"http://127.0.0.1:8035/sgsc/sse/progress/{callback_task_id}?user=user-001" try: print(f"🔗 建立SSE连接: {sse_url}") print("=" * 60) print("📡 SSE原始响应:") print("-" * 60) # 建立SSE连接 response = requests.get(sse_url, stream=True) response.raise_for_status() for line in response.iter_lines(): if should_exit: print("\n👋 用户请求退出,停止监听SSE") break if line: # 直接打印原始响应 line_str = line.decode('utf-8') print(f"原始响应: {line_str}") # 解析SSE事件格式(可选) if line_str.startswith('data: '): try: event_data = json.loads(line_str[6:]) tracker.handle_sse_event({ "type": "data", "data": event_data }) # 如果任务完成,退出监听 if tracker.completed: print("-" * 60) print("📡 任务完成,结束监听") break except json.JSONDecodeError: # JSON解析失败也继续监听 pass except Exception as e: print(f"❌ SSE连接失败: {e}") def main(): """主测试流程""" print("🚀 开始SSE进度推送集成测试") print("💡 提示: 按 Ctrl+C 可以随时退出") print("=" * 60) # 第一步:上传文件 callback_task_id = test_file_upload() if not callback_task_id or should_exit: print("❌ 文件上传失败,测试终止") return print(f"\n⏳ 等待2秒后建立SSE连接...") for i in range(2, 0, -1): if should_exit: print("\n👋 用户请求退出,测试终止") return print(f" {i}秒...") time.sleep(1) # 第二步:建立SSE连接监听进度 tracker = ProgressTracker() # 在主线程中运行SSE监听 test_sse_connection(callback_task_id, tracker) # 输出测试结果 if not should_exit: print("\n" + "=" * 60) print("📊 测试结果汇总:") print(f" 收到事件数量: {len(tracker.progress_events)}") print(f" 任务是否完成: {'✅' if tracker.completed else '❌'}") if tracker.error: print(f" 错误信息: {tracker.error}") if tracker.completed: print("🎉 SSE实时推送测试成功!") else: print("⚠️ 任务可能未在预期时间内完成") if __name__ == "__main__": main()