mineru_connection.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. """
  2. MinerU 提取工具连接与业务管理
  3. """
  4. import os
  5. import time
  6. import logging
  7. import requests
  8. import zipfile
  9. import io
  10. from datetime import datetime
  11. from pathlib import Path
  12. from urllib.parse import urlparse
  13. from typing import Optional, List, Dict, Any, Tuple
  14. # 导入配置与基础连接
  15. from app.core.config import config_handler
  16. from app.base.async_mysql_connection import get_db_connection
  17. from app.base.minio_connection import get_minio_manager
  # Logger shared by every function and class in this module.
  logger = logging.getLogger(name="MinerU")
  # Lazily-created MinerUManager singleton; populated by get_mineru_manager().
  _mineru_manager: Optional["MinerUManager"] = None
  class MinerUManager:
      """Client for the MinerU extraction API plus the document-conversion workflow.

      What this class does:
      * requests pre-signed upload URLs from MinerU and PUTs source files to them;
      * polls the batch-result endpoint until every file reaches a terminal state;
      * downloads the result ZIP and stores its Markdown / JSON outputs through
        ``self.minio_manager.upload_file``;
      * records progress in ``t_samp_document_main``. ``conversion_status`` is set
        to 1 when work starts, 2 on success and 3 on failure.
      """

      # MinerU batch states after which a file's state no longer changes.
      _TERMINAL_STATES = ("done", "failed")

      def __init__(self):
          # Prefer the newer MINERU_ACCESS_KEY; fall back to the legacy MINERU_TOKEN.
          self.access_key = config_handler.get("admin_app", "MINERU_ACCESS_KEY", "")
          # Loaded from config but not used anywhere in this class.
          self.secret_key = config_handler.get("admin_app", "MINERU_SECRET_KEY", "")
          self.token = self.access_key if self.access_key else config_handler.get("admin_app", "MINERU_TOKEN", "")
          if not self.token:
              # Every request would carry an empty bearer token; the API most likely rejects it.
              logger.warning("未配置 MinerU 访问凭证 (MINERU_ACCESS_KEY / MINERU_TOKEN)")
          self.api_apply = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
          # "{}" is filled with the batch_id by poll_batch().
          self.api_batch_result = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
          self.headers = {
              "Content-Type": "application/json",
              "Authorization": f"Bearer {self.token}",
          }
          # Extensions forwarded to MinerU; ".txt" is handled locally in process_document().
          self.supported_suffix = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
          self.minio_manager = get_minio_manager()
          logger.info("MinerU 管理器初始化完成")

      def update_db_status(self, doc_id: str, status: Optional[int] = None, error: Optional[str] = None,
                           md_url: Optional[str] = None, json_url: Optional[str] = None):
          """Update the conversion columns of one ``t_samp_document_main`` row.

          Only arguments that are not ``None`` are written. ``updated_time`` is set to
          ``NOW()`` whenever at least one column is written. Errors are logged and
          swallowed, so status bookkeeping never aborts the caller.

          Args:
              doc_id: value matched against the ``id`` column.
              status: new ``conversion_status``.
              error: new ``conversion_error``.
              md_url: new ``md_url``.
              json_url: new ``json_url``.
          """
          # Build the SET clause first so a no-op call never opens a DB connection.
          # Column names come only from this fixed list; values are bound parameters.
          columns = (
              ("conversion_status", status),
              ("conversion_error", error),
              ("md_url", md_url),
              ("json_url", json_url),
          )
          updates = [f"{col} = %s" for col, val in columns if val is not None]
          params: List[Any] = [val for _, val in columns if val is not None]
          if not updates:
              return
          updates.append("updated_time = NOW()")
          sql = f"UPDATE t_samp_document_main SET {', '.join(updates)} WHERE id = %s"
          params.append(doc_id)

          conn = get_db_connection()
          if not conn:
              logger.error("数据库连接失败,无法更新状态")
              return
          try:
              with conn.cursor() as cursor:
                  cursor.execute(sql, params)
              conn.commit()
          except Exception as e:
              logger.error(f"更新数据库状态失败: {e}")
          finally:
              conn.close()

      def apply_upload_urls(self, files_meta: List[Dict[str, Any]], model_version: str = "vlm") -> Tuple[str, List[str]]:
          """Request pre-signed upload URLs for a batch of files from MinerU.

          Args:
              files_meta: per-file metadata sent as the ``files`` payload. This module
                  sends ``{"name": ..., "data_id": ...}`` entries.
              model_version: value sent as ``model_version``.

          Returns:
              ``(batch_id, file_urls)`` from the response ``data``. Presumably there
              is one URL per ``files_meta`` entry, in the same order.

          Raises:
              requests.HTTPError: the HTTP response was not 2xx.
              RuntimeError: the API reported a non-zero ``code``.
          """
          payload = {
              "files": files_meta,
              "model_version": model_version,
          }
          r = requests.post(self.api_apply, headers=self.headers, json=payload, timeout=60)
          r.raise_for_status()
          j = r.json()
          if j.get("code") != 0:
              raise RuntimeError(f"申请上传链接失败: {j.get('msg')}")
          return j["data"]["batch_id"], j["data"]["file_urls"]

      def upload_files(self, file_data_list: List[bytes], upload_urls: List[str]):
          """PUT each file's bytes to its upload URL; files and URLs are paired by position.

          Raises:
              ValueError: the two lists have different lengths.
              RuntimeError: an upload returned a status other than 200.
          """
          if len(file_data_list) != len(upload_urls):
              # zip() would silently drop the surplus and leave files un-uploaded.
              raise ValueError(
                  f"上传文件数量与上传链接数量不一致: files={len(file_data_list)}, urls={len(upload_urls)}"
              )
          for data, url in zip(file_data_list, upload_urls):
              res = requests.put(url, data=data, timeout=300)
              if res.status_code != 200:
                  raise RuntimeError(f"文件上传失败: {url}, status={res.status_code}")

      def poll_batch(self, doc_id: str, batch_id: str, interval_sec: int = 5, timeout_sec: int = 1800) -> List[Dict[str, Any]]:
          """Poll the batch-result endpoint until every file is in a terminal state.

          Returns:
              The non-empty ``extract_result`` list, once every entry's ``state`` is
              ``"done"`` or ``"failed"``.

          Raises:
              requests.HTTPError: the HTTP response was not 2xx.
              RuntimeError: the API reported a non-zero ``code``.
              TimeoutError: no terminal state was reached within ``timeout_sec`` seconds.
          """
          deadline = time.time() + timeout_sec
          url = self.api_batch_result.format(batch_id)
          while True:
              r = requests.get(url, headers=self.headers, timeout=60)
              r.raise_for_status()
              j = r.json()
              if j.get("code") != 0:
                  raise RuntimeError(f"轮询失败: {j.get('msg')}")
              # The result list may be missing or empty (presumably just after batch
              # creation). all() of an empty sequence is True, so guard explicitly
              # rather than hand the caller an empty list.
              results = (j.get("data") or {}).get("extract_result") or []
              states = [it.get("state") for it in results]
              if results and all(s in self._TERMINAL_STATES for s in states):
                  return results
              logger.debug(f"[{doc_id}] 批次 {batch_id} 尚未完成: {states}")
              if time.time() > deadline:
                  raise TimeoutError(f"轮询超时: batch_id={batch_id}")
              time.sleep(interval_sec)

      def _converted_object_name(self, doc_id: str, suffix: str, date_str: str) -> str:
          """Return the MinIO object key for a converted file of ``doc_id``."""
          return f"{self.minio_manager.base_path}/converted/{date_str}/{doc_id}{suffix}"

      def _store_converted(self, data: bytes, doc_id: str, suffix: str, content_type: str, date_str: str):
          """Upload ``data`` under the converted-file key for ``doc_id``.

          Returns:
              Whatever ``minio_manager.upload_file`` returns; callers record it as the
              file's cloud URL.
          """
          object_name = self._converted_object_name(doc_id, suffix, date_str)
          return self.minio_manager.upload_file(data, object_name, content_type=content_type)

      def _store_result_zip(self, doc_id: str, zip_url: str) -> Tuple[Optional[str], Optional[str]]:
          """Download MinerU's result ZIP and upload its Markdown and JSON outputs.

          Returns:
              ``(md_url, json_url)``. Either value is ``None`` when the ZIP contains no
              file of that type.
          """
          zip_resp = requests.get(zip_url, timeout=300)
          zip_resp.raise_for_status()
          # One date for both files so they share a folder even if the uploads
          # straddle midnight.
          date_str = datetime.now().strftime('%Y%m%d')
          md_cloud_url = None
          json_cloud_url = None
          with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
              names = z.namelist()
              md_files = [f for f in names if f.endswith(".md")]
              json_files = [f for f in names if f.endswith(".json")]
              if md_files:
                  md_cloud_url = self._store_converted(z.read(md_files[0]), doc_id, ".md", "text/markdown", date_str)
              else:
                  logger.warning(f"[{doc_id}] 结果压缩包中未找到 Markdown 文件")
              if json_files:
                  # Prefer the "content_list" JSON; otherwise take the first JSON entry.
                  json_file = next((f for f in json_files if "content_list" in f), json_files[0])
                  json_cloud_url = self._store_converted(z.read(json_file), doc_id, ".json", "application/json", date_str)
          return md_cloud_url, json_cloud_url

      def process_document(self, doc_id: str, chinese_name: str, file_url: str):
          """Run the whole conversion for one document and record the outcome in the DB.

          Flow:
          1. Set status 1.
          2. Download ``file_url``.
          3. Store ``.txt`` files directly as Markdown. Upload other supported types
             to MinerU, poll for the result, and store its ZIP outputs.
          4. Set status 2 on success or 3 on failure.

          This method never raises. Any exception is logged and written to
          ``conversion_error`` with status 3.

          Args:
              doc_id: ``id`` of the ``t_samp_document_main`` row; also used as the
                  MinerU ``data_id`` and in the MinIO object keys.
              chinese_name: base file name (without extension) sent to MinerU.
              file_url: URL the source document is downloaded from.
          """
          try:
              # 1. Mark the conversion as started.
              self.update_db_status(doc_id, status=1)

              # 2. Download the source file.
              logger.info(f"正在下载文件: {file_url}...")
              resp = requests.get(file_url, timeout=60)
              resp.raise_for_status()
              file_content = resp.content

              # 3. Validate content type and extension.
              content_type = resp.headers.get("Content-Type", "").lower()
              if "text/html" in content_type:
                  # A URL serving an HTML page is treated as a web link, not a document.
                  raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")
              # If the URL path has no extension, assume PDF.
              file_ext = Path(urlparse(file_url).path).suffix.lower() or ".pdf"
              file_name = f"{chinese_name}{file_ext}"

              if file_ext == ".txt":
                  # Plain text skips MinerU: its bytes are stored unchanged as the Markdown output.
                  logger.info(f"[{doc_id}] 检测为 .txt 文件,跳过 MinerU 转换,直接处理为 Markdown")
                  date_str = datetime.now().strftime('%Y%m%d')
                  md_cloud_url = self._store_converted(file_content, doc_id, ".md", "text/markdown", date_str)
                  # Record success and the URL only; the file bytes are not written to the DB.
                  self.update_db_status(doc_id, status=2, md_url=md_cloud_url)
                  logger.info(f"[{doc_id}] .txt 文件处理成功. MD: {md_cloud_url}")
                  return

              if file_ext not in self.supported_suffix:
                  # Sorted so the message is stable across runs (set order is not).
                  supported_list = ", ".join(sorted(self.supported_suffix))
                  raise RuntimeError(f"不支持的文件类型: {file_ext}。MinerU 仅支持: {supported_list}")

              # 4. Submit the file to MinerU.
              files_meta = [{"name": file_name, "data_id": doc_id}]
              batch_id, upload_urls = self.apply_upload_urls(files_meta)
              self.upload_files([file_content], upload_urls)

              # 5. Wait for the result; one file was submitted, so take the first entry.
              result = self.poll_batch(doc_id, batch_id)[0]
              if result.get("state") != "done":
                  # `or` also covers an empty err_msg, not just a missing one.
                  self.update_db_status(doc_id, status=3, error=result.get("err_msg") or "转换失败")
                  return

              zip_url = result.get("full_zip_url")
              if not zip_url:
                  self.update_db_status(doc_id, status=3, error="未找到 ZIP 下载链接")
                  return

              # 6. Store the outputs and record success.
              md_cloud_url, json_cloud_url = self._store_result_zip(doc_id, zip_url)
              self.update_db_status(doc_id, status=2, md_url=md_cloud_url, json_url=json_cloud_url)
              logger.info(f"[{doc_id}] 转换成功. MD: {md_cloud_url}, JSON: {json_cloud_url}")
          except Exception as e:
              logger.exception(f"[{doc_id}] 处理文档出错: {e}")
              self.update_db_status(doc_id, status=3, error=str(e))
  def get_mineru_manager() -> MinerUManager:
      """Return the shared MinerUManager, constructing it on the first call.

      The instance is cached in the module-level ``_mineru_manager`` variable, and
      every later call returns that same object.
      """
      global _mineru_manager
      if _mineru_manager is not None:
          return _mineru_manager
      _mineru_manager = MinerUManager()
      return _mineru_manager