# miner_u.py — MinerU document-conversion worker.
  1. import os
  2. import time
  3. import json
  4. import logging
  5. import requests
  6. import pymysql
  7. import zipfile
  8. import io
  9. from datetime import datetime
  10. from pathlib import Path
  11. from urllib.parse import urlparse
  12. from app.base.minio_connection import get_minio_manager
  13. # 配置日志
  14. logging.basicConfig(
  15. level=logging.INFO,
  16. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  17. )
  18. logger = logging.getLogger("MinerU")
  19. # 导入配置
  20. import sys
  21. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
  22. from app.core.config import config_handler
  23. # MinIO 配置
  24. minio_manager = get_minio_manager()
  25. MINIO_BASE_PATH = minio_manager.base_path
  26. def upload_to_minio(file_content, object_name, content_type="text/markdown"):
  27. try:
  28. return minio_manager.upload_file(file_content, object_name, content_type)
  29. except Exception as e:
  30. logger.error(f"Upload to MinIO failed: {e}")
  31. return None
  32. # MinERU 配置
  33. MINERU_TOKEN = config_handler.get("admin_app", "MINERU_TOKEN", "")
  34. API_APPLY = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
  35. API_BATCH_RESULT = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
  36. HEADERS = {
  37. "Content-Type": "application/json",
  38. "Authorization": f"Bearer {MINERU_TOKEN}",
  39. }
  40. SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
  41. def get_db_connection():
  42. database_url = config_handler.get("admin_app", "DATABASE_URL", "")
  43. if not database_url:
  44. logger.error("DATABASE_URL not found in configuration")
  45. return None
  46. try:
  47. parsed = urlparse(database_url)
  48. return pymysql.connect(
  49. host=parsed.hostname,
  50. port=parsed.port or 3306,
  51. user=parsed.username,
  52. password=parsed.password,
  53. database=parsed.path[1:],
  54. charset='utf8mb4',
  55. autocommit=True
  56. )
  57. except Exception as e:
  58. logger.error(f"Database connection error: {e}")
  59. return None
  60. def update_db_status(doc_id, status=None, error=None, md_url=None, json_url=None):
  61. conn = get_db_connection()
  62. if not conn:
  63. return
  64. try:
  65. with conn.cursor() as cursor:
  66. updates = []
  67. params = []
  68. if status is not None:
  69. updates.append("conversion_status = %s")
  70. params.append(status)
  71. if error is not None:
  72. updates.append("conversion_error = %s")
  73. params.append(error)
  74. if md_url is not None:
  75. updates.append("md_url = %s")
  76. params.append(md_url)
  77. if json_url is not None:
  78. updates.append("json_url = %s")
  79. params.append(json_url)
  80. if not updates:
  81. return
  82. # 同时更新修改时间
  83. updates.append("updated_time = NOW()")
  84. sql = f"UPDATE t_samp_document_main SET {', '.join(updates)} WHERE id = %s"
  85. params.append(doc_id)
  86. cursor.execute(sql, params)
  87. # 如果更新了 json_url 或 md_url,同步更新到子表
  88. if json_url is not None or md_url is not None:
  89. try:
  90. cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
  91. row = cursor.fetchone()
  92. if row and row[0] and row[1]:
  93. source_type, source_id = row[0], row[1]
  94. TABLE_MAP = {
  95. "basis": "t_samp_standard_base_info",
  96. "work": "t_samp_construction_plan_base_info",
  97. "job": "t_samp_office_regulations"
  98. }
  99. table_name = TABLE_MAP.get(source_type)
  100. if table_name:
  101. sub_updates = []
  102. sub_params = []
  103. if json_url is not None:
  104. sub_updates.append("json_url = %s")
  105. sub_params.append(json_url)
  106. if sub_updates:
  107. sub_sql = f"UPDATE {table_name} SET {', '.join(sub_updates)} WHERE id = %s"
  108. sub_params.append(source_id)
  109. cursor.execute(sub_sql, sub_params)
  110. except Exception as e:
  111. logger.error(f"Sync URLs to sub-table failed: {e}")
  112. except Exception as e:
  113. logger.error(f"Update DB failed: {e}")
  114. finally:
  115. conn.close()
  116. def apply_upload_urls(files_meta, model_version="vlm"):
  117. payload = {
  118. "files": files_meta,
  119. "model_version": model_version,
  120. }
  121. r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
  122. r.raise_for_status()
  123. j = r.json()
  124. if j.get("code") != 0:
  125. raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
  126. return j["data"]["batch_id"], j["data"]["file_urls"]
  127. def upload_files(file_data_list, upload_urls):
  128. for data, url in zip(file_data_list, upload_urls):
  129. res = requests.put(url, data=data, timeout=300)
  130. if res.status_code != 200:
  131. raise RuntimeError(f"upload failed to {url}, status={res.status_code}")
  132. def poll_batch(doc_id, batch_id, interval_sec=5, timeout_sec=1800):
  133. deadline = time.time() + timeout_sec
  134. while True:
  135. r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
  136. r.raise_for_status()
  137. j = r.json()
  138. if j.get("code") != 0:
  139. raise RuntimeError(f"poll failed: {j.get('msg')}")
  140. results = j["data"]["extract_result"]
  141. states = [it.get("state") for it in results]
  142. if all(s in ("done", "failed") for s in states):
  143. return results
  144. if time.time() > deadline:
  145. raise TimeoutError(f"poll timeout for batch_id={batch_id}")
  146. time.sleep(interval_sec)
  147. def process_document(doc_id, chinese_name, file_url, out_dir):
  148. try:
  149. # 1. 更新状态:开始转换
  150. update_db_status(doc_id, status=1)
  151. # 2. 下载原始文件
  152. logger.info(f"Downloading {file_url}...")
  153. resp = requests.get(file_url, timeout=60)
  154. resp.raise_for_status()
  155. file_content = resp.content
  156. # 检查文件类型
  157. content_type = resp.headers.get("Content-Type", "").lower()
  158. if "text/html" in content_type:
  159. raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")
  160. file_ext = Path(urlparse(file_url).path).suffix.lower()
  161. if not file_ext:
  162. file_ext = ".pdf" # Default
  163. file_name = f"{chinese_name}{file_ext}"
  164. # 3. 提交到 MinerU
  165. files_meta = [{"name": file_name, "data_id": doc_id}]
  166. batch_id, upload_urls = apply_upload_urls(files_meta)
  167. upload_files([file_content], upload_urls)
  168. # 4. 轮询结果
  169. results = poll_batch(doc_id, batch_id)
  170. result = results[0]
  171. if result.get("state") == "done":
  172. zip_url = result.get("full_zip_url")
  173. if zip_url:
  174. # 5. 下载并处理结果
  175. zip_resp = requests.get(zip_url, timeout=300)
  176. zip_resp.raise_for_status()
  177. # 解压并处理结果
  178. with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
  179. # 查找 .md 文件
  180. md_files = [f for f in z.namelist() if f.endswith(".md")]
  181. # 查找 .json 文件 (通常是 content_list.json)
  182. json_files = [f for f in z.namelist() if f.endswith(".json")]
  183. md_cloud_url = None
  184. json_cloud_url = None
  185. if md_files:
  186. md_content = z.read(md_files[0])
  187. # 构造云端存储路径
  188. md_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md"
  189. # 上传到 MinIO
  190. md_cloud_url = upload_to_minio(md_content, md_object_name, content_type="text/markdown")
  191. if json_files:
  192. # 优先取 content_list.json
  193. json_file = next((f for f in json_files if "content_list" in f), json_files[0])
  194. json_content = z.read(json_file)
  195. # 构造云端存储路径
  196. json_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.json"
  197. # 上传到 MinIO
  198. json_cloud_url = upload_to_minio(json_content, json_object_name, content_type="application/json")
  199. # 6. 更新数据库
  200. update_db_status(doc_id, status=2,
  201. md_url=md_cloud_url,
  202. json_url=json_cloud_url)
  203. logger.info(f"[{doc_id}] Processed successfully. MD: {md_cloud_url}, JSON: {json_cloud_url}")
  204. else:
  205. update_db_status(doc_id, status=3, error="Full ZIP URL not found")
  206. else:
  207. update_db_status(doc_id, status=3, error=result.get("err_msg", "Conversion failed"))
  208. except Exception as e:
  209. logger.exception(f"[{doc_id}] Error processing document: {e}")
  210. update_db_status(doc_id, status=3, error=str(e))
  211. def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
  212. # 从数据库获取详细信息 - 直接从 t_samp_document_main 获取
  213. conn = get_db_connection()
  214. if not conn:
  215. logger.error("Database connection failed")
  216. return
  217. try:
  218. with conn.cursor() as cursor:
  219. # 优先从 t_samp_document_main 获取 title 和 file_url
  220. cursor.execute("SELECT title, file_url FROM t_samp_document_main WHERE id = %s", (doc_id,))
  221. row = cursor.fetchone()
  222. if not row or not row[1]: # 如果主表没有 file_url,尝试从子表获取
  223. if not row:
  224. logger.warning(f"Document not found: {doc_id}")
  225. return
  226. title = row[0]
  227. # 尝试从子表获取 (兼容旧数据)
  228. cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
  229. st_row = cursor.fetchone()
  230. if st_row:
  231. source_type, source_id = st_row
  232. TABLE_MAP = {
  233. "basis": "t_samp_standard_base_info",
  234. "work": "t_samp_construction_plan_base_info",
  235. "job": "t_samp_office_regulations"
  236. }
  237. table_name = TABLE_MAP.get(source_type)
  238. if table_name:
  239. # 尝试不同的 url 字段名
  240. url_fields = ['file_url', 'source_url', 'url']
  241. for field in url_fields:
  242. try:
  243. cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
  244. url_row = cursor.fetchone()
  245. if url_row and url_row[0]:
  246. file_url = url_row[0]
  247. process_document(doc_id, title, file_url, out_dir)
  248. return
  249. except:
  250. continue
  251. logger.error(f"No file_url found for document: {doc_id}")
  252. update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
  253. return
  254. title, file_url = row
  255. process_document(doc_id, title, file_url, out_dir)
  256. finally:
  257. conn.close()
  258. if __name__ == "__main__":
  259. # 示例用法:python miner_u.py <doc_id>
  260. import sys
  261. if len(sys.argv) > 1:
  262. # 这里的参数处理需要微调,因为以前是 python miner_u.py <table_type> <doc_id>
  263. # 现在我们只需要 <doc_id>,但为了兼容性,我们可以检查参数个数
  264. if len(sys.argv) == 3:
  265. # 旧格式: python miner_u.py basis <doc_id>
  266. main_cli(sys.argv[2])
  267. else:
  268. # 新格式: python miner_u.py <doc_id>
  269. main_cli(sys.argv[1])
  270. else:
  271. print("Usage: python miner_u.py <doc_id>")