import os
import time
import json
import requests
import pymysql
import zipfile
import io
from pathlib import Path
from urllib.parse import urlparse
from dotenv import load_dotenv

# Load environment variables - the .env file lives one level above this script.
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
load_dotenv(dotenv_path=env_path)

# SECURITY NOTE(review): this JWT was previously hard-coded in source. It is
# now read from the MINERU_TOKEN environment variable; the literal value is
# kept only as a backward-compatible fallback and should be rotated and
# removed, since it is already exposed in version-control history.
TOKEN = os.getenv(
    "MINERU_TOKEN",
    "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiI1MzgwMDYyNSIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc2Nzg1OTg5NywiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiMTgwMzA5ODIxNTQiLCJvcGVuSWQiOm51bGwsInV1aWQiOiI0NTYyZTUyNi1iZjE3LTRhMmItODExMi04YmM5ZjNjYzMwZGMiLCJlbWFpbCI6IiIsImV4cCI6MTc2OTA2OTQ5N30.mNH7afPPANNQq_BRsBOlbk-2P7e_ewdfzPQXO4woeoT15mDEbPKc45Auk_BuRuNaAS-Gm2GK3qKGjQ2VDtepvA",
)

API_APPLY = "https://mineru.net/api/v4/file-urls/batch"
API_BATCH_RESULT = "https://mineru.net/api/v4/extract-results/batch/{}"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {TOKEN}",
}
SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}


def get_db_connection():
    """Open a MySQL connection from the DATABASE_URL env var.

    Returns a pymysql connection (autocommit enabled), or None if the URL is
    missing or the connection attempt fails. Errors are printed, not raised,
    so callers must check for None.
    """
    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        print("DATABASE_URL not found in environment")
        return None
    try:
        parsed = urlparse(database_url)
        return pymysql.connect(
            host=parsed.hostname,
            port=parsed.port or 3306,
            user=parsed.username,
            password=parsed.password,
            database=parsed.path[1:],  # strip leading "/" from the URL path
            charset='utf8mb4',
            autocommit=True
        )
    except Exception as e:
        print(f"Database connection error: {e}")
        return None


def update_db_status(table_name, doc_id, status=None, progress=None, error=None):
    """Best-effort update of conversion columns for one document row.

    Only the columns whose keyword argument is not None are written
    (conversion_status / conversion_progress / conversion_error). Failures
    are printed and swallowed so status tracking never aborts processing.

    NOTE: table_name is interpolated into the SQL; it must come from the
    trusted TABLE_MAP in main_cli, never from user input.
    """
    conn = get_db_connection()
    if not conn:
        return
    try:
        with conn.cursor() as cursor:
            updates = []
            params = []
            if status is not None:
                updates.append("conversion_status = %s")
                params.append(status)
            if progress is not None:
                updates.append("conversion_progress = %s")
                params.append(progress)
            if error is not None:
                updates.append("conversion_error = %s")
                params.append(error)
            if not updates:
                return  # nothing to write; finally still closes the connection
            # BUGFIX: the join separator previously contained a literal line
            # break inside the f-string, producing malformed SQL.
            sql = f"UPDATE {table_name} SET {', '.join(updates)} WHERE id = %s"
            params.append(doc_id)
            cursor.execute(sql, params)
    except Exception as e:
        print(f"Update DB failed: {e}")
    finally:
        conn.close()


def apply_upload_urls(files_meta, model_version="vlm"):
    """Request pre-signed upload URLs from MinerU for a batch of files.

    files_meta: list of {"name": ..., "data_id": ...} dicts.
    Returns (batch_id, file_urls). Raises RuntimeError when the API reports
    a non-zero code, and requests.HTTPError on transport-level failure.
    """
    payload = {
        "files": files_meta,
        "model_version": model_version,
    }
    r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
    r.raise_for_status()
    j = r.json()
    if j.get("code") != 0:
        raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
    return j["data"]["batch_id"], j["data"]["file_urls"]


def upload_files(file_data_list, upload_urls):
    """PUT each file's raw bytes to its pre-signed URL (pairwise by position).

    Raises RuntimeError on any non-200 response.
    """
    for data, url in zip(file_data_list, upload_urls):
        res = requests.put(url, data=data, timeout=300)
        if res.status_code != 200:
            raise RuntimeError(f"upload failed to {url}, status={res.status_code}")


def poll_batch(batch_id, interval_sec=5, timeout_sec=1800):
    """Poll the batch result endpoint until every file is done or failed.

    Returns the list of per-file extract results. Raises RuntimeError on an
    API error code and TimeoutError when timeout_sec elapses first.
    """
    deadline = time.time() + timeout_sec
    while True:
        r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
        r.raise_for_status()
        j = r.json()
        if j.get("code") != 0:
            raise RuntimeError(f"poll failed: {j.get('msg')}")
        results = j["data"]["extract_result"]
        states = [it.get("state") for it in results]
        if all(s in ("done", "failed") for s in states):
            return results
        if time.time() > deadline:
            raise TimeoutError(f"poll timeout for batch_id={batch_id}")
        time.sleep(interval_sec)


def process_document(table_name, doc_id, chinese_name, file_url, out_dir):
    """Run one document through MinerU and save the extracted Markdown.

    Pipeline: download source file -> request upload URLs -> upload ->
    poll for completion -> download result zip -> extract first .md to
    out_dir/<chinese_name>.md. DB progress/status is updated along the way
    (status 1=running, 2=done, 3=failed).

    Returns True on success, False on any failure (the error is recorded
    in the DB via update_db_status).
    """
    try:
        # 1. Mark conversion as started.
        update_db_status(table_name, doc_id, status=1, progress=10)

        # 2. Download the original file.
        print(f"Downloading {file_url}...")
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        file_content = resp.content

        file_ext = Path(urlparse(file_url).path).suffix.lower()
        if not file_ext:
            file_ext = ".pdf"  # Default when the URL carries no extension
        file_name = f"{chinese_name}{file_ext}"
        update_db_status(table_name, doc_id, progress=30)

        # 3. Submit to MinerU.
        files_meta = [{"name": file_name, "data_id": doc_id}]
        batch_id, upload_urls = apply_upload_urls(files_meta)
        upload_files([file_content], upload_urls)
        update_db_status(table_name, doc_id, progress=50)

        # 4. Poll for the extraction result.
        results = poll_batch(batch_id)
        result = results[0]

        if result.get("state") == "done":
            zip_url = result.get("full_zip_url")
            if zip_url:
                # 5. Download and unpack the result archive.
                update_db_status(table_name, doc_id, progress=80)
                zip_resp = requests.get(zip_url, timeout=300)
                zip_resp.raise_for_status()

                # Extract the Markdown file and save it.
                with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
                    md_files = [f for f in z.namelist() if f.endswith(".md")]
                    if not md_files:
                        # BUGFIX: previously a missing .md still reported
                        # success; now it goes down the failure path.
                        raise RuntimeError("No .md file found in result archive")
                    md_content = z.read(md_files[0])
                    save_path = Path(out_dir) / f"{chinese_name}.md"
                    save_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(save_path, "wb") as f:
                        f.write(md_content)
                    print(f"Saved Markdown to {save_path}")

                update_db_status(table_name, doc_id, status=2, progress=100)
                return True
            else:
                raise RuntimeError("No zip URL in result")
        else:
            err_msg = result.get("err_msg", "Unknown error")
            raise RuntimeError(f"MinerU extraction failed: {err_msg}")
    except Exception as e:
        print(f"Process failed: {e}")
        update_db_status(table_name, doc_id, status=3, error=str(e))
        return False


def main_cli(table_type, doc_id, out_dir=r"d:\UGit\MinerU"):
    """CLI entry: look up a document by id in the mapped table and process it.

    table_type: "basis" | "work" | "job" (unknown values fall back to the
    basis table). Prints and returns early on DB/lookup failure.
    """
    # Resolve the physical table name from the trusted map.
    TABLE_MAP = {
        "basis": "t_basis_of_preparation",
        "work": "t_work_of_preparation",
        "job": "t_job_of_preparation"
    }
    table_name = TABLE_MAP.get(table_type, "t_basis_of_preparation")

    # Fetch the document's display name and source URL.
    conn = get_db_connection()
    if not conn:
        print("Database connection failed")
        return
    try:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT chinese_name, file_url FROM {table_name} WHERE id = %s", (doc_id,))
            row = cursor.fetchone()
            if not row:
                print(f"Document not found: {doc_id} in {table_name}")
                return
            chinese_name, file_url = row
            process_document(table_name, doc_id, chinese_name, file_url, out_dir)
    finally:
        conn.close()


if __name__ == "__main__":
    # Example usage: python miner_u.py basis <doc_id>
    import sys
    if len(sys.argv) > 2:
        main_cli(sys.argv[1], sys.argv[2])
    else:
        # BUGFIX: the usage string previously omitted the argument names.
        print("Usage: python miner_u.py <table_type> <doc_id>")