# mineru_性能预估脚本.py (7.4 KB)

  import os
  import sys
  import time
  from concurrent.futures import ThreadPoolExecutor, as_completed
  from contextlib import ExitStack

  import requests

  from foundation.observability.monitoring.time_statistics import track_execution_time
  # Batch-processing configuration shared by every request helper in this script.
  BATCH_CONFIG = dict(
      # Endpoint of the file-parsing service.
      api_url="http://aiclu.small-app.wang:8000/file_parse",
      # Number of files per batch: set to 3 to avoid timeouts.
      batch_size=3,
      # Request timeout in seconds; raised because PDFs may take longer.
      timeout=360,
      # PDF used by the page-range batching test.
      test_file="data_pipeline/test_rawdata/成渝扩容桥梁下部结构专项施工方案(正式版).pdf",
  )
  def process_pdf_batch(pdf_file, start_page, end_page, batch_idx):
      """Upload one page range of a PDF to the parsing service.

      Args:
          pdf_file: Path of the PDF to upload.
          start_page: First page id of the range, sent as ``start_page_id``.
          end_page: Last page id of the range (inclusive), sent as
              ``end_page_id``.
          batch_idx: Batch number, used only in log lines and in the result.

      Returns:
          A dict with ``batch``, ``start_page``, ``end_page`` and ``success``.
          On HTTP 200 it also carries ``result`` (the decoded JSON body);
          otherwise it carries ``error`` (an ``HTTP <code>`` string or the
          exception text).

      Raises:
          OSError: If ``pdf_file`` cannot be opened. The file is opened
              outside the ``try`` so this propagates to the caller.
      """
      print(f"[批次{batch_idx}] 开始处理第{start_page}-{end_page}页 {time.strftime('%H:%M:%S')}")

      form_fields = {
          "backend": "pipeline",
          "parse_method": "auto",
          "formula_enable": "true",
          "table_enable": "true",
          "return_md": "true",
          "return_middle_json": "true",
          "start_page_id": start_page,
          "end_page_id": end_page,
      }
      # Fields common to every outcome, so success and failure dicts share a schema.
      outcome = {"batch": batch_idx, "start_page": start_page, "end_page": end_page}

      # The context manager closes the handle on every exit path.
      with open(pdf_file, "rb") as pdf_handle:
          upload = [("files", ("chengyu_bridge.pdf", pdf_handle, "application/pdf"))]
          try:
              response = requests.post(
                  BATCH_CONFIG["api_url"],
                  files=upload,
                  data=form_fields,
                  timeout=BATCH_CONFIG["timeout"],
              )
              print(f"[批次{batch_idx}] 完成时间: {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")

              if response.status_code == 200:
                  result = response.json()
                  page_count = end_page - start_page + 1
                  print(f"[批次{batch_idx}] ✅ 完成,处理了{page_count}页")
                  return {**outcome, "success": True, "result": result}

              print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
              return {**outcome, "success": False, "error": f"HTTP {response.status_code}"}
          except Exception as e:
              # Deliberately broad: one failed batch (network error, bad JSON)
              # is reported in the result instead of aborting the whole run.
              print(f"[批次{batch_idx}] ❌ 异常: {e}")
              return {**outcome, "success": False, "error": str(e)}
  def _plan_page_batches(total_pages, batch_size):
      """Split ``total_pages`` pages into ``(start, end, batch_no)`` tuples.

      Page ids are 0-based and ``end`` is inclusive; ``batch_no`` starts at 1.
      The last range is truncated at the final page.
      """
      return [
          (first, min(first + batch_size, total_pages) - 1, batch_no)
          for batch_no, first in enumerate(range(0, total_pages, batch_size), start=1)
      ]


  @track_execution_time
  def parse_pdf_batches():
      """Parse the 119-page test PDF serially in batches of 20 pages.

      Each page range is sent through ``process_pdf_batch``. Progress is
      printed after every batch and a summary at the end.

      Returns:
          The list of per-batch result dicts, in processing order.
      """
      pdf_file = BATCH_CONFIG["test_file"]
      total_pages = 119
      batch_size = 20  # pages per batch

      batches = _plan_page_batches(total_pages, batch_size)
      batches_count = len(batches)

      print(f"开始分批处理PDF文件: {pdf_file}")
      print(f"总页数: {total_pages}页")
      print(f"每批处理: {batch_size}页")
      print(f"总批次数: {batches_count}批")
      print(f"开始时间: {time.strftime('%H:%M:%S')}")

      # Process the batches one after another (no concurrency).
      results = []
      for start_page, end_page, batch_idx in batches:
          print(f"\n开始处理批次 {batch_idx}/{batches_count}")
          results.append(process_pdf_batch(pdf_file, start_page, end_page, batch_idx))

          # Progress is measured by position in the document, whether or not
          # the batch succeeded.
          processed_pages = end_page + 1
          progress = (processed_pages / total_pages) * 100
          print(f"进度: {processed_pages}/{total_pages} 页 ({progress:.1f}%)")

      # Summary statistics.
      successful_batches = [r for r in results if r.get("success")]
      failed_batches = [r for r in results if "error" in r]
      # Count only pages from batches that succeeded; printing total_pages
      # here would overstate the work done whenever a batch fails.
      pages_parsed = sum(r["end_page"] - r["start_page"] + 1 for r in successful_batches)

      print("\n=== 处理完成统计 ===")
      print(f"成功批次: {len(successful_batches)}/{batches_count}")
      print(f"失败批次: {len(failed_batches)}")
      print(f"总处理页数: {pages_parsed}")
      return results
  @track_execution_time
  def parse_multiple_batches():
      """Send three image batches to the parsing service concurrently.

      Each batch holds ``BATCH_CONFIG["batch_size"]`` uploads of the same
      test image under distinct file names. The batches are submitted to a
      three-worker thread pool via ``send_batch_request``.

      Returns:
          The per-batch result dicts in completion order. A future that
          raises contributes ``{"error": <message>}`` instead.

      Raises:
          OSError: If the test image cannot be opened. Handles opened before
              the failure are still closed.
      """
      files_per_batch = BATCH_CONFIG["batch_size"]
      # Report the configured size rather than a hard-coded number.
      print(f"开始多批处理并发测试,每个批次{files_per_batch}个文件")

      # ExitStack owns every handle from the moment it is opened, so nothing
      # leaks if a later open() fails or a request raises.
      with ExitStack() as open_files:
          # One upload list per batch; the same image is reused under
          # different names so the batches can be told apart.
          batch_files = []
          for batch_no in range(1, 4):  # 3 batches
              uploads = []
              for file_no in range(1, files_per_batch + 1):
                  image = open_files.enter_context(
                      open("test/sgfa_mineru_testimage.png", "rb")
                  )
                  uploads.append(
                      ("files", (f"batch{batch_no}_scan_{file_no}.png", image, "image/png"))
                  )
              batch_files.append(uploads)

          # Fire all batch requests concurrently.
          results = []
          with ThreadPoolExecutor(max_workers=3) as executor:
              futures = [
                  executor.submit(send_batch_request, uploads, batch_no)
                  for batch_no, uploads in enumerate(batch_files, start=1)
              ]
              for future in as_completed(futures):
                  try:
                      results.append(future.result())
                  except Exception as e:
                      # Keep collecting the other batches if one worker fails.
                      print(f"批处理请求失败: {e}")
                      results.append({"error": str(e)})

          print("\n=== 多批处理测试完成 ===")
          return results
  def send_batch_request(files, batch_idx):
      """Post one batch of uploads and save the zipped response to disk.

      Args:
          files: Multipart ``files`` payload passed to ``requests.post``.
          batch_idx: Batch number, used in log lines and the output file name.

      Returns:
          ``{"batch": batch_idx, "success": True}`` on HTTP 200, otherwise
          ``{"batch": batch_idx, "error": <message>}``.
      """
      print(f"[批次{batch_idx}] 开始请求 {time.strftime('%H:%M:%S')}")

      form_fields = {
          "backend": "pipeline",
          "parse_method": "auto",
          "formula_enable": "true",
          "table_enable": "true",
          "return_md": "true",
          "return_middle_json": "true",
          "response_format_zip": "true",
          "start_page_id": 0,
          "end_page_id": None,
      }

      try:
          response = requests.post(
              BATCH_CONFIG["api_url"],
              files=files,
              data=form_fields,
              timeout=BATCH_CONFIG["timeout"],
          )
          print(f"[批次{batch_idx}] 请求完成 {time.strftime('%H:%M:%S')} - 状态码: {response.status_code}")

          # Handle the failure case first so the success path stays flat.
          if response.status_code != 200:
              print(f"[批次{batch_idx}] ❌ 失败: {response.status_code}")
              return {"batch": batch_idx, "error": f"HTTP {response.status_code}"}

          with open(f"batch_{batch_idx}_result.zip", "wb") as archive:
              archive.write(response.content)
          print(f"[批次{batch_idx}] ✅ 完成,结果保存为 batch_{batch_idx}_result.zip")
          return {"batch": batch_idx, "success": True}
      except Exception as e:
          # Any request or file-write failure is reported, not raised.
          print(f"[批次{batch_idx}] ❌ 异常: {e}")
          return {"batch": batch_idx, "error": str(e)}
  164. if __name__ == "__main__":
  165. # 分批处理119页PDF测试
  166. print("=== 119页PDF分批处理测试 ===")
  167. results = parse_pdf_batches()
  168. # 根据实际耗时计算150页预估时间
  169. successful_results = [r for r in results if "success" in r and r["success"]]
  170. if successful_results:
  171. print(f"\n=== 150页处理时间预估 ===")
  172. print("基于119页的实际处理时间进行推算...")
  173. # 这里会显示装饰器统计的总耗时
  174. print("总耗时将显示在日志中,基于此可推算150页处理时间")