miner_u.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import os
  2. import time
  3. import json
  4. import requests
  5. import pymysql
  6. import zipfile
  7. import io
  8. from pathlib import Path
  9. from urllib.parse import urlparse
  10. from dotenv import load_dotenv
  11. # 加载环境变量 - 配置文件在脚本所在目录的上一级
  12. env_path = os.path.join(os.path.dirname(__file__), "..", ".env")
  13. load_dotenv(dotenv_path=env_path)
  14. TOKEN = "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiI1MzgwMDYyNSIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc2Nzg1OTg5NywiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiMTgwMzA5ODIxNTQiLCJvcGVuSWQiOm51bGwsInV1aWQiOiI0NTYyZTUyNi1iZjE3LTRhMmItODExMi04YmM5ZjNjYzMwZGMiLCJlbWFpbCI6IiIsImV4cCI6MTc2OTA2OTQ5N30.mNH7afPPANNQq_BRsBOlbk-2P7e_ewdfzPQXO4woeoT15mDEbPKc45Auk_BuRuNaAS-Gm2GK3qKGjQ2VDtepvA"
  15. API_APPLY = "https://mineru.net/api/v4/file-urls/batch"
  16. API_BATCH_RESULT = "https://mineru.net/api/v4/extract-results/batch/{}"
  17. HEADERS = {
  18. "Content-Type": "application/json",
  19. "Authorization": f"Bearer {TOKEN}",
  20. }
  21. SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
  22. def get_db_connection():
  23. database_url = os.getenv('DATABASE_URL')
  24. if not database_url:
  25. print("DATABASE_URL not found in environment")
  26. return None
  27. try:
  28. parsed = urlparse(database_url)
  29. return pymysql.connect(
  30. host=parsed.hostname,
  31. port=parsed.port or 3306,
  32. user=parsed.username,
  33. password=parsed.password,
  34. database=parsed.path[1:],
  35. charset='utf8mb4',
  36. autocommit=True
  37. )
  38. except Exception as e:
  39. print(f"Database connection error: {e}")
  40. return None
  41. def update_db_status(table_name, doc_id, status=None, progress=None, error=None):
  42. conn = get_db_connection()
  43. if not conn:
  44. return
  45. try:
  46. with conn.cursor() as cursor:
  47. updates = []
  48. params = []
  49. if status is not None:
  50. updates.append("conversion_status = %s")
  51. params.append(status)
  52. if progress is not None:
  53. updates.append("conversion_progress = %s")
  54. params.append(progress)
  55. if error is not None:
  56. updates.append("conversion_error = %s")
  57. params.append(error)
  58. if not updates:
  59. return
  60. sql = f"UPDATE {table_name} SET {', '.join(updates)} WHERE id = %s"
  61. params.append(doc_id)
  62. cursor.execute(sql, params)
  63. except Exception as e:
  64. print(f"Update DB failed: {e}")
  65. finally:
  66. conn.close()
  67. def apply_upload_urls(files_meta, model_version="vlm"):
  68. payload = {
  69. "files": files_meta,
  70. "model_version": model_version,
  71. }
  72. r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
  73. r.raise_for_status()
  74. j = r.json()
  75. if j.get("code") != 0:
  76. raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
  77. return j["data"]["batch_id"], j["data"]["file_urls"]
  78. def upload_files(file_data_list, upload_urls):
  79. for data, url in zip(file_data_list, upload_urls):
  80. res = requests.put(url, data=data, timeout=300)
  81. if res.status_code != 200:
  82. raise RuntimeError(f"upload failed to {url}, status={res.status_code}")
  83. def poll_batch(batch_id, interval_sec=5, timeout_sec=1800):
  84. deadline = time.time() + timeout_sec
  85. while True:
  86. r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
  87. r.raise_for_status()
  88. j = r.json()
  89. if j.get("code") != 0:
  90. raise RuntimeError(f"poll failed: {j.get('msg')}")
  91. results = j["data"]["extract_result"]
  92. states = [it.get("state") for it in results]
  93. if all(s in ("done", "failed") for s in states):
  94. return results
  95. if time.time() > deadline:
  96. raise TimeoutError(f"poll timeout for batch_id={batch_id}")
  97. time.sleep(interval_sec)
  98. def process_document(table_name, doc_id, chinese_name, file_url, out_dir):
  99. try:
  100. # 1. 更新状态:开始转换
  101. update_db_status(table_name, doc_id, status=1, progress=10)
  102. # 2. 下载原始文件
  103. print(f"Downloading {file_url}...")
  104. resp = requests.get(file_url, timeout=60)
  105. resp.raise_for_status()
  106. file_content = resp.content
  107. file_ext = Path(urlparse(file_url).path).suffix.lower()
  108. if not file_ext:
  109. file_ext = ".pdf" # Default
  110. file_name = f"{chinese_name}{file_ext}"
  111. update_db_status(table_name, doc_id, progress=30)
  112. # 3. 提交到 MinerU
  113. files_meta = [{"name": file_name, "data_id": doc_id}]
  114. batch_id, upload_urls = apply_upload_urls(files_meta)
  115. upload_files([file_content], upload_urls)
  116. update_db_status(table_name, doc_id, progress=50)
  117. # 4. 轮询结果
  118. results = poll_batch(batch_id)
  119. result = results[0]
  120. if result.get("state") == "done":
  121. zip_url = result.get("full_zip_url")
  122. if zip_url:
  123. # 5. 下载并处理结果
  124. update_db_status(table_name, doc_id, progress=80)
  125. zip_resp = requests.get(zip_url, timeout=300)
  126. zip_resp.raise_for_status()
  127. # 解压并保存 Markdown
  128. with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
  129. # 查找 .md 文件
  130. md_files = [f for f in z.namelist() if f.endswith(".md")]
  131. if md_files:
  132. md_content = z.read(md_files[0])
  133. save_path = Path(out_dir) / f"{chinese_name}.md"
  134. save_path.parent.mkdir(parents=True, exist_ok=True)
  135. with open(save_path, "wb") as f:
  136. f.write(md_content)
  137. print(f"Saved Markdown to {save_path}")
  138. update_db_status(table_name, doc_id, status=2, progress=100)
  139. return True
  140. else:
  141. raise RuntimeError("No zip URL in result")
  142. else:
  143. err_msg = result.get("err_msg", "Unknown error")
  144. raise RuntimeError(f"MinerU extraction failed: {err_msg}")
  145. except Exception as e:
  146. print(f"Process failed: {e}")
  147. update_db_status(table_name, doc_id, status=3, error=str(e))
  148. return False
  149. def main_cli(table_type, doc_id, out_dir=r"d:\UGit\MinerU"):
  150. # 获取表名
  151. TABLE_MAP = {
  152. "basis": "t_basis_of_preparation",
  153. "work": "t_work_of_preparation",
  154. "job": "t_job_of_preparation"
  155. }
  156. table_name = TABLE_MAP.get(table_type, "t_basis_of_preparation")
  157. # 从数据库获取详细信息
  158. conn = get_db_connection()
  159. if not conn:
  160. print("Database connection failed")
  161. return
  162. try:
  163. with conn.cursor() as cursor:
  164. cursor.execute(f"SELECT chinese_name, file_url FROM {table_name} WHERE id = %s", (doc_id,))
  165. row = cursor.fetchone()
  166. if not row:
  167. print(f"Document not found: {doc_id} in {table_name}")
  168. return
  169. chinese_name, file_url = row
  170. process_document(table_name, doc_id, chinese_name, file_url, out_dir)
  171. finally:
  172. conn.close()
  173. if __name__ == "__main__":
  174. # 示例用法:python miner_u.py basis <doc_id>
  175. import sys
  176. if len(sys.argv) > 2:
  177. main_cli(sys.argv[1], sys.argv[2])
  178. else:
  179. print("Usage: python miner_u.py <table_type> <doc_id>")