# mineru_性能预估脚本.py — MinerU performance-estimation script.
  1. import sys
  2. import os
  3. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  4. import requests
  5. import time
  6. from foundation.observability.monitoring.time_statistics import track_execution_time
  7. # 批处理配置
  8. BATCH_CONFIG = {
  9. "api_url": "http://aiclu.small-app.wang:8000/file_parse",
  10. "batch_size": 3, # 批量大小:调整为3次,避免超时
  11. "timeout": 360, # 增加超时时间,PDF可能更耗时
  12. "test_file": "data_pipeline/test_rawdata/成渝扩容桥梁下部结构专项施工方案(正式版).pdf"
  13. }
  14. def process_pdf_batch(pdf_file, start_page, end_page, batch_idx):
  15. """处理PDF的一个批次"""
  16. print(f"[批次{batch_idx}] 开始处理第{start_page}-{end_page}页 {time.strftime('%H:%M:%S')}")
  17. files = [
  18. ("files", ("chengyu_bridge.pdf", open(pdf_file, "rb"), "application/pdf"))
  19. ]
  20. data = {
  21. "backend": "pipeline",
  22. "parse_method": "auto",
  23. "formula_enable": "true",
  24. "table_enable": "true",
  25. "return_md": "true",
  26. "return_middle_json": "true",
  27. "start_page_id": start_page,
  28. "end_page_id": end_page,
  29. }
  30. try:
  31. response = requests.post(
  32. BATCH_CONFIG["api_url"],
  33. files=files,
  34. data=data,
  35. timeout=BATCH_CONFIG["timeout"]
  36. )
  37. print(f"[批次{batch_idx}] 完成时间: {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")
  38. if response.status_code == 200:
  39. result = response.json()
  40. page_count = end_page - start_page + 1
  41. print(f"[批次{batch_idx}] ✅ 完成,处理了{page_count}页")
  42. return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "success": True, "result": result}
  43. else:
  44. print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
  45. return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "error": f"HTTP {response.status_code}"}
  46. except Exception as e:
  47. print(f"[批次{batch_idx}] ❌ 异常: {e}")
  48. return {"batch": batch_idx, "start_page": start_page, "end_page": end_page, "error": str(e)}
  49. finally:
  50. # 确保关闭文件句柄
  51. for _, file_tuple in files:
  52. file_obj = file_tuple[1]
  53. if hasattr(file_obj, 'close'):
  54. file_obj.close()
  55. @track_execution_time
  56. def parse_pdf_batches():
  57. """分批处理119页PDF文件(每批20页)"""
  58. pdf_file = BATCH_CONFIG["test_file"]
  59. total_pages = 119
  60. batch_size = 20 # 每批处理20页
  61. batches_count = (total_pages + batch_size - 1) // batch_size # 向上取整
  62. print(f"开始分批处理PDF文件: {pdf_file}")
  63. print(f"总页数: {total_pages}页")
  64. print(f"每批处理: {batch_size}页")
  65. print(f"总批次数: {batches_count}批")
  66. print(f"开始时间: {time.strftime('%H:%M:%S')}")
  67. # 准备所有批次
  68. batches = []
  69. for i in range(batches_count):
  70. start_page = i * batch_size
  71. end_page = min((i + 1) * batch_size - 1, total_pages - 1)
  72. batches.append((start_page, end_page, i + 1))
  73. # 串行处理所有批次
  74. results = []
  75. for start_page, end_page, batch_idx in batches:
  76. print(f"\n开始处理批次 {batch_idx}/{batches_count}")
  77. result = process_pdf_batch(pdf_file, start_page, end_page, batch_idx)
  78. results.append(result)
  79. # 打印进度
  80. processed_pages = end_page + 1
  81. progress = (processed_pages / total_pages) * 100
  82. print(f"进度: {processed_pages}/{total_pages} 页 ({progress:.1f}%)")
  83. # 统计结果
  84. successful_batches = [r for r in results if "success" in r and r["success"]]
  85. failed_batches = [r for r in results if "error" in r]
  86. print(f"\n=== 处理完成统计 ===")
  87. print(f"成功批次: {len(successful_batches)}/{batches_count}")
  88. print(f"失败批次: {len(failed_batches)}")
  89. print(f"总处理页数: {total_pages}")
  90. return results
  91. @track_execution_time
  92. def parse_multiple_batches():
  93. """多批处理并发测试"""
  94. print(f"开始多批处理并发测试,每个批次5个文件")
  95. # 准备不同批次的文件列表(为了区分,可以复制同一个文件到不同名称)
  96. batch_files = []
  97. for batch_idx in range(3): # 3个批次
  98. files = []
  99. for i in range(BATCH_CONFIG["batch_size"]):
  100. files.append((
  101. "files",
  102. (f"batch{batch_idx+1}_scan_{i+1}.png",
  103. open("test/sgfa_mineru_testimage.png", "rb"),
  104. "image/png")
  105. ))
  106. batch_files.append(files)
  107. try:
  108. # 并发发送多个批处理请求
  109. from concurrent.futures import ThreadPoolExecutor, as_completed
  110. with ThreadPoolExecutor(max_workers=3) as executor:
  111. futures = []
  112. for batch_idx, files in enumerate(batch_files):
  113. future = executor.submit(send_batch_request, files, batch_idx + 1)
  114. futures.append(future)
  115. results = []
  116. for future in as_completed(futures):
  117. try:
  118. result = future.result()
  119. results.append(result)
  120. except Exception as e:
  121. print(f"批处理请求失败: {e}")
  122. results.append({"error": str(e)})
  123. print(f"\n=== 多批处理测试完成 ===")
  124. return results
  125. finally:
  126. # 关闭所有文件句柄
  127. for files in batch_files:
  128. for _, file_tuple in files:
  129. file_obj = file_tuple[1] # 获取文件对象
  130. if hasattr(file_obj, 'close'):
  131. file_obj.close()
  132. def send_batch_request(files, batch_idx):
  133. """发送单个批处理请求"""
  134. print(f"[批次{batch_idx}] 开始请求 {time.strftime('%H:%M:%S')}")
  135. data = {
  136. "backend": "pipeline",
  137. "parse_method": "auto",
  138. "formula_enable": "true",
  139. "table_enable": "true",
  140. "return_md": "true",
  141. "return_middle_json": "true",
  142. "response_format_zip": "true",
  143. "start_page_id": 0,
  144. "end_page_id": None,
  145. }
  146. try:
  147. response = requests.post(
  148. BATCH_CONFIG["api_url"],
  149. files=files,
  150. data=data,
  151. timeout=BATCH_CONFIG["timeout"]
  152. )
  153. print(f"[批次{batch_idx}] 请求完成 {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")
  154. if response.status_code == 200:
  155. with open(f"batch_{batch_idx}_result.zip", "wb") as f:
  156. f.write(response.content)
  157. print(f"[批次{batch_idx}] ✅ 完成,结果保存为 batch_{batch_idx}_result.zip")
  158. return {"batch": batch_idx, "success": True}
  159. else:
  160. print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
  161. return {"batch": batch_idx, "error": f"HTTP {response.status_code}"}
  162. except Exception as e:
  163. print(f"[批次{batch_idx}] ❌ 异常: {e}")
  164. return {"batch": batch_idx, "error": str(e)}
  165. if __name__ == "__main__":
  166. # 分批处理119页PDF测试
  167. print("=== 119页PDF分批处理测试 ===")
  168. results = parse_pdf_batches()
  169. # 根据实际耗时计算150页预估时间
  170. successful_results = [r for r in results if "success" in r and r["success"]]
  171. if successful_results:
  172. print(f"\n=== 150页处理时间预估 ===")
  173. print("基于119页的实际处理时间进行推算...")
  174. # 这里会显示装饰器统计的总耗时
  175. print("总耗时将显示在日志中,基于此可推算150页处理时间")