mineru_test.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # -*- coding: utf-8 -*-
  2. import sys
  3. import os
  4. import json
  5. import base64
  6. import urllib.request
  7. # 获取当前脚本的绝对路径
  8. current_file = os.path.abspath(__file__)
  9. # 获取脚本所在目录(MinerU_Test)
  10. current_dir = os.path.dirname(current_file)
  11. # 获取上一级目录(utils_test)
  12. parent_dir = os.path.dirname(current_dir)
  13. root_dir = os.path.dirname(parent_dir)
  14. import requests
  15. from concurrent.futures import ThreadPoolExecutor, as_completed
  16. from foundation.observability.monitoring.time_statistics import track_execution_time
  17. @track_execution_time
  18. def save_images_from_response(response_data, output_dir="test/mineru_temp"):
  19. """从API响应中提取并保存图片"""
  20. # 创建输出目录
  21. os.makedirs(output_dir, exist_ok=True)
  22. # 检查是否有图片数据
  23. if "results" not in response_data or "scan" not in response_data["results"]:
  24. print("❌ 响应中未找到图片数据")
  25. return
  26. scan_result = response_data["results"]["scan"]
  27. saved_count = 0
  28. # 保存base64图片
  29. if "images" in scan_result:
  30. print(f"发现 {len(scan_result['images'])} 个base64图片")
  31. for filename, base64_data in scan_result["images"].items():
  32. try:
  33. # 解析base64数据格式
  34. if base64_data.startswith("data:image/"):
  35. # 提取实际的base64数据
  36. header, base64_string = base64_data.split(",", 1)
  37. file_extension = header.split("/")[1].split(";")[0]
  38. # 解码base64
  39. image_data = base64.b64decode(base64_string)
  40. # 保存图片文件
  41. output_filename = f"table_{saved_count}.{file_extension}"
  42. output_path = os.path.join(output_dir, output_filename)
  43. with open(output_path, "wb") as f:
  44. f.write(image_data)
  45. print(f"✅ 保存图片: {output_filename} ({len(image_data)} bytes)")
  46. saved_count += 1
  47. else:
  48. # 直接base64数据
  49. image_data = base64.b64decode(base64_data)
  50. output_path = os.path.join(output_dir, f"table_{saved_count}.jpg")
  51. with open(output_path, "wb") as f:
  52. f.write(image_data)
  53. print(f"✅ 保存图片: table_{saved_count}.jpg ({len(image_data)} bytes)")
  54. saved_count += 1
  55. except Exception as e:
  56. print(f"❌ 保存图片 {filename} 失败: {e}")
  57. # 检查middle_json中的图片信息(如果有额外的)
  58. if "middle_json" in scan_result:
  59. try:
  60. middle_json = json.loads(scan_result["middle_json"])
  61. # 查找图片路径和HTML信息
  62. for pdf_info in middle_json.get("pdf_info", []):
  63. for preproc_block in pdf_info.get("preproc_blocks", []):
  64. if preproc_block.get("type") == "table":
  65. for block in preproc_block.get("blocks", []):
  66. if block.get("type") == "table_body":
  67. for line in block.get("lines", []):
  68. for span in line.get("spans", []):
  69. if span.get("type") == "table":
  70. # 保存HTML内容
  71. html_content = span.get('html', 'N/A')
  72. if html_content != 'N/A':
  73. html_file = os.path.join(output_dir, f"table_html_{saved_count}.html")
  74. with open(html_file, "w", encoding="utf-8") as f:
  75. f.write(html_content)
  76. print(f"✅ 保存表格HTML: table_html_{saved_count}.html")
  77. except json.JSONDecodeError as e:
  78. print(f"❌ 解析middle_json失败: {e}")
  79. print(f"✅ 图片保存完成,共保存 {saved_count} 个图片文件到 {output_dir}")
  80. return saved_count
  81. @track_execution_time
  82. def save_middle_json(response_data, output_dir="test/mineru_temp"):
  83. """提取并保存middle_json为单独文件"""
  84. os.makedirs(output_dir, exist_ok=True)
  85. if "results" not in response_data or "scan" not in response_data["results"]:
  86. print("❌ 响应中未找到middle_json数据")
  87. return False
  88. scan_result = response_data["results"]["scan"]
  89. if "middle_json" not in scan_result:
  90. print("❌ scan结果中未找到middle_json")
  91. return False
  92. try:
  93. # 解析middle_json字符串
  94. middle_json_data = json.loads(scan_result["middle_json"])
  95. # 保存格式化的middle_json
  96. middle_json_file = os.path.join(output_dir, "middle_json_pretty.json")
  97. with open(middle_json_file, "w", encoding="utf-8") as f:
  98. json.dump(middle_json_data, f, ensure_ascii=False, indent=4)
  99. print(f"✅ Middle JSON已保存到 {middle_json_file}")
  100. # 打印middle_json的基本信息
  101. if "pdf_info" in middle_json_data:
  102. pdf_info = middle_json_data["pdf_info"]
  103. print(f"📄 PDF信息: {len(pdf_info)} 页")
  104. # 统计各类型块的数量
  105. total_blocks = 0
  106. table_count = 0
  107. text_count = 0
  108. title_count = 0
  109. for page_info in pdf_info:
  110. preproc_blocks = page_info.get("preproc_blocks", [])
  111. total_blocks += len(preproc_blocks)
  112. for block in preproc_blocks:
  113. block_type = block.get("type", "")
  114. if block_type == "table":
  115. table_count += 1
  116. elif block_type == "text":
  117. text_count += 1
  118. elif block_type == "title":
  119. title_count += 1
  120. print(f"📊 统计信息:")
  121. print(f" 总块数: {total_blocks}")
  122. print(f" 表格块: {table_count}")
  123. print(f" 文本块: {text_count}")
  124. print(f" 标题块: {title_count}")
  125. return True
  126. except json.JSONDecodeError as e:
  127. print(f"❌ 解析middle_json失败: {e}")
  128. return False
  129. except Exception as e:
  130. print(f"❌ 保存middle_json失败: {e}")
  131. return False
  132. @track_execution_time
  133. def parse_file():
  134. """使用 API 解析文件"""
  135. API_URL = "http://aiclu.small-app.wang:8000/file_parse"
  136. files = [
  137. ("files", ("scan.png", open("utils_test\MinerU_Test\sgfa_mineru_testimage.png", "rb"), "image/png")),
  138. ]
  139. data = {
  140. "backend": "pipeline",
  141. "parse_method": "auto",
  142. "formula_enable": "true",
  143. "table_enable": "true",
  144. "return_md": "true",
  145. "start_page_id": 0,
  146. "end_page_id": None,
  147. "return_middle_json": "true",
  148. "return_images": "true",
  149. "response_format_zip": "true", # 保持为 true
  150. }
  151. response = requests.post(API_URL, files=files, data=data)
  152. # 检查响应是否为 ZIP 包(通过响应头或内容判断)
  153. if "application/zip" in response.headers.get("Content-Type", "") or response.content.startswith(b"PK"):
  154. # 保存 ZIP 包到本地
  155. zip_output_dir = "utils_test\MinerU_Test\mineru_temp"
  156. os.makedirs(zip_output_dir, exist_ok=True)
  157. zip_file_path = os.path.join(zip_output_dir, "api_response.zip")
  158. # 保存二进制 ZIP 数据
  159. with open(zip_file_path, "wb") as f:
  160. f.write(response.content)
  161. print(f"✅ ZIP 包已保存到: {zip_file_path} ({len(response.content)} bytes)")
  162. # 自动解压 ZIP 包
  163. import zipfile
  164. with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
  165. zip_ref.extractall(os.path.join(zip_output_dir, "unzipped"))
  166. print(f"✅ ZIP 包已解压到: {os.path.join(zip_output_dir, 'unzipped')}")
  167. # 解压后可以读取里面的 JSON/图片文件(示例:读取 middle_json)
  168. unzipped_dir = os.path.join(zip_output_dir, "unzipped")
  169. middle_json_path = os.path.join(unzipped_dir, "middle_json.json") # 假设 ZIP 内有这个文件(需根据实际 ZIP 结构调整)
  170. if os.path.exists(middle_json_path):
  171. with open(middle_json_path, "r", encoding="utf-8") as f:
  172. middle_json_data = json.load(f)
  173. print(f"📊 解压后读取到 middle_json,包含 {len(middle_json_data.get('pdf_info', []))} 页 PDF 信息")
  174. else:
  175. # 若 API 未返回 ZIP(兼容情况),尝试按 JSON 解析
  176. try:
  177. result = response.json()
  178. print("=== API响应概要(JSON格式)===")
  179. # 后续逻辑和原代码一致...
  180. except json.JSONDecodeError as e:
  181. print(f"❌ 解析失败:既不是 ZIP 也不是 JSON - {e}")
  182. parse_file()