# miner_u.py
  1. import os
  2. import time
  3. import json
  4. import requests
  5. import pymysql
  6. import zipfile
  7. import io
  8. from pathlib import Path
  9. from urllib.parse import urlparse
  10. # 导入配置
  11. import sys
  12. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
  13. from app.core.config import config_handler
  14. TOKEN = "eyJ0eXBlIjoiSldUIiwiYWxnIjoiSFM1MTIifQ.eyJqdGkiOiI1MzgwMDYyNSIsInJvbCI6IlJPTEVfUkVHSVNURVIiLCJpc3MiOiJPcGVuWExhYiIsImlhdCI6MTc2Nzg1OTg5NywiY2xpZW50SWQiOiJsa3pkeDU3bnZ5MjJqa3BxOXgydyIsInBob25lIjoiMTgwMzA5ODIxNTQiLCJvcGVuSWQiOm51bGwsInV1aWQiOiI0NTYyZTUyNi1iZjE3LTRhMmItODExMi04YmM5ZjNjYzMwZGMiLCJlbWFpbCI6IiIsImV4cCI6MTc2OTA2OTQ5N30.mNH7afPPANNQq_BRsBOlbk-2P7e_ewdfzPQXO4woeoT15mDEbPKc45Auk_BuRuNaAS-Gm2GK3qKGjQ2VDtepvA"
  15. API_APPLY = "https://mineru.net/api/v4/file-urls/batch"
  16. API_BATCH_RESULT = "https://mineru.net/api/v4/extract-results/batch/{}"
  17. HEADERS = {
  18. "Content-Type": "application/json",
  19. "Authorization": f"Bearer {TOKEN}",
  20. }
  21. SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
  22. def get_db_connection():
  23. database_url = config_handler.get("admin_app", "DATABASE_URL", "")
  24. if not database_url:
  25. print("DATABASE_URL not found in configuration")
  26. return None
  27. try:
  28. parsed = urlparse(database_url)
  29. return pymysql.connect(
  30. host=parsed.hostname,
  31. port=parsed.port or 3306,
  32. user=parsed.username,
  33. password=parsed.password,
  34. database=parsed.path[1:],
  35. charset='utf8mb4',
  36. autocommit=True
  37. )
  38. except Exception as e:
  39. print(f"Database connection error: {e}")
  40. return None
  41. def update_db_status(doc_id, status=None, progress=None, error=None, converted_file_name=None):
  42. conn = get_db_connection()
  43. if not conn:
  44. return
  45. try:
  46. with conn.cursor() as cursor:
  47. updates = []
  48. params = []
  49. if status is not None:
  50. updates.append("conversion_status = %s")
  51. params.append(status)
  52. if progress is not None:
  53. updates.append("conversion_progress = %s")
  54. params.append(progress)
  55. if error is not None:
  56. updates.append("conversion_error = %s")
  57. params.append(error)
  58. if converted_file_name is not None:
  59. updates.append("converted_file_name = %s")
  60. params.append(converted_file_name)
  61. if not updates:
  62. return
  63. # 同时更新修改时间
  64. updates.append("updated_time = NOW()")
  65. sql = f"UPDATE t_document_main SET {', '.join(updates)} WHERE id = %s"
  66. params.append(doc_id)
  67. cursor.execute(sql, params)
  68. except Exception as e:
  69. print(f"Update DB failed: {e}")
  70. finally:
  71. conn.close()
  72. def apply_upload_urls(files_meta, model_version="vlm"):
  73. payload = {
  74. "files": files_meta,
  75. "model_version": model_version,
  76. }
  77. r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
  78. r.raise_for_status()
  79. j = r.json()
  80. if j.get("code") != 0:
  81. raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
  82. return j["data"]["batch_id"], j["data"]["file_urls"]
  83. def upload_files(file_data_list, upload_urls):
  84. for data, url in zip(file_data_list, upload_urls):
  85. res = requests.put(url, data=data, timeout=300)
  86. if res.status_code != 200:
  87. raise RuntimeError(f"upload failed to {url}, status={res.status_code}")
  88. def poll_batch(batch_id, interval_sec=5, timeout_sec=1800):
  89. deadline = time.time() + timeout_sec
  90. while True:
  91. r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
  92. r.raise_for_status()
  93. j = r.json()
  94. if j.get("code") != 0:
  95. raise RuntimeError(f"poll failed: {j.get('msg')}")
  96. results = j["data"]["extract_result"]
  97. states = [it.get("state") for it in results]
  98. if all(s in ("done", "failed") for s in states):
  99. return results
  100. if time.time() > deadline:
  101. raise TimeoutError(f"poll timeout for batch_id={batch_id}")
  102. time.sleep(interval_sec)
  103. def process_document(doc_id, chinese_name, file_url, out_dir):
  104. try:
  105. # 1. 更新状态:开始转换
  106. update_db_status(doc_id, status=1, progress=10)
  107. # 2. 下载原始文件
  108. print(f"Downloading {file_url}...")
  109. resp = requests.get(file_url, timeout=60)
  110. resp.raise_for_status()
  111. file_content = resp.content
  112. # 检查文件类型
  113. content_type = resp.headers.get("Content-Type", "").lower()
  114. if "text/html" in content_type:
  115. raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")
  116. file_ext = Path(urlparse(file_url).path).suffix.lower()
  117. if not file_ext:
  118. file_ext = ".pdf" # Default
  119. file_name = f"{chinese_name}{file_ext}"
  120. update_db_status(doc_id, progress=30)
  121. # 3. 提交到 MinerU
  122. files_meta = [{"name": file_name, "data_id": doc_id}]
  123. batch_id, upload_urls = apply_upload_urls(files_meta)
  124. upload_files([file_content], upload_urls)
  125. update_db_status(doc_id, progress=50)
  126. # 4. 轮询结果
  127. results = poll_batch(batch_id)
  128. result = results[0]
  129. if result.get("state") == "done":
  130. zip_url = result.get("full_zip_url")
  131. if zip_url:
  132. # 5. 下载并处理结果
  133. update_db_status(doc_id, progress=80)
  134. zip_resp = requests.get(zip_url, timeout=300)
  135. zip_resp.raise_for_status()
  136. # 解压并保存 Markdown
  137. converted_file_name = f"{chinese_name}.md"
  138. with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
  139. # 查找 .md 文件
  140. md_files = [f for f in z.namelist() if f.endswith(".md")]
  141. if md_files:
  142. md_content = z.read(md_files[0])
  143. save_path = Path(out_dir) / converted_file_name
  144. save_path.parent.mkdir(parents=True, exist_ok=True)
  145. with open(save_path, "wb") as f:
  146. f.write(md_content)
  147. print(f"Saved Markdown to {save_path}")
  148. update_db_status(doc_id, status=2, progress=100, converted_file_name=converted_file_name)
  149. return True
  150. else:
  151. raise RuntimeError("No zip URL in result")
  152. else:
  153. err_msg = result.get("err_msg", "Unknown error")
  154. raise RuntimeError(f"MinerU extraction failed: {err_msg}")
  155. except Exception as e:
  156. print(f"Process failed: {e}")
  157. update_db_status(doc_id, status=3, error=str(e))
  158. return False
  159. def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
  160. # 从数据库获取详细信息 - 直接从 t_document_main 获取
  161. conn = get_db_connection()
  162. if not conn:
  163. print("Database connection failed")
  164. return
  165. try:
  166. with conn.cursor() as cursor:
  167. # 优先从 t_document_main 获取 title 和 file_url
  168. cursor.execute("SELECT title, file_url FROM t_document_main WHERE id = %s", (doc_id,))
  169. row = cursor.fetchone()
  170. if not row or not row[1]: # 如果主表没有 file_url,尝试从子表获取
  171. if not row:
  172. print(f"Document not found: {doc_id}")
  173. return
  174. title = row[0]
  175. # 尝试从子表获取 (兼容旧数据)
  176. cursor.execute("SELECT source_type, source_id FROM t_document_main WHERE id = %s", (doc_id,))
  177. st_row = cursor.fetchone()
  178. if st_row:
  179. source_type, source_id = st_row
  180. TABLE_MAP = {
  181. "basis": "t_basis_of_preparation",
  182. "work": "t_work_of_preparation",
  183. "job": "t_job_of_preparation"
  184. }
  185. table_name = TABLE_MAP.get(source_type)
  186. if table_name:
  187. # 尝试不同的 url 字段名
  188. url_fields = ['file_url', 'source_url', 'url']
  189. for field in url_fields:
  190. try:
  191. cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
  192. url_row = cursor.fetchone()
  193. if url_row and url_row[0]:
  194. file_url = url_row[0]
  195. process_document(doc_id, title, file_url, out_dir)
  196. return
  197. except:
  198. continue
  199. print(f"No file_url found for document: {doc_id}")
  200. update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
  201. return
  202. title, file_url = row
  203. process_document(doc_id, title, file_url, out_dir)
  204. finally:
  205. conn.close()
  206. if __name__ == "__main__":
  207. # 示例用法:python miner_u.py <doc_id>
  208. import sys
  209. if len(sys.argv) > 1:
  210. # 这里的参数处理需要微调,因为以前是 python miner_u.py <table_type> <doc_id>
  211. # 现在我们只需要 <doc_id>,但为了兼容性,我们可以检查参数个数
  212. if len(sys.argv) == 3:
  213. # 旧格式: python miner_u.py basis <doc_id>
  214. main_cli(sys.argv[2])
  215. else:
  216. # 新格式: python miner_u.py <doc_id>
  217. main_cli(sys.argv[1])
  218. else:
  219. print("Usage: python miner_u.py <doc_id>")