- import sys
- import os
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- import requests
- import time
- from foundation.observability.monitoring.time_statistics import track_execution_time
# Batch-processing configuration shared by all request helpers below.
BATCH_CONFIG = {
    # Endpoint of the file-parse service.
    "api_url": "http://aiclu.small-app.wang:8000/file_parse",
    # Files per concurrent batch; kept at 3 to avoid server-side timeouts.
    "batch_size": 3,
    # Generous request timeout (seconds) — PDF parsing can be slow.
    "timeout": 360,
    # Sample PDF used by the paged-batch test.
    "test_file": "data_pipeline/test_rawdata/成渝扩容桥梁下部结构专项施工方案(正式版).pdf",
}
def process_pdf_batch(pdf_file, start_page, end_page, batch_idx):
    """Upload one page range of a PDF to the parse API and return the outcome.

    Args:
        pdf_file: Path of the PDF file to upload.
        start_page: Zero-based first page of the batch (inclusive).
        end_page: Zero-based last page of the batch (inclusive).
        batch_idx: 1-based batch number, used only in log output.

    Returns:
        A dict always containing "batch", "start_page", "end_page", plus
        either "success": True and "result" (parsed JSON body) on HTTP 200,
        or "error" (HTTP status text or exception message) on failure.
    """
    print(f"[批次{batch_idx}] 开始处理第{start_page}-{end_page}页 {time.strftime('%H:%M:%S')}")
    data = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "return_middle_json": "true",
        "start_page_id": start_page,
        "end_page_id": end_page,
    }
    try:
        # FIX: use a context manager instead of the original manual
        # finally-loop over the files list — the handle is now guaranteed
        # to close even if requests.post raises mid-upload.
        with open(pdf_file, "rb") as pdf_handle:
            files = [("files", ("chengyu_bridge.pdf", pdf_handle, "application/pdf"))]
            response = requests.post(
                BATCH_CONFIG["api_url"],
                files=files,
                data=data,
                timeout=BATCH_CONFIG["timeout"],
            )
        print(f"[批次{batch_idx}] 完成时间: {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")
        if response.status_code == 200:
            result = response.json()
            page_count = end_page - start_page + 1
            print(f"[批次{batch_idx}] ✅ 完成,处理了{page_count}页")
            return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "success": True, "result": result}
        print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
        return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "error": f"HTTP {response.status_code}"}
    except Exception as e:
        # Covers both file-open errors and network/timeout failures.
        print(f"[批次{batch_idx}] ❌ 异常: {e}")
        return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "error": str(e)}
@track_execution_time
def parse_pdf_batches():
    """Parse a 119-page PDF serially in 20-page batches and report progress.

    Returns the list of per-batch result dicts produced by
    process_pdf_batch (success entries and error entries alike).
    """
    pdf_file = BATCH_CONFIG["test_file"]
    total_pages = 119
    batch_size = 20  # pages handled per request
    batches_count = -(-total_pages // batch_size)  # ceiling division

    print(f"开始分批处理PDF文件: {pdf_file}")
    print(f"总页数: {total_pages}页")
    print(f"每批处理: {batch_size}页")
    print(f"总批次数: {batches_count}批")
    print(f"开始时间: {time.strftime('%H:%M:%S')}")

    # Precompute every (start, end, index) triple; pages are zero-based
    # and the final batch is clamped to the last page.
    batches = [
        (idx * batch_size,
         min((idx + 1) * batch_size - 1, total_pages - 1),
         idx + 1)
        for idx in range(batches_count)
    ]

    results = []
    for start_page, end_page, batch_idx in batches:
        print(f"\n开始处理批次 {batch_idx}/{batches_count}")
        results.append(process_pdf_batch(pdf_file, start_page, end_page, batch_idx))
        # Progress report after each completed batch.
        processed_pages = end_page + 1
        progress = (processed_pages / total_pages) * 100
        print(f"进度: {processed_pages}/{total_pages} 页 ({progress:.1f}%)")

    # Summarize: a dict counts as success only when it carries a truthy
    # "success" flag; anything with an "error" key failed.
    successful_batches = [r for r in results if r.get("success")]
    failed_batches = [r for r in results if "error" in r]
    print(f"\n=== 处理完成统计 ===")
    print(f"成功批次: {len(successful_batches)}/{batches_count}")
    print(f"失败批次: {len(failed_batches)}")
    print(f"总处理页数: {total_pages}")
    return results
@track_execution_time
def parse_multiple_batches():
    """Concurrency test: send several batch requests in parallel.

    Builds 3 batches, each uploading BATCH_CONFIG["batch_size"] copies of
    the same test image under distinct filenames, and submits them through
    a thread pool. File handles are closed in the finally block regardless
    of request outcome.

    Returns a list of per-batch result dicts (success or error entries).
    """
    # BUG FIX: the original message hard-coded "5个文件" although the real
    # per-batch count comes from BATCH_CONFIG["batch_size"] (currently 3).
    print(f"开始多批处理并发测试,每个批次{BATCH_CONFIG['batch_size']}个文件")
    # Build the upload tuples; the same source image is reused under
    # different names so batches are distinguishable server-side.
    batch_files = []
    for batch_idx in range(3):  # 3 concurrent batches
        files = []
        for i in range(BATCH_CONFIG["batch_size"]):
            files.append((
                "files",
                (f"batch{batch_idx+1}_scan_{i+1}.png",
                 open("test/sgfa_mineru_testimage.png", "rb"),
                 "image/png")
            ))
        batch_files.append(files)
    try:
        # Fan out one request per batch over a small thread pool.
        from concurrent.futures import ThreadPoolExecutor, as_completed
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = [
                executor.submit(send_batch_request, files, batch_idx + 1)
                for batch_idx, files in enumerate(batch_files)
            ]
            results = []
            for future in as_completed(futures):
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"批处理请求失败: {e}")
                    results.append({"error": str(e)})
            print(f"\n=== 多批处理测试完成 ===")
            return results
    finally:
        # Close every opened handle, even on early failure.
        for files in batch_files:
            for _, file_tuple in files:
                file_obj = file_tuple[1]  # the open file object
                if hasattr(file_obj, 'close'):
                    file_obj.close()
def send_batch_request(files, batch_idx):
    """POST one multipart batch to the parse API and save the zip result.

    Args:
        files: List of ("files", (name, fileobj, mimetype)) upload tuples.
        batch_idx: 1-based batch number used for logging and the output
            filename batch_{batch_idx}_result.zip.

    Returns:
        {"batch": n, "success": True} on HTTP 200, otherwise
        {"batch": n, "error": "..."} describing the failure.
    """
    print(f"[批次{batch_idx}] 开始请求 {time.strftime('%H:%M:%S')}")
    payload = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "return_middle_json": "true",
        "response_format_zip": "true",
        "start_page_id": 0,
        "end_page_id": None,
    }
    try:
        response = requests.post(
            BATCH_CONFIG["api_url"],
            files=files,
            data=payload,
            timeout=BATCH_CONFIG["timeout"],
        )
    except Exception as e:
        print(f"[批次{batch_idx}] ❌ 异常: {e}")
        return {"batch": batch_idx, "error": str(e)}

    print(f"[批次{batch_idx}] 请求完成 {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")
    if response.status_code != 200:
        print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
        return {"batch": batch_idx, "error": f"HTTP {response.status_code}"}

    # Persist the zipped result next to the script.
    with open(f"batch_{batch_idx}_result.zip", "wb") as f:
        f.write(response.content)
    print(f"[批次{batch_idx}] ✅ 完成,结果保存为 batch_{batch_idx}_result.zip")
    return {"batch": batch_idx, "success": True}
if __name__ == "__main__":
    # Batch-parse the 119-page sample PDF.
    print("=== 119页PDF分批处理测试 ===")
    results = parse_pdf_batches()

    # If anything succeeded, note how to extrapolate a 150-page estimate
    # from the decorator-reported total runtime.
    successful_results = [r for r in results if r.get("success")]
    if successful_results:
        print(f"\n=== 150页处理时间预估 ===")
        print("基于119页的实际处理时间进行推算...")
        # Total elapsed time is emitted by @track_execution_time.
        print("总耗时将显示在日志中,基于此可推算150页处理时间")
|