Kaynağa Gözat

minerU转换逻辑修改

lingmin_package@163.com 1 hafta önce
ebeveyn
işleme
beb877d73e

+ 122 - 0
src/app/minerU/dev_minerU.py

@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+import os
+import requests
+import zipfile
+import shutil
+from pathlib import Path
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+#API_URL = "http://183.220.37.46:25428/file_parse"
+API_URL = "http://183.220.37.46:23428/mineru/file_parse"
+API_HEADERS = {
+    "Authorization": "Bearer sk_dev_aC_2gg8BS5ImUScrpaHIKS5x6gdLO9Js_ba854894"
+}
+
+# 默认路径(可通过命令行参数覆盖)
+DEFAULT_INPUT_DIR = Path(r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\input")
+DEFAULT_OUTPUT_DIR = Path(r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\output")
+
+def parse_file(file_path, output_dir):
+    filename = file_path.name
+
+    try:
+        with open(file_path, 'rb') as f:
+            files = {
+                'files': (filename, f, 'application/pdf')
+            }
+            data = {
+                'return_md': 'true',
+                'response_format_zip': 'true',
+                'return_original_file': 'true',
+                'return_middle_json': 'true',
+                'return_content_list': 'true',
+                'return_images': 'true'
+            }
+
+            print(f"Processing: {filename}")
+            response = requests.post(API_URL, files=files, data=data, headers=API_HEADERS)
+
+            if response.status_code == 200:
+                zip_filename = f"{file_path.stem}_result.zip"
+                zip_path = output_dir / zip_filename
+                extract_dir = output_dir / file_path.stem
+                
+                with open(zip_path, 'wb') as out_f:
+                    out_f.write(response.content)
+                
+                print(f"  Saved zip to: {zip_path}")
+                
+                extract_dir.mkdir(exist_ok=True)
+                with zipfile.ZipFile(zip_path, 'r') as zipf:
+                    zipf.extractall(extract_dir)
+                
+                nested_dir = extract_dir / file_path.stem
+                if nested_dir.exists() and nested_dir.is_dir():
+                    for item in nested_dir.iterdir():
+                        shutil.move(str(item), str(extract_dir / item.name))
+                    nested_dir.rmdir()
+                
+                os.remove(zip_path)
+                print(f"  Extracted to: {extract_dir}")
+                return (filename, True, None)
+            else:
+                error_msg = f"HTTP {response.status_code}: {response.text}"
+                print(f"  Error: {error_msg}")
+                return (filename, False, error_msg)
+    except Exception as e:
+        print(f"  Exception: {str(e)}")
+        return (filename, False, str(e))
+
+def main(input_dir, output_dir, max_workers=10):
+    input_dir = Path(input_dir)
+    output_dir = Path(output_dir)
+
+    input_dir.mkdir(exist_ok=True)
+    output_dir.mkdir(exist_ok=True)
+
+    pdf_files = list(input_dir.glob("*.pdf"))
+    
+    if not pdf_files:
+        print("No PDF files found in input directory")
+        return
+    
+    print(f"Found {len(pdf_files)} PDF file(s)")
+    print(f"Input: {input_dir}")
+    print(f"Output: {output_dir}")
+    print(f"Processing with {max_workers} concurrent workers\n")
+
+    with ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {executor.submit(parse_file, pdf_file, output_dir): pdf_file for pdf_file in pdf_files}
+        
+        success_count = 0
+        fail_count = 0
+        failed_files = []
+        
+        for future in as_completed(futures):
+            filename, success, error = future.result()
+            if success:
+                success_count += 1
+            else:
+                fail_count += 1
+                failed_files.append((filename, error))
+    
+    print(f"\nDone! Success: {success_count}, Failed: {fail_count}")
+    
+    if failed_files:
+        print("\nFailed files:")
+        for filename, error in failed_files:
+            print(f"  - {filename}: {error}")
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description='Parse PDF files using MinerU API')
+    parser.add_argument('-i', '--input', type=str, default=str(DEFAULT_INPUT_DIR),
+                        help='Input directory containing PDF files')
+    parser.add_argument('-o', '--output', type=str, default=str(DEFAULT_OUTPUT_DIR),
+                        help='Output directory for parsed results')
+    parser.add_argument('-w', '--workers', type=int, default=10,
+                        help='Number of concurrent workers (default: 10)')
+    args = parser.parse_args()
+
+    main(input_dir=args.input, output_dir=args.output, max_workers=args.workers)

+ 57 - 18
src/app/minerU/minerU.py

@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 import os
+import re
 import requests
 import zipfile
 import shutil
@@ -7,13 +8,24 @@ from pathlib import Path
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
 API_URL = "http://183.220.37.46:25428/file_parse"
+#API_URL = "http://183.220.37.46:23428/mineru/file_parse"
 
 # 默认路径(可通过命令行参数覆盖)
 DEFAULT_INPUT_DIR = Path(r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\input")
 DEFAULT_OUTPUT_DIR = Path(r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\output")
 
+
+def clean_filename(name):
+    """清理文件名:去除中文标点符号(括号、书名号等),保留字母数字中文和下划线"""
+    return re.sub(r'[()()\[\]【】《》<>]', '_', name).strip('_')
+
 def parse_file(file_path, output_dir):
-    filename = file_path.name
+    filename = file_path.name  # e.g. "1《中华人民共和国水土保持法》(主席令第39号).pdf"
+    pdf_stem = file_path.stem  # e.g. "1《中华人民共和国水土保持法》(主席令第39号)"
+    cleaned_stem = clean_filename(pdf_stem)  # e.g. "1_中华人民共和国水土保持法__主席令第39号_"
+
+    outer_dir = output_dir / pdf_stem        # 外层:原始PDF文件名
+    inner_dir = outer_dir / cleaned_stem     # 内层:清理后的文件名
 
     try:
         with open(file_path, 'rb') as f:
@@ -33,27 +45,54 @@ def parse_file(file_path, output_dir):
             response = requests.post(API_URL, files=files, data=data)
 
             if response.status_code == 200:
-                zip_filename = f"{file_path.stem}_result.zip"
-                zip_path = output_dir / zip_filename
-                extract_dir = output_dir / file_path.stem
-                
+                zip_path = output_dir / f"{pdf_stem}_result.zip"
+
                 with open(zip_path, 'wb') as out_f:
                     out_f.write(response.content)
-                
+
                 print(f"  Saved zip to: {zip_path}")
-                
-                extract_dir.mkdir(exist_ok=True)
+
+                # 解压到临时目录
+                temp_dir = output_dir / f"__temp_{pdf_stem}"
+                temp_dir.mkdir(exist_ok=True)
                 with zipfile.ZipFile(zip_path, 'r') as zipf:
-                    zipf.extractall(extract_dir)
-                
-                nested_dir = extract_dir / file_path.stem
-                if nested_dir.exists() and nested_dir.is_dir():
-                    for item in nested_dir.iterdir():
-                        shutil.move(str(item), str(extract_dir / item.name))
-                    nested_dir.rmdir()
-                
-                os.remove(zip_path)
-                print(f"  Extracted to: {extract_dir}")
+                    zipf.extractall(temp_dir)
+
+                # 创建内外层目录
+                outer_dir.mkdir(exist_ok=True)
+                inner_dir.mkdir(exist_ok=True)
+
+                # 定位解压后的实际内容目录
+                source = temp_dir / pdf_stem
+                if source.exists() and source.is_dir():
+                    extract_source = source
+                elif (temp_dir / cleaned_stem).exists() and (temp_dir / cleaned_stem).is_dir():
+                    extract_source = temp_dir / cleaned_stem
+                else:
+                    # 取第一个子目录
+                    dirs = [d for d in temp_dir.iterdir() if d.is_dir()]
+                    extract_source = dirs[0] if dirs else temp_dir
+
+                # 移动所有文件到内层目录
+                for item in extract_source.iterdir():
+                    shutil.move(str(item), str(inner_dir / item.name))
+
+                # 清理临时目录和zip
+                shutil.rmtree(temp_dir, ignore_errors=True)
+                if zip_path.exists():
+                    os.remove(zip_path)
+
+                # 复制原始 PDF 到外层
+                shutil.copy2(file_path, outer_dir / filename)
+
+                # 复制 .md 文件到外层(副本)
+                md_files = list(inner_dir.glob("*.md"))
+                if md_files:
+                    for md_file in md_files:
+                        shutil.copy2(md_file, outer_dir / md_file.name)
+                    print(f"  PDF + MD copied to outer dir")
+
+                print(f"  Extracted to: {inner_dir}")
                 return (filename, True, None)
             else:
                 error_msg = f"HTTP {response.status_code}: {response.text}"

+ 357 - 0
src/app/minerU/official_api_minerU.py

@@ -0,0 +1,357 @@
+#!/usr/bin/env python3
+"""
+MinerU 官方 API (V4) PDF批量转换 脚本
+====================================================
+使用前先改下面 "===== 配置区 =====" 的路径和token
+
+功能:
+  1. 自动上传PDF → 轮询结果 → 下载解压
+  2. 自动清理多余文件,只保留 images/ .md _content_list.json _middle.json
+  3. UUID文件名自动改成 {文件夹名}_xxx
+  4. 输出文件夹下生成 转换报告.txt(含成功/失败/跳过明细)
+  5. 已转换的自动跳过(续传)
+
+  1《中华人民共和国水土保持法》(主席令第39号)
+    1_中华人民共和国水土保持法__主席令第39号_
+        1_中华人民共和国水土保持法__主席令第39号_.md                 (转换后的Markdown文件)
+        1_中华人民共和国水土保持法__主席令第39号__content_list.json  (转换后的json文件)
+        1_中华人民共和国水土保持法__主席令第39号__middle.json        (转换后的json文件)
+        ...
+    1_中华人民共和国水土保持法__主席令第39号_.md   (转换后的Markdown文件)
+    1_中华人民共和国水土保持法__主席令第39号_.pdf  (原始PDF文件)
+
+
+    ● 目录结构完全正确。以第一个文件为例:
+
+  1《中华人民共和国水土保持法》(主席令第39号)/           ← 外层(原始PDF文件名)
+      1《中华人民共和国水土保持法》(主席令第39号).pdf     ← 原始PDF
+      1《中华人民共和国水土保持法》_主席令第39号_.md        ← .md 副本(外层)
+      1《中华人民共和国水土保持法》_主席令第39号_/          ← 内层(清理后文件名)
+          1《中华人民共和国水土保持法》_主席令第39号_.md
+          1《中华人民共和国水土保持法》_主席令第39号__content_list.json
+          1《中华人民共和国水土保持法》_主席令第39号__middle.json
+          images/
+              *.jpg   
+====================================================
+"""
+import os, sys, json, time, re, requests, zipfile, shutil
+from pathlib import Path
+
+# ============================================================
+#                      配置区(改这里!)
+# ============================================================
+TOKEN = "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiI0MjcwMDM4NiIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc3MTg0MDYyOCwiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiIiwib3BlbklkIjpudWxsLCJ1dWlkIjoiZWRkZmIyNDctY2Q3Ni00YTFhLTk4OTMtODgxZWQ3ZGMyNzYzIiwiZW1haWwiOiIiLCJleHAiOjE3Nzk2MTY2Mjh9.63bhVtvVOJekpbh6Clprz9xlqA2lKAKTy0jswNjs77gRd-iTvtNLabP6YCm3pq4-miyYh946SHozGz4WAV-fBQ"
+
+源文件夹 = r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\input"
+输出文件夹 = r"I:\wangxun_dev_workspace\lq_data_wrokspace\bpf_pdf\output"
+模型版本 = "vlm"   # pipeline / vlm / MinerU-HTML
+# ============================================================
+
+请求头 = {
+    "Content-Type": "application/json",
+    "Authorization": f"Bearer {TOKEN}"
+}
+
+def 清理文件名(文件名):
+    """将文件名中的特殊字符(如括号)替换为下划线"""
+    return re.sub(r'[()\(\)【】\[\]{}<>]', '_', 文件名)
+
+
+def 清理并重命名(文件夹路径, 原始PDF路径):
+    """调整为两层目录结构:
+    外层文件夹(原始PDF文件名)/
+        内层文件夹(清理后文件名)/
+            {清理后文件名}.md
+            {清理后文件名}_content_list.json
+            {清理后文件名}_middle.json
+            images/
+        {清理后文件名}.md      (外层也放一份.md)
+        {原始PDF文件名}.pdf    (原始PDF复制)
+    """
+    folder = Path(文件夹路径)
+    if not folder.exists():
+        return
+
+    原始PDF = Path(原始PDF路径)
+    前缀 = 清理文件名(folder.name)  # 清理后文件名
+
+    # 创建内层子目录
+    子目录 = folder / 前缀
+    子目录.mkdir(exist_ok=True)
+
+    # 找到需要处理的工作目录(可能是folder本身,或嵌套子目录)
+    工作目录 = folder
+    for sub in sorted(folder.iterdir()):
+        if sub.is_dir() and sub.name != 前缀:
+            if any(f.is_file() and f.suffix in ('.json', '.md') for f in sub.iterdir()):
+                工作目录 = sub
+                break
+
+    print(f"    清理: {工作目录.name} → {前缀}")
+
+    # 先处理文件重命名和移动
+    for f in list(工作目录.iterdir()):
+        if f.is_dir():
+            if f.name == 'images':
+                目标 = 子目录 / 'images'
+                if 目标.exists():
+                    shutil.rmtree(目标)
+                shutil.move(str(f), str(目标))
+            elif f.name == 前缀:
+                continue  # 跳过刚创建的子目录
+            else:
+                shutil.rmtree(f)
+            continue
+
+        stem = f.stem
+        name = f.name
+
+        if name.endswith('_origin.pdf'):
+            f.unlink()
+            continue
+
+        # 确定目标文件名
+        目标名 = None
+        if name.endswith('.md') and stem == 'full':
+            目标名 = f'{前缀}.md'
+        elif '_content_list_v2' in stem and name.endswith('.json'):
+            目标名 = f'{前缀}_content_list.json'
+        elif '_content_list' in stem and name.endswith('.json'):
+            目标名 = f'{前缀}_content_list.json'
+        elif stem == 'layout' and name.endswith('.json'):
+            目标名 = f'{前缀}_middle.json'
+        elif '_model' in stem and name.endswith('.json'):
+            目标名 = f'{前缀}_middle.json'
+
+        if 目标名:
+            目标路径 = 子目录 / 目标名
+            if not 目标路径.exists():
+                shutil.move(str(f), str(目标路径))
+            else:
+                f.unlink()
+        else:
+            # 不认识的JSON文件也放到内层子目录
+            目标路径 = 子目录 / name
+            if not 目标路径.exists():
+                shutil.move(str(f), str(目标路径))
+            else:
+                f.unlink()
+
+    # 复制 .md 到外层
+    md_src = 子目录 / f'{前缀}.md'
+    md_dst = folder / f'{前缀}.md'
+    if md_src.exists() and not md_dst.exists():
+        shutil.copy2(str(md_src), str(md_dst))
+
+    # 复制原始PDF到外层
+    pdf_dst = folder / f'{原始PDF.stem}.pdf'
+    if 原始PDF.exists() and not pdf_dst.exists():
+        shutil.copy2(str(原始PDF), str(pdf_dst))
+
+    # 删除旧的工作目录(如果不是folder本身)
+    if 工作目录 != folder and 工作目录.exists():
+        shutil.rmtree(工作目录)
+
+    # 清理外层多余文件/目录
+    for f in list(folder.iterdir()):
+        if f.is_dir():
+            if f.name == 前缀:
+                continue
+            shutil.rmtree(f)
+        else:
+            name = f.name
+            if name.startswith(前缀) or name == f'{原始PDF.stem}.pdf':
+                continue
+            f.unlink()
+
+
+def 申请上传链接(文件名列表):
+    url = "https://mineru.net/api/v4/file-urls/batch"
+    data = {
+        "files": [{"name": f, "data_id": str(i)} for i, f in enumerate(文件名列表)],
+        "model_version": 模型版本
+    }
+    resp = requests.post(url, headers=请求头, json=data, timeout=30)
+    result = resp.json()
+    if result.get("code") != 0:
+        raise Exception(f"申请上传链接失败: {result.get('msg', resp.text)}")
+    return result["data"]["batch_id"], result["data"]["file_urls"]
+
+def 上传文件(本地路径, 上传链接):
+    with open(本地路径, "rb") as f:
+        resp = requests.put(上传链接, data=f, timeout=600)
+    if resp.status_code != 200:
+        raise Exception(f"上传失败 HTTP {resp.status_code}")
+
+def 查询结果(batch_id):
+    url = f"https://mineru.net/api/v4/extract-results/batch/{batch_id}"
+    while True:
+        resp = requests.get(url, headers=请求头, timeout=30)
+        data = resp.json()
+        if data.get("code") != 0:
+            raise Exception(f"查询失败: {data.get('msg')}")
+        
+        结果列表 = data["data"]["extract_result"]
+        全部完成 = True
+        for r in 结果列表:
+            s = r["state"]
+            if s == "done": continue
+            elif s == "failed": continue
+            else: 全部完成 = False
+        if 全部完成:
+            return 结果列表
+        time.sleep(5)
+
+def 下载并整理(文件结果, 输出目录, 原始PDF路径=None):
+    文件名 = 文件结果["file_name"]
+    文件夹名 = Path(文件名).stem
+
+    if 文件结果["state"] != "done":
+        return False, 文件结果.get("err_msg", "状态异常")
+
+    zip地址 = 文件结果.get("full_zip_url", "")
+    if not zip地址:
+        return False, "无下载链接"
+
+    目标目录 = Path(输出目录) / 文件夹名
+    目标目录.mkdir(parents=True, exist_ok=True)
+
+    # 下载zip
+    临时zip = 目标目录 / "_temp.zip"
+    resp = requests.get(zip地址, timeout=600)
+    with open(临时zip, "wb") as f:
+        f.write(resp.content)
+
+    # 解压
+    with zipfile.ZipFile(临时zip, 'r') as zf:
+        zf.extractall(目标目录)
+    临时zip.unlink()
+
+    md_files = list(目标目录.rglob('*.md'))
+    if not md_files:
+        return False, "解压后无MD文件"
+
+    # 清理文件并调整为两层目录结构
+    if 原始PDF路径:
+        清理并重命名(目标目录, 原始PDF路径)
+
+    return True, ""
+
+def 主程序():
+    源 = Path(源文件夹)
+    输出 = Path(输出文件夹)
+    
+    if not 源.exists():
+        print(f"错误: 源文件夹不存在: {源}")
+        return
+    
+    输出.mkdir(parents=True, exist_ok=True)
+    
+    pdf列表 = sorted([f for f in 源.glob("*.pdf")])
+    print(f"PDF: 共 {len(pdf列表)} 个\n")
+    
+    if not pdf列表:
+        print("没有PDF文件")
+        return
+    
+    # 检查续传
+    待处理 = []
+    已转换 = []
+    for f in pdf列表:
+        # 外层文件夹名为原始PDF文件名
+        fd = 输出 / f.stem
+        if fd.is_dir():
+            # 递归查找所有md文件
+            md_files = list(fd.rglob('*.md'))
+            md_ok = md_files and any(m.stat().st_size > 0 for m in md_files)
+            # 检查三个文件是否齐全(可能在任意子目录中)
+            has_cl = any('_content_list.json' in m.name for m in fd.rglob('*.json'))
+            has_mid = any('_middle.json' in m.name for m in fd.rglob('*.json'))
+            if md_ok and has_cl and has_mid:
+                已转换.append(f.name)
+                continue
+        待处理.append(f)
+    
+    print(f"  已转换(跳过): {len(已转换)}")
+    print(f"  待处理: {len(待处理)}\n")
+    
+    成功 = []
+    失败 = []
+    
+    # 分批上传
+    批次大小 = 50
+    for 批次起始 in range(0, len(待处理), 批次大小):
+        当前批次 = 待处理[批次起始:批次起始+批次大小]
+        文件名列表 = [f.name for f in 当前批次]
+        
+        print(f"{'='*50}")
+        print(f"第 {批次起始//批次大小 + 1} 批 ({len(当前批次)} 个)")
+        print(f"{'='*50}")
+        
+        try:
+            print("[上传] 申请上传链接...")
+            batch_id, 链接列表 = 申请上传链接(文件名列表)
+            print(f"  批次ID: {batch_id}")
+            
+            print("[上传] 上传文件...")
+            for i, (pdf, link) in enumerate(zip(当前批次, 链接列表)):
+                print(f"  {i+1}/{len(当前批次)} {pdf.name[:50]}")
+                上传文件(str(pdf), link)
+            
+            print("[等待] 等待解析结果...")
+            结果列表 = 查询结果(batch_id)
+            
+            print("[下载] 下载整理...")
+            for 结果, pdf in zip(结果列表, 当前批次):
+                ok, err = 下载并整理(结果, str(输出), str(pdf))
+                if ok:
+                    成功.append(结果['file_name'])
+                    print(f"  OK {结果['file_name'][:50]}")
+                else:
+                    失败.append((结果['file_name'], err))
+                    print(f"  FAIL {结果['file_name'][:50]}: {err[:60]}")
+                    
+        except Exception as e:
+            print(f"错误: 批次处理异常: {e}")
+            for f in 当前批次:
+                失败.append((f.name, str(e)))
+    
+    # 生成报告
+    报告路径 = 输出 / "转换报告.txt"
+    with open(报告路径, 'w', encoding='utf-8') as fp:
+        fp.write("=" * 50 + "\n")
+        fp.write("MinerU官方V4 API - 转换报告\n")
+        fp.write("=" * 50 + "\n\n")
+        fp.write(f"源文件路径: {源}\n")
+        fp.write(f"输出路径: {输出}\n")
+        fp.write(f"PDF总数: {len(pdf列表)}\n")
+        fp.write(f"已转换(跳过): {len(已转换)}\n")
+        fp.write(f"本次成功: {len(成功)}\n")
+        fp.write(f"本次失败: {len(失败)}\n\n")
+        
+        if 已转换:
+            fp.write("--- 已转换(续传跳过)---\n")
+            for f in 已转换:
+                fp.write(f"  ⏭️ {f}\n")
+        
+        if 成功:
+            fp.write("\n--- 本次成功 ---\n")
+            for f in 成功:
+                fp.write(f"  ✅ {f}\n")
+        
+        if 失败:
+            fp.write("\n--- 本次失败 ---\n")
+            for f, err in 失败:
+                fp.write(f"  ❌ {f}: {err}\n")
+        
+        fp.write(f"\n合计: 成功={len(已转换)+len(成功)}  失败={len(失败)}\n")
+    
+    print(f"\n{'='*50}")
+    print(f"完成!")
+    print(f"  已转换(跳过): {len(已转换)}")
+    print(f"  本次成功: {len(成功)}")
+    print(f"  本次失败: {len(失败)}")
+    print(f"报告: {报告路径}")
+
+if __name__ == "__main__":
+    主程序()