| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- # -*- coding: utf-8 -*-
- import sys
- import os
- import json
- import base64
- import urllib.request
# Locate the project root relative to this script and make it importable,
# so the `foundation` package imported below can be resolved.

# Absolute path of this script.
current_file = os.path.abspath(__file__)
# Directory containing this script (MinerU_Test).
current_dir = os.path.dirname(current_file)
# One level up (utils_test).
parent_dir = os.path.dirname(current_dir)
# Project root (LQAgentPlatform, i.e. the directory that holds `foundation`).
root_dir = os.path.dirname(parent_dir)
# Add the project root to sys.path. Skip the append when it is already listed
# so that re-executing or reloading this module does not pile up duplicates.
if root_dir not in sys.path:
    sys.path.append(root_dir)
- import requests
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from foundation.observability.monitoring.time_statistics import track_execution_time
@track_execution_time
def save_images_from_response(response_data, output_dir="test/mineru_temp"):
    """Extract the images (and table HTML) from an API response and save them.

    Images are read from ``response_data["results"]["scan"]["images"]``, a
    mapping of name -> base64 payload. A payload may be a
    ``data:image/<ext>;base64,<data>`` URI (the extension is taken from the
    header) or bare base64 (saved as ``.jpg``). Files are written as
    ``table_<n>.<ext>``.

    If the scan result also carries a ``middle_json`` string, the ``html`` of
    every table span in it is written to ``table_html_<i>.html``, numbered
    from 0 so that several tables do not overwrite one another.

    Args:
        response_data: Decoded API response (dict-like).
        output_dir: Destination directory; created if it does not exist.

    Returns:
        The number of image files written (0 when the response contains no
        scan result).
    """
    os.makedirs(output_dir, exist_ok=True)

    if "results" not in response_data or "scan" not in response_data["results"]:
        print("❌ 响应中未找到图片数据")
        return 0

    scan_result = response_data["results"]["scan"]
    saved_count = 0

    if "images" in scan_result:
        images = scan_result["images"]
        print(f"发现 {len(images)} 个base64图片")
        for filename, base64_data in images.items():
            try:
                if base64_data.startswith("data:image/"):
                    # Data URI: "data:image/<ext>;base64,<payload>".
                    header, base64_string = base64_data.split(",", 1)
                    file_extension = header.split("/")[1].split(";")[0]
                else:
                    # Bare base64 payload; no type information, assume JPEG.
                    base64_string, file_extension = base64_data, "jpg"

                image_data = base64.b64decode(base64_string)
                output_filename = f"table_{saved_count}.{file_extension}"
                with open(os.path.join(output_dir, output_filename), "wb") as f:
                    f.write(image_data)
                print(f"✅ 保存图片: {output_filename} ({len(image_data)} bytes)")
                saved_count += 1
            except Exception as e:
                # Deliberately best-effort: one bad image must not stop the
                # remaining ones from being saved.
                print(f"❌ 保存图片 {filename} 失败: {e}")

    if "middle_json" in scan_result:
        try:
            middle_json = json.loads(scan_result["middle_json"])
        except json.JSONDecodeError as e:
            print(f"❌ 解析middle_json失败: {e}")
        else:
            # A dedicated index keeps each table's HTML in its own file; the
            # image counter does not advance here and must not be reused.
            for html_index, html_content in enumerate(_iter_table_html(middle_json)):
                html_name = f"table_html_{html_index}.html"
                with open(os.path.join(output_dir, html_name), "w", encoding="utf-8") as f:
                    f.write(html_content)
                print(f"✅ 保存表格HTML: {html_name}")

    print(f"✅ 图片保存完成,共保存 {saved_count} 个图片文件到 {output_dir}")
    return saved_count


def _iter_table_html(middle_json):
    """Yield the ``html`` string of every table span inside *middle_json*.

    Walks pdf_info -> preproc_blocks (type "table") -> blocks (type
    "table_body") -> lines -> spans (type "table"). Spans without usable
    HTML (missing, non-string, or the placeholder "N/A") are skipped.
    """
    for pdf_info in middle_json.get("pdf_info", []):
        for preproc_block in pdf_info.get("preproc_blocks", []):
            if preproc_block.get("type") != "table":
                continue
            for block in preproc_block.get("blocks", []):
                if block.get("type") != "table_body":
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        if span.get("type") != "table":
                            continue
                        html_content = span.get("html", "N/A")
                        if isinstance(html_content, str) and html_content != "N/A":
                            yield html_content
@track_execution_time
def save_middle_json(response_data, output_dir="test/mineru_temp"):
    """Write the response's ``middle_json`` to a pretty-printed JSON file.

    The ``middle_json`` string found under ``response_data["results"]["scan"]``
    is parsed and dumped to ``<output_dir>/middle_json_pretty.json`` with a
    4-space indent. When the parsed data has a ``pdf_info`` list, the page
    count and per-type block counts (table / text / title) are printed.

    Args:
        response_data: Decoded API response (dict-like).
        output_dir: Destination directory; created if it does not exist.

    Returns:
        True when the file was written, False when the data is missing or
        parsing/saving failed.
    """
    os.makedirs(output_dir, exist_ok=True)

    has_scan = "results" in response_data and "scan" in response_data["results"]
    if not has_scan:
        print("❌ 响应中未找到middle_json数据")
        return False

    scan_payload = response_data["results"]["scan"]
    if "middle_json" not in scan_payload:
        print("❌ scan结果中未找到middle_json")
        return False

    try:
        # middle_json arrives as a JSON string; decode it first.
        parsed = json.loads(scan_payload["middle_json"])

        target_path = os.path.join(output_dir, "middle_json_pretty.json")
        with open(target_path, "w", encoding="utf-8") as out:
            json.dump(parsed, out, ensure_ascii=False, indent=4)
        print(f"✅ Middle JSON已保存到 {target_path}")

        # Print a short summary of what the document contains.
        if "pdf_info" in parsed:
            pages = parsed["pdf_info"]
            print(f"📄 PDF信息: {len(pages)} 页")

            # Flatten the type of every preprocessed block across all pages.
            block_types = [
                blk.get("type", "")
                for page in pages
                for blk in page.get("preproc_blocks", [])
            ]

            print("📊 统计信息:")
            print(f" 总块数: {len(block_types)}")
            print(f" 表格块: {block_types.count('table')}")
            print(f" 文本块: {block_types.count('text')}")
            print(f" 标题块: {block_types.count('title')}")

        return True
    except json.JSONDecodeError as err:
        print(f"❌ 解析middle_json失败: {err}")
        return False
    except Exception as err:
        print(f"❌ 保存middle_json失败: {err}")
        return False
@track_execution_time
def parse_file():
    """Upload the sample image to the ``file_parse`` API and unpack the reply.

    The request asks for a ZIP response. When a ZIP comes back (detected via
    the Content-Type header or the ``PK`` magic bytes) it is saved to
    ``utils_test/MinerU_Test/mineru_temp/api_response.zip`` and extracted into
    the ``unzipped`` sub-directory. If a ``middle_json.json`` file exists at
    the top of the extracted tree, its page count is printed.
    Otherwise the body is parsed as JSON as a compatibility fallback.

    Paths are relative to the current working directory.

    Raises:
        requests.RequestException: On connection failure or timeout.
        OSError: If the sample image or output files cannot be accessed.
    """
    import zipfile

    API_URL = "http://aiclu.small-app.wang:8000/file_parse"
    # os.path.join instead of backslash literals: "\M" and "\s" are invalid
    # escape sequences and the literal form only worked on Windows.
    image_path = os.path.join("utils_test", "MinerU_Test", "sgfa_mineru_testimage.png")
    data = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "start_page_id": 0,
        "end_page_id": None,
        "return_middle_json": "true",
        "return_images": "true",
        "response_format_zip": "true",  # keep as true
    }

    # Keep the upload inside a context manager so the file handle is closed
    # once the request has finished.
    with open(image_path, "rb") as image_file:
        files = [("files", ("scan.png", image_file, "image/png"))]
        # (connect, read) timeout: without one a stalled server blocks forever.
        # The read timeout is generous because parsing can take a while.
        response = requests.post(API_URL, files=files, data=data, timeout=(10, 600))

    # Detect a ZIP payload by response header or by its magic bytes.
    is_zip = (
        "application/zip" in response.headers.get("Content-Type", "")
        or response.content.startswith(b"PK")
    )

    if is_zip:
        zip_output_dir = os.path.join("utils_test", "MinerU_Test", "mineru_temp")
        os.makedirs(zip_output_dir, exist_ok=True)
        zip_file_path = os.path.join(zip_output_dir, "api_response.zip")

        # Persist the raw ZIP bytes.
        with open(zip_file_path, "wb") as f:
            f.write(response.content)
        print(f"✅ ZIP 包已保存到: {zip_file_path} ({len(response.content)} bytes)")

        # Extract the archive next to it.
        unzipped_dir = os.path.join(zip_output_dir, "unzipped")
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall(unzipped_dir)
        print(f"✅ ZIP 包已解压到: {unzipped_dir}")

        # Example of reading an extracted file. NOTE(review): assumes the
        # archive has middle_json.json at its root — adjust to the real layout.
        middle_json_path = os.path.join(unzipped_dir, "middle_json.json")
        if os.path.exists(middle_json_path):
            with open(middle_json_path, "r", encoding="utf-8") as f:
                middle_json_data = json.load(f)
            print(f"📊 解压后读取到 middle_json,包含 {len(middle_json_data.get('pdf_info', []))} 页 PDF 信息")
        return

    # Compatibility path: the API did not return a ZIP, so try JSON.
    try:
        # ValueError covers json.JSONDecodeError as well as the simplejson /
        # requests variants that response.json() may raise.
        response.json()
    except ValueError as e:
        print(f"❌ 解析失败:既不是 ZIP 也不是 JSON - {e}")
    else:
        print("=== API响应概要(JSON格式)===")
        # Further handling of the JSON payload would go here.
# Run the parse only when executed as a script, so that importing this module
# does not trigger a network request as a side effect.
if __name__ == "__main__":
    parse_file()
|