"""MinerU document-conversion worker.

Fetches a document record from MySQL, downloads the source file, submits it
to the MinerU batch-extraction API, polls for completion, and saves the
resulting Markdown to disk while tracking progress/status in the database.
"""

import os
import time
import json
import requests
import pymysql
import zipfile
import io
from pathlib import Path
from urllib.parse import urlparse
from dotenv import load_dotenv

# Load environment variables - the config file lives one level above the
# directory containing this script.
env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
load_dotenv(dotenv_path=env_path)

# SECURITY: a bearer token was hard-coded here; it is kept only as a fallback
# for backward compatibility. Prefer setting MINERU_TOKEN in the environment
# and rotate the leaked credential below.
TOKEN = os.getenv(
    "MINERU_TOKEN",
    "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiI1MzgwMDYyNSIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc2Nzg1OTg5NywiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiMTgwMzA5ODIxNTQiLCJvcGVuSWQiOm51bGwsInV1aWQiOiI0NTYyZTUyNi1iZjE3LTRhMmItODExMi04YmM5ZjNjYzMwZGMiLCJlbWFpbCI6IiIsImV4cCI6MTc2OTA2OTQ5N30.mNH7afPPANNQq_BRsBOlbk-2P7e_ewdfzPQXO4woeoT15mDEbPKc45Auk_BuRuNaAS-Gm2GK3qKGjQ2VDtepvA",
)

API_APPLY = "https://mineru.net/api/v4/file-urls/batch"
API_BATCH_RESULT = "https://mineru.net/api/v4/extract-results/batch/{}"

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {TOKEN}",
}

SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}


def get_db_connection():
    """Open a pymysql connection from the DATABASE_URL env var.

    Returns the connection (autocommit enabled), or None when the URL is
    missing or the connection attempt fails.
    """
    database_url = os.getenv('DATABASE_URL')
    if not database_url:
        print("DATABASE_URL not found in environment")
        return None
    try:
        parsed = urlparse(database_url)
        return pymysql.connect(
            host=parsed.hostname,
            port=parsed.port or 3306,
            user=parsed.username,
            password=parsed.password,
            database=parsed.path[1:],  # strip leading '/' from the path
            charset='utf8mb4',
            autocommit=True,
        )
    except Exception as e:
        print(f"Database connection error: {e}")
        return None


def update_db_status(doc_id, status=None, progress=None, error=None, converted_file_name=None):
    """Best-effort update of conversion columns on t_document_main.

    Only the columns whose corresponding argument is not None are written;
    updated_time is always refreshed. Failures are logged, never raised.
    """
    conn = get_db_connection()
    if not conn:
        return
    try:
        with conn.cursor() as cursor:
            updates = []
            params = []
            if status is not None:
                updates.append("conversion_status = %s")
                params.append(status)
            if progress is not None:
                updates.append("conversion_progress = %s")
                params.append(progress)
            if error is not None:
                updates.append("conversion_error = %s")
                params.append(error)
            if converted_file_name is not None:
                updates.append("converted_file_name = %s")
                params.append(converted_file_name)
            if not updates:
                return
            # Also refresh the modification timestamp.
            updates.append("updated_time = NOW()")
            sql = f"UPDATE t_document_main SET {', '.join(updates)} WHERE id = %s"
            params.append(doc_id)
            cursor.execute(sql, params)
    except Exception as e:
        print(f"Update DB failed: {e}")
    finally:
        conn.close()


def apply_upload_urls(files_meta, model_version="vlm"):
    """Request pre-signed upload URLs for a batch of files from MinerU.

    Returns (batch_id, file_urls). Raises RuntimeError on an API-level
    error (non-zero "code") and requests.HTTPError on transport failure.
    """
    payload = {
        "files": files_meta,
        "model_version": model_version,
    }
    r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
    r.raise_for_status()
    j = r.json()
    if j.get("code") != 0:
        raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
    return j["data"]["batch_id"], j["data"]["file_urls"]


def upload_files(file_data_list, upload_urls):
    """PUT each file body to its pre-signed URL; raise on any non-200."""
    for data, url in zip(file_data_list, upload_urls):
        res = requests.put(url, data=data, timeout=300)
        if res.status_code != 200:
            raise RuntimeError(f"upload failed to {url}, status={res.status_code}")


def poll_batch(batch_id, interval_sec=5, timeout_sec=1800):
    """Poll the batch-result endpoint until every file is done or failed.

    Returns the list of per-file extract results. Raises RuntimeError on an
    API error and TimeoutError when timeout_sec elapses first.
    """
    deadline = time.time() + timeout_sec
    while True:
        r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
        r.raise_for_status()
        j = r.json()
        if j.get("code") != 0:
            raise RuntimeError(f"poll failed: {j.get('msg')}")
        results = j["data"]["extract_result"]
        states = [it.get("state") for it in results]
        if all(s in ("done", "failed") for s in states):
            return results
        if time.time() > deadline:
            raise TimeoutError(f"poll timeout for batch_id={batch_id}")
        time.sleep(interval_sec)


def process_document(doc_id, chinese_name, file_url, out_dir):
    """Run the full conversion pipeline for one document.

    Downloads file_url, submits it to MinerU, waits for extraction, and
    writes the resulting Markdown to out_dir. Progress/status is mirrored
    into the database. Returns True on success, False on any failure (the
    error is recorded with status=3).
    """
    try:
        # 1. Mark conversion as started.
        update_db_status(doc_id, status=1, progress=10)

        # 2. Download the original file.
        print(f"Downloading {file_url}...")
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        file_content = resp.content

        # Reject web pages: MinerU conversion is not supported for them.
        content_type = resp.headers.get("Content-Type", "").lower()
        if "text/html" in content_type:
            raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")

        file_ext = Path(urlparse(file_url).path).suffix.lower()
        if not file_ext:
            file_ext = ".pdf"  # Default
        file_name = f"{chinese_name}{file_ext}"
        update_db_status(doc_id, progress=30)

        # 3. Submit to MinerU.
        files_meta = [{"name": file_name, "data_id": doc_id}]
        batch_id, upload_urls = apply_upload_urls(files_meta)
        upload_files([file_content], upload_urls)
        update_db_status(doc_id, progress=50)

        # 4. Poll for the extraction result.
        results = poll_batch(batch_id)
        result = results[0]
        if result.get("state") == "done":
            zip_url = result.get("full_zip_url")
            if zip_url:
                # 5. Download and unpack the result archive.
                update_db_status(doc_id, progress=80)
                zip_resp = requests.get(zip_url, timeout=300)
                zip_resp.raise_for_status()

                # Extract and save the Markdown output.
                converted_file_name = f"{chinese_name}.md"
                with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
                    # Locate the .md file inside the archive.
                    md_files = [f for f in z.namelist() if f.endswith(".md")]
                    if not md_files:
                        # Previously this path fell through and still marked
                        # the document as converted; fail explicitly instead.
                        raise RuntimeError("No .md file found in result zip")
                    md_content = z.read(md_files[0])
                    save_path = Path(out_dir) / converted_file_name
                    save_path.parent.mkdir(parents=True, exist_ok=True)
                    with open(save_path, "wb") as f:
                        f.write(md_content)
                    print(f"Saved Markdown to {save_path}")
                update_db_status(doc_id, status=2, progress=100,
                                 converted_file_name=converted_file_name)
                return True
            else:
                raise RuntimeError("No zip URL in result")
        else:
            err_msg = result.get("err_msg", "Unknown error")
            raise RuntimeError(f"MinerU extraction failed: {err_msg}")
    except Exception as e:
        print(f"Process failed: {e}")
        update_db_status(doc_id, status=3, error=str(e))
        return False


def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
    """CLI entry point: look up a document's URL and run the conversion.

    Reads title/file_url directly from t_document_main; when file_url is
    absent there, falls back to probing the legacy preparation tables for a
    URL column (compatibility with old data).
    """
    conn = get_db_connection()
    if not conn:
        print("Database connection failed")
        return
    try:
        with conn.cursor() as cursor:
            # Prefer title and file_url from t_document_main.
            cursor.execute("SELECT title, file_url FROM t_document_main WHERE id = %s", (doc_id,))
            row = cursor.fetchone()
            if not row or not row[1]:
                # Main table lacks a file_url; try the child tables.
                if not row:
                    print(f"Document not found: {doc_id}")
                    return
                title = row[0]
                # Fall back to the source tables (legacy-data compatibility).
                cursor.execute("SELECT source_type, source_id FROM t_document_main WHERE id = %s", (doc_id,))
                st_row = cursor.fetchone()
                if st_row:
                    source_type, source_id = st_row
                    TABLE_MAP = {
                        "basis": "t_basis_of_preparation",
                        "work": "t_work_of_preparation",
                        "job": "t_job_of_preparation",
                    }
                    table_name = TABLE_MAP.get(source_type)
                    if table_name:
                        # Legacy tables disagree on the URL column name,
                        # so probe the known candidates in order.
                        url_fields = ['file_url', 'source_url', 'url']
                        for field in url_fields:
                            try:
                                cursor.execute(
                                    f"SELECT {field} FROM {table_name} WHERE id = %s",
                                    (source_id,),
                                )
                                url_row = cursor.fetchone()
                                if url_row and url_row[0]:
                                    file_url = url_row[0]
                                    process_document(doc_id, title, file_url, out_dir)
                                    return
                            except Exception:
                                # Column likely absent on this table; was a
                                # bare except, narrowed so Ctrl-C still works.
                                continue
                print(f"No file_url found for document: {doc_id}")
                update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
                return
            title, file_url = row
            process_document(doc_id, title, file_url, out_dir)
    finally:
        conn.close()


if __name__ == "__main__":
    # Example usage: python miner_u.py <doc_id>
    import sys
    if len(sys.argv) > 1:
        # Argument handling kept loose for backward compatibility:
        # old format passed an extra source-type argument before the id.
        if len(sys.argv) == 3:
            # Old format: python miner_u.py <source_type> <doc_id>
            main_cli(sys.argv[2])
        else:
            # New format: python miner_u.py <doc_id>
            main_cli(sys.argv[1])
    else:
        print("Usage: python miner_u.py ")