# -*- coding: utf-8 -*-
"""Test driver for the MinerU ``/file_parse`` HTTP API.

Uploads a test image, then saves the returned images / middle_json
artifacts to a local temp directory for inspection.
"""
import sys
import os
import json
import base64
import zipfile
import urllib.request

# Resolve the project root and put it on sys.path so the `foundation`
# package (which lives at the repository root) is importable.
current_file = os.path.abspath(__file__)        # this script
current_dir = os.path.dirname(current_file)     # .../utils_test/MinerU_Test
parent_dir = os.path.dirname(current_dir)       # .../utils_test
root_dir = os.path.dirname(parent_dir)          # project root (LQAgentPlatform)
sys.path.append(root_dir)

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

from foundation.observability.monitoring.time_statistics import track_execution_time


@track_execution_time
def save_images_from_response(response_data, output_dir="test/mineru_temp"):
    """Extract and save images embedded in the API's JSON response.

    Args:
        response_data: Parsed JSON body; images are expected under
            ``results.scan.images`` as a mapping of filename ->
            base64 payload (optionally with a ``data:image/...`` prefix).
        output_dir: Directory the decoded files are written to
            (created if missing).

    Returns:
        The number of image files written, or ``None`` when the response
        contains no ``results.scan`` section.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Bail out early when the response carries no scan payload at all.
    if "results" not in response_data or "scan" not in response_data["results"]:
        print("❌ 响应中未找到图片数据")
        return

    scan_result = response_data["results"]["scan"]
    saved_count = 0

    # Decode and persist every base64 image returned by the API.
    if "images" in scan_result:
        print(f"发现 {len(scan_result['images'])} 个base64图片")
        for filename, base64_data in scan_result["images"].items():
            try:
                if base64_data.startswith("data:image/"):
                    # Data-URI form: split off the header to recover the
                    # raw base64 payload and the file extension.
                    header, base64_string = base64_data.split(",", 1)
                    file_extension = header.split("/")[1].split(";")[0]
                    image_data = base64.b64decode(base64_string)
                    output_filename = f"table_{saved_count}.{file_extension}"
                    output_path = os.path.join(output_dir, output_filename)
                    with open(output_path, "wb") as f:
                        f.write(image_data)
                    print(f"✅ 保存图片: {output_filename} ({len(image_data)} bytes)")
                    saved_count += 1
                else:
                    # Bare base64 payload; extension unknown, default to .jpg.
                    image_data = base64.b64decode(base64_data)
                    output_path = os.path.join(output_dir, f"table_{saved_count}.jpg")
                    with open(output_path, "wb") as f:
                        f.write(image_data)
                    print(f"✅ 保存图片: table_{saved_count}.jpg ({len(image_data)} bytes)")
                    saved_count += 1
            except Exception as e:
                # FIX: report the actual source filename (the original
                # printed a hard-coded "(unknown)" placeholder).
                print(f"❌ 保存图片 {filename} 失败: {e}")

    # middle_json may additionally carry per-table HTML renderings.
    if "middle_json" in scan_result:
        try:
            middle_json = json.loads(scan_result["middle_json"])
            # Walk pdf_info -> preproc_blocks -> blocks -> lines -> spans
            # looking for table spans that carry an "html" field.
            for pdf_info in middle_json.get("pdf_info", []):
                for preproc_block in pdf_info.get("preproc_blocks", []):
                    if preproc_block.get("type") == "table":
                        for block in preproc_block.get("blocks", []):
                            if block.get("type") == "table_body":
                                for line in block.get("lines", []):
                                    for span in line.get("spans", []):
                                        if span.get("type") == "table":
                                            html_content = span.get('html', 'N/A')
                                            if html_content != 'N/A':
                                                html_file = os.path.join(
                                                    output_dir,
                                                    f"table_html_{saved_count}.html",
                                                )
                                                with open(html_file, "w", encoding="utf-8") as f:
                                                    f.write(html_content)
                                                print(f"✅ 保存表格HTML: table_html_{saved_count}.html")
        except json.JSONDecodeError as e:
            print(f"❌ 解析middle_json失败: {e}")

    print(f"✅ 图片保存完成,共保存 {saved_count} 个图片文件到 {output_dir}")
    return saved_count


@track_execution_time
def save_middle_json(response_data, output_dir="test/mineru_temp"):
    """Extract ``middle_json`` from the response and save it pretty-printed.

    Also prints per-page block statistics (table / text / title counts).

    Args:
        response_data: Parsed JSON body with ``results.scan.middle_json``
            as a JSON-encoded string.
        output_dir: Directory to write ``middle_json_pretty.json`` into.

    Returns:
        ``True`` on success, ``False`` when the data is missing or invalid.
    """
    os.makedirs(output_dir, exist_ok=True)

    if "results" not in response_data or "scan" not in response_data["results"]:
        print("❌ 响应中未找到middle_json数据")
        return False

    scan_result = response_data["results"]["scan"]

    if "middle_json" not in scan_result:
        print("❌ scan结果中未找到middle_json")
        return False

    try:
        # middle_json arrives as a JSON string; decode before re-dumping.
        middle_json_data = json.loads(scan_result["middle_json"])

        middle_json_file = os.path.join(output_dir, "middle_json_pretty.json")
        with open(middle_json_file, "w", encoding="utf-8") as f:
            json.dump(middle_json_data, f, ensure_ascii=False, indent=4)
        print(f"✅ Middle JSON已保存到 {middle_json_file}")

        # Summarize the document structure for quick inspection.
        if "pdf_info" in middle_json_data:
            pdf_info = middle_json_data["pdf_info"]
            print(f"📄 PDF信息: {len(pdf_info)} 页")

            total_blocks = 0
            table_count = 0
            text_count = 0
            title_count = 0

            for page_info in pdf_info:
                preproc_blocks = page_info.get("preproc_blocks", [])
                total_blocks += len(preproc_blocks)
                for block in preproc_blocks:
                    block_type = block.get("type", "")
                    if block_type == "table":
                        table_count += 1
                    elif block_type == "text":
                        text_count += 1
                    elif block_type == "title":
                        title_count += 1

            print(f"📊 统计信息:")
            print(f"   总块数: {total_blocks}")
            print(f"   表格块: {table_count}")
            print(f"   文本块: {text_count}")
            print(f"   标题块: {title_count}")

        return True

    except json.JSONDecodeError as e:
        print(f"❌ 解析middle_json失败: {e}")
        return False
    except Exception as e:
        print(f"❌ 保存middle_json失败: {e}")
        return False


@track_execution_time
def parse_file():
    """Upload the test image to the MinerU API and persist the response.

    The API is asked to return a ZIP archive; the archive is saved and
    unpacked locally. When the server answers with plain JSON instead,
    the response is parsed as a fallback.
    """
    API_URL = "http://aiclu.small-app.wang:8000/file_parse"

    # FIX: portable path construction (the original used non-raw Windows
    # backslash literals, which trigger invalid-escape warnings and
    # break on POSIX systems).
    image_path = os.path.join("utils_test", "MinerU_Test", "sgfa_mineru_testimage.png")

    data = {
        "backend": "pipeline",
        "parse_method": "auto",
        "formula_enable": "true",
        "table_enable": "true",
        "return_md": "true",
        "start_page_id": 0,
        "end_page_id": None,  # None fields are dropped by requests
        "return_middle_json": "true",
        "return_images": "true",
        "response_format_zip": "true",  # keep as "true": request a ZIP response
    }

    # FIX: the original leaked the open file handle; a context manager
    # guarantees it is closed once the upload finishes.
    with open(image_path, "rb") as image_file:
        files = [
            ("files", ("scan.png", image_file, "image/png")),
        ]
        response = requests.post(API_URL, files=files, data=data)

    # Detect a ZIP payload either via the Content-Type header or the
    # ZIP magic bytes ("PK") at the start of the body.
    if "application/zip" in response.headers.get("Content-Type", "") or response.content.startswith(b"PK"):
        zip_output_dir = os.path.join("utils_test", "MinerU_Test", "mineru_temp")
        os.makedirs(zip_output_dir, exist_ok=True)
        zip_file_path = os.path.join(zip_output_dir, "api_response.zip")

        # Persist the raw ZIP bytes.
        with open(zip_file_path, "wb") as f:
            f.write(response.content)
        print(f"✅ ZIP 包已保存到: {zip_file_path} ({len(response.content)} bytes)")

        # Unpack the archive next to it.
        with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
            zip_ref.extractall(os.path.join(zip_output_dir, "unzipped"))
        print(f"✅ ZIP 包已解压到: {os.path.join(zip_output_dir, 'unzipped')}")

        # Try to read middle_json from the unpacked files.
        # NOTE(review): the file name inside the ZIP is assumed to be
        # "middle_json.json" — confirm against the actual archive layout.
        unzipped_dir = os.path.join(zip_output_dir, "unzipped")
        middle_json_path = os.path.join(unzipped_dir, "middle_json.json")
        if os.path.exists(middle_json_path):
            with open(middle_json_path, "r", encoding="utf-8") as f:
                middle_json_data = json.load(f)
            print(f"📊 解压后读取到 middle_json,包含 {len(middle_json_data.get('pdf_info', []))} 页 PDF 信息")
    else:
        # Fallback: the API did not return a ZIP — try to parse JSON.
        try:
            result = response.json()
            print("=== API响应概要(JSON格式)===")
            # Follow-up handling would mirror the original JSON flow here.
        except json.JSONDecodeError as e:
            print(f"❌ 解析失败:既不是 ZIP 也不是 JSON - {e}")


if __name__ == "__main__":
    parse_file()