| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- """
- MinerU 提取工具连接与业务管理
- """
- import os
- import time
- import logging
- import requests
- import zipfile
- import io
- from datetime import datetime
- from pathlib import Path
- from urllib.parse import urlparse
- from typing import Optional, List, Dict, Any, Tuple
- # 导入配置与基础连接
- from app.core.config import config_handler
- from app.base.async_mysql_connection import get_db_connection
- from app.base.minio_connection import get_minio_manager
# Logger used throughout this module, registered under the name "MinerU".
logger = logging.getLogger("MinerU")
# Lazily-created singleton instance; populated on first call to get_mineru_manager().
_mineru_manager: Optional["MinerUManager"] = None
class MinerUManager:
    """Client for the MinerU extraction API plus the surrounding bookkeeping.

    Responsibilities visible in this class:

    * request pre-signed upload URLs from MinerU and PUT the source files to them;
    * poll the batch-result endpoint until every file has finished;
    * download the result ZIP and push its Markdown / JSON output to MinIO;
    * record progress in ``t_samp_document_main`` (``conversion_status``,
      ``conversion_error``, ``md_url``, ``json_url``).

    Status values written by :meth:`process_document`: 1 = converting,
    2 = succeeded, 3 = failed.
    """

    def __init__(self):
        # Prefer the newer MINERU_ACCESS_KEY; fall back to the legacy MINERU_TOKEN.
        self.access_key = config_handler.get("admin_app", "MINERU_ACCESS_KEY", "")
        # NOTE(review): secret_key is read but not used anywhere in this class.
        self.secret_key = config_handler.get("admin_app", "MINERU_SECRET_KEY", "")
        self.token = self.access_key if self.access_key else config_handler.get("admin_app", "MINERU_TOKEN", "")

        self.api_apply = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
        # Template with a "{}" placeholder that is filled with the batch id.
        self.api_batch_result = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")

        self.headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}",
        }

        # Extensions forwarded to MinerU (".txt" is handled locally in process_document).
        self.supported_suffix = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
        self.minio_manager = get_minio_manager()
        logger.info("MinerU 管理器初始化完成")

    def update_db_status(self, doc_id: str, status: Optional[int] = None, error: Optional[str] = None,
                         md_url: Optional[str] = None, json_url: Optional[str] = None):
        """Update the conversion columns of one row in ``t_samp_document_main``.

        Only arguments that are not ``None`` are written; ``updated_time`` is
        refreshed whenever at least one column changes. If every argument is
        ``None``, no database connection is opened. Database errors are logged
        and swallowed, so status bookkeeping never aborts the caller.

        Args:
            doc_id: primary key (``id``) of the document row.
            status: new ``conversion_status`` value.
            error: new ``conversion_error`` text.
            md_url: URL of the converted Markdown.
            json_url: URL of the converted JSON.
        """
        # Column names are fixed literals; only values go through placeholders.
        changes = [
            (column, value)
            for column, value in (
                ("conversion_status", status),
                ("conversion_error", error),
                ("md_url", md_url),
                ("json_url", json_url),
            )
            if value is not None
        ]
        if not changes:
            # Nothing to write: avoid opening a connection at all.
            return

        assignments = [f"{column} = %s" for column, _ in changes]
        assignments.append("updated_time = NOW()")
        sql = f"UPDATE t_samp_document_main SET {', '.join(assignments)} WHERE id = %s"
        params = [value for _, value in changes]
        params.append(doc_id)

        conn = get_db_connection()
        if not conn:
            logger.error("数据库连接失败,无法更新状态")
            return
        try:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
            conn.commit()
        except Exception as e:
            logger.error(f"更新数据库状态失败: {e}")
        finally:
            conn.close()

    def apply_upload_urls(self, files_meta: List[Dict[str, Any]], model_version: str = "vlm") -> Tuple[str, List[str]]:
        """Ask MinerU for pre-signed upload URLs for a batch of files.

        Args:
            files_meta: one dict per file, sent as the request's ``files`` field
                (this module sends ``name`` and ``data_id``).
            model_version: value sent as ``model_version`` (default ``"vlm"``).

        Returns:
            ``(batch_id, file_urls)`` taken from the response's ``data``.

        Raises:
            requests.HTTPError: non-2xx HTTP response.
            RuntimeError: the API answered with a non-zero ``code``.
        """
        payload = {
            "files": files_meta,
            "model_version": model_version,
        }
        r = requests.post(self.api_apply, headers=self.headers, json=payload, timeout=60)
        r.raise_for_status()
        j = r.json()
        if j.get("code") != 0:
            raise RuntimeError(f"申请上传链接失败: {j.get('msg')}")
        return j["data"]["batch_id"], j["data"]["file_urls"]

    def upload_files(self, file_data_list: List[bytes], upload_urls: List[str]):
        """PUT each file's bytes to its matching MinerU upload URL (same index).

        Raises:
            RuntimeError: if the number of URLs differs from the number of files,
                or if any upload does not answer HTTP 200.
        """
        if len(file_data_list) != len(upload_urls):
            # zip() would otherwise silently skip the unmatched files.
            raise RuntimeError(
                f"上传链接数量与文件数量不一致: files={len(file_data_list)}, urls={len(upload_urls)}"
            )
        for data, url in zip(file_data_list, upload_urls):
            res = requests.put(url, data=data, timeout=300)
            if res.status_code != 200:
                raise RuntimeError(f"文件上传失败: {url}, status={res.status_code}")

    @staticmethod
    def _is_batch_finished(results: List[Dict[str, Any]]) -> bool:
        """Return True when *results* is non-empty and every entry is "done" or "failed"."""
        # An empty list means MinerU has not registered the files yet; all([]) would
        # otherwise report it as finished and the caller would index into [].
        return bool(results) and all(it.get("state") in ("done", "failed") for it in results)

    def poll_batch(self, doc_id: str, batch_id: str, interval_sec: int = 5, timeout_sec: int = 1800) -> List[Dict[str, Any]]:
        """Poll MinerU until every file in *batch_id* has finished.

        Args:
            doc_id: document id, used only for log context.
            batch_id: id returned by :meth:`apply_upload_urls`.
            interval_sec: seconds to sleep between polls.
            timeout_sec: give up after roughly this many seconds.

        Returns:
            The non-empty ``extract_result`` list; every entry's ``state`` is
            ``"done"`` or ``"failed"``.

        Raises:
            requests.HTTPError: non-2xx HTTP response.
            RuntimeError: the API answered with a non-zero ``code``.
            TimeoutError: the batch did not finish before the deadline.
        """
        deadline = time.time() + timeout_sec
        while True:
            r = requests.get(self.api_batch_result.format(batch_id), headers=self.headers, timeout=60)
            r.raise_for_status()
            j = r.json()
            if j.get("code") != 0:
                raise RuntimeError(f"轮询失败: {j.get('msg')}")
            # extract_result may be absent/empty right after upload: treat as "not yet".
            results = j["data"].get("extract_result") or []
            if self._is_batch_finished(results):
                return results
            if time.time() > deadline:
                raise TimeoutError(f"轮询超时: batch_id={batch_id}")
            logger.debug("[%s] batch %s not finished yet: %s", doc_id, batch_id,
                         [it.get("state") for it in results])
            time.sleep(interval_sec)

    def _converted_object_name(self, doc_id: str, date_dir: str, suffix: str) -> str:
        """Build the MinIO key ``<base_path>/converted/<date_dir>/<doc_id><suffix>``."""
        return f"{self.minio_manager.base_path}/converted/{date_dir}/{doc_id}{suffix}"

    def _store_result_zip(self, doc_id: str, zip_url: str) -> Tuple[Optional[str], Optional[str]]:
        """Download MinerU's result ZIP and upload its Markdown / JSON to MinIO.

        The first ``.md`` entry is used. For JSON, an entry whose name contains
        ``content_list`` is preferred, otherwise the first ``.json`` entry.

        Returns:
            ``(md_url, json_url)``; either is ``None`` if the ZIP has no such entry.
        """
        zip_resp = requests.get(zip_url, timeout=300)
        zip_resp.raise_for_status()

        md_cloud_url = None
        json_cloud_url = None
        # One timestamp for both outputs so they always share a date folder.
        date_dir = datetime.now().strftime('%Y%m%d')
        with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
            names = z.namelist()
            md_files = [f for f in names if f.endswith(".md")]
            json_files = [f for f in names if f.endswith(".json")]

            if md_files:
                md_cloud_url = self.minio_manager.upload_file(
                    z.read(md_files[0]),
                    self._converted_object_name(doc_id, date_dir, ".md"),
                    content_type="text/markdown",
                )

            if json_files:
                json_file = next((f for f in json_files if "content_list" in f), json_files[0])
                json_cloud_url = self.minio_manager.upload_file(
                    z.read(json_file),
                    self._converted_object_name(doc_id, date_dir, ".json"),
                    content_type="application/json",
                )
        return md_cloud_url, json_cloud_url

    def process_document(self, doc_id: str, chinese_name: str, file_url: str):
        """Run the complete conversion pipeline for one document.

        ``.txt`` files are stored as Markdown directly. Other supported types
        are sent through MinerU, and the resulting Markdown / JSON are stored in
        MinIO. Progress is written via :meth:`update_db_status` (1 = converting,
        2 = succeeded, 3 = failed).

        This method never raises. Any exception is logged and recorded as
        status 3 with the exception text.

        Args:
            doc_id: document row id; also used as MinerU ``data_id`` and object name.
            chinese_name: display name; combined with the extension for MinerU.
            file_url: URL the source file is downloaded from.
        """
        try:
            # 1. Mark the row as "converting".
            self.update_db_status(doc_id, status=1)

            # 2. Download the source file.
            logger.info(f"正在下载文件: {file_url}...")
            resp = requests.get(file_url, timeout=60)
            resp.raise_for_status()
            file_content = resp.content

            # Reject web-page links.
            # NOTE(review): ".html" is in supported_suffix, yet an .html file served
            # with Content-Type text/html is rejected here — confirm the intent.
            content_type = resp.headers.get("Content-Type", "").lower()
            if "text/html" in content_type:
                raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")

            # 3. Determine the file type from the URL path (default ".pdf").
            file_ext = Path(urlparse(file_url).path).suffix.lower() or ".pdf"
            file_name = f"{chinese_name}{file_ext}"

            # Plain text needs no conversion: store the raw bytes as Markdown.
            if file_ext == ".txt":
                logger.info(f"[{doc_id}] 检测为 .txt 文件,跳过 MinerU 转换,直接处理为 Markdown")
                md_object_name = self._converted_object_name(doc_id, datetime.now().strftime('%Y%m%d'), ".md")
                md_cloud_url = self.minio_manager.upload_file(file_content, md_object_name, content_type="text/markdown")
                # Only the URL is stored in the DB, never the content itself.
                self.update_db_status(doc_id, status=2, md_url=md_cloud_url)
                logger.info(f"[{doc_id}] .txt 文件处理成功. MD: {md_cloud_url}")
                return

            if file_ext not in self.supported_suffix:
                supported_list = ", ".join(self.supported_suffix)
                raise RuntimeError(f"不支持的文件类型: {file_ext}。MinerU 仅支持: {supported_list}")

            # 4. Submit to MinerU and upload the file.
            batch_id, upload_urls = self.apply_upload_urls([{"name": file_name, "data_id": doc_id}])
            self.upload_files([file_content], upload_urls)

            # 5. Wait for the (single) result.
            result = self.poll_batch(doc_id, batch_id)[0]
            if result.get("state") != "done":
                self.update_db_status(doc_id, status=3, error=result.get("err_msg", "转换失败"))
                return

            zip_url = result.get("full_zip_url")
            if not zip_url:
                self.update_db_status(doc_id, status=3, error="未找到 ZIP 下载链接")
                return

            # 6. Download the result ZIP and store its outputs.
            md_cloud_url, json_cloud_url = self._store_result_zip(doc_id, zip_url)

            # 7. Record success.
            self.update_db_status(doc_id, status=2, md_url=md_cloud_url, json_url=json_cloud_url)
            logger.info(f"[{doc_id}] 转换成功. MD: {md_cloud_url}, JSON: {json_cloud_url}")

        except Exception as e:
            logger.exception(f"[{doc_id}] 处理文档出错: {e}")
            self.update_db_status(doc_id, status=3, error=str(e))
- def get_mineru_manager() -> MinerUManager:
- """获取 MinerU 管理器单例"""
- global _mineru_manager
- if _mineru_manager is None:
- _mineru_manager = MinerUManager()
- return _mineru_manager
|