"""Batch-processing smoke tests for the MinerU ``file_parse`` HTTP API.

Two scenarios are exercised:

1. ``parse_pdf_batches`` — sequentially parses a 119-page PDF in fixed-size
   page windows (20 pages per request) to stay under the server timeout.
2. ``parse_multiple_batches`` — fires several multi-file image batches
   concurrently via a thread pool.

Wall-clock timing is collected by the project's ``track_execution_time``
decorator; the printed totals are used to extrapolate processing time for
larger documents.
"""
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import ExitStack

# Make the project root importable when this file is run as a script.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import requests

from foundation.observability.monitoring.time_statistics import track_execution_time

# Batch-processing configuration.
BATCH_CONFIG = {
    "api_url": "http://aiclu.small-app.wang:8000/file_parse",
    "batch_size": 3,   # files per concurrent batch; kept small to avoid timeouts
    "timeout": 360,    # generous per-request timeout — PDF parsing can be slow
    "test_file": "data_pipeline/test_rawdata/成渝扩容桥梁下部结构专项施工方案(正式版).pdf",
}


def process_pdf_batch(pdf_file, start_page, end_page, batch_idx):
    """Parse one page window of *pdf_file* via the remote API.

    Args:
        pdf_file: Path to the local PDF to upload.
        start_page: Zero-based first page of the window (inclusive).
        end_page: Zero-based last page of the window (inclusive).
        batch_idx: 1-based batch number, used only for log messages.

    Returns:
        A dict with ``batch``/``start_page``/``end_page`` plus either
        ``success``+``result`` (HTTP 200) or ``error`` (non-200 / exception).
        Never raises: any failure (including a missing file) is reported
        in the returned dict.
    """
    print(f"[批次{batch_idx}] 开始处理第{start_page}-{end_page}页 {time.strftime('%H:%M:%S')}")

    data = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "return_middle_json": "true",
        "start_page_id": start_page,
        "end_page_id": end_page,
    }

    try:
        # The context manager guarantees the handle is closed even if the
        # request raises (previously closed by hand in a ``finally`` block).
        with open(pdf_file, "rb") as pdf_handle:
            files = [
                ("files", ("chengyu_bridge.pdf", pdf_handle, "application/pdf"))
            ]
            response = requests.post(
                BATCH_CONFIG["api_url"],
                files=files,
                data=data,
                timeout=BATCH_CONFIG["timeout"],
            )

        print(f"[批次{batch_idx}] 完成时间: {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")

        if response.status_code == 200:
            result = response.json()
            page_count = end_page - start_page + 1
            print(f"[批次{batch_idx}] ✅ 完成,处理了{page_count}页")
            return {
                "batch": batch_idx,
                "start_page": start_page,
                "end_page": end_page,
                "success": True,
                "result": result,
            }

        print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
        return {
            "batch": batch_idx,
            "start_page": start_page,
            "end_page": end_page,
            "error": f"HTTP {response.status_code}",
        }
    except Exception as e:
        print(f"[批次{batch_idx}] ❌ 异常: {e}")
        return {
            "batch": batch_idx,
            "start_page": start_page,
            "end_page": end_page,
            "error": str(e),
        }


@track_execution_time
def parse_pdf_batches():
    """Parse the 119-page test PDF serially, 20 pages per request.

    Returns:
        List of per-batch result dicts from :func:`process_pdf_batch`.
    """
    pdf_file = BATCH_CONFIG["test_file"]
    total_pages = 119
    batch_size = 20  # pages per request
    batches_count = (total_pages + batch_size - 1) // batch_size  # ceil division

    print(f"开始分批处理PDF文件: {pdf_file}")
    print(f"总页数: {total_pages}页")
    print(f"每批处理: {batch_size}页")
    print(f"总批次数: {batches_count}批")
    print(f"开始时间: {time.strftime('%H:%M:%S')}")

    # Pre-compute all (start, end, index) windows; pages are zero-based and
    # end_page is inclusive, hence the -1 / min() clamp on the final batch.
    batches = []
    for i in range(batches_count):
        start_page = i * batch_size
        end_page = min((i + 1) * batch_size - 1, total_pages - 1)
        batches.append((start_page, end_page, i + 1))

    # Process every window serially so the server handles one request at a time.
    results = []
    for start_page, end_page, batch_idx in batches:
        print(f"\n开始处理批次 {batch_idx}/{batches_count}")
        result = process_pdf_batch(pdf_file, start_page, end_page, batch_idx)
        results.append(result)

        processed_pages = end_page + 1
        progress = (processed_pages / total_pages) * 100
        print(f"进度: {processed_pages}/{total_pages} 页 ({progress:.1f}%)")

    successful_batches = [r for r in results if "success" in r and r["success"]]
    failed_batches = [r for r in results if "error" in r]

    print(f"\n=== 处理完成统计 ===")
    print(f"成功批次: {len(successful_batches)}/{batches_count}")
    print(f"失败批次: {len(failed_batches)}")
    print(f"总处理页数: {total_pages}")

    return results


@track_execution_time
def parse_multiple_batches():
    """Concurrency test: upload several image batches in parallel.

    Builds 3 batches of ``BATCH_CONFIG["batch_size"]`` copies of the same
    test image (renamed per batch to tell them apart) and submits them
    concurrently through a thread pool.

    Returns:
        List of per-batch result dicts from :func:`send_batch_request`
        (or ``{"error": ...}`` entries for failed futures).
    """
    print(f"开始多批处理并发测试,每个批次5个文件")

    # ExitStack owns every file handle: if any open() fails partway through,
    # the handles opened so far are still closed (the previous version opened
    # all files before entering its try/finally, leaking on partial failure).
    with ExitStack() as stack:
        batch_files = []
        for batch_idx in range(3):  # 3 batches
            files = []
            for i in range(BATCH_CONFIG["batch_size"]):
                handle = stack.enter_context(
                    open("test/sgfa_mineru_testimage.png", "rb")
                )
                files.append((
                    "files",
                    (f"batch{batch_idx+1}_scan_{i+1}.png", handle, "image/png"),
                ))
            batch_files.append(files)

        # Fan the batch requests out across a small thread pool.
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(send_batch_request, files, batch_idx + 1)
                for batch_idx, files in enumerate(batch_files)
            ]

            results = []
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"批处理请求失败: {e}")
                    results.append({"error": str(e)})

        print(f"\n=== 多批处理测试完成 ===")
        return results


def send_batch_request(files, batch_idx):
    """Send one multi-file batch request; save a 200 response as a zip.

    Args:
        files: ``requests``-style list of ("files", (name, handle, mime)) tuples.
        batch_idx: 1-based batch number used for logging and the output name.

    Returns:
        ``{"batch": idx, "success": True}`` on HTTP 200, otherwise a dict
        with an ``error`` string. Never raises.
    """
    print(f"[批次{batch_idx}] 开始请求 {time.strftime('%H:%M:%S')}")

    data = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "return_middle_json": "true",
        "response_format_zip": "true",
        "start_page_id": 0,
        # None-valued fields are dropped by requests' form encoding,
        # i.e. the server parses through to the last page.
        "end_page_id": None,
    }

    try:
        response = requests.post(
            BATCH_CONFIG["api_url"],
            files=files,
            data=data,
            timeout=BATCH_CONFIG["timeout"],
        )

        print(f"[批次{batch_idx}] 请求完成 {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")

        if response.status_code == 200:
            with open(f"batch_{batch_idx}_result.zip", "wb") as f:
                f.write(response.content)
            print(f"[批次{batch_idx}] ✅ 完成,结果保存为 batch_{batch_idx}_result.zip")
            return {"batch": batch_idx, "success": True}

        print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
        return {"batch": batch_idx, "error": f"HTTP {response.status_code}"}
    except Exception as e:
        print(f"[批次{batch_idx}] ❌ 异常: {e}")
        return {"batch": batch_idx, "error": str(e)}


if __name__ == "__main__":
    # Batch-parse the 119-page PDF end to end.
    print("=== 119页PDF分批处理测试 ===")
    results = parse_pdf_batches()

    # Extrapolate the 150-page processing time from the measured run; the
    # actual total is emitted by the @track_execution_time decorator's log.
    successful_results = [r for r in results if "success" in r and r["success"]]
    if successful_results:
        print(f"\n=== 150页处理时间预估 ===")
        print("基于119页的实际处理时间进行推算...")
        print("总耗时将显示在日志中,基于此可推算150页处理时间")