import os import time import json import logging import requests import pymysql import zipfile import io from datetime import datetime from pathlib import Path from urllib.parse import urlparse from minio import Minio # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger("MinerU") # 导入配置 import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) from app.core.config import config_handler # MinIO 配置 MINIO_ENDPOINT = config_handler.get("admin_app", "MINIO_ENDPOINT", "192.168.91.15:19000") MINIO_ACCESS_KEY = config_handler.get("admin_app", "MINIO_ACCESS_KEY", "minioadmin") MINIO_SECRET_KEY = config_handler.get("admin_app", "MINIO_SECRET_KEY", "minioadmin") MINIO_BUCKET = config_handler.get("admin_app", "MINIO_BUCKET_NAME", "aidata") MINIO_USE_SSL = config_handler.get_bool("admin_app", "MINIO_USE_SSL", False) MINIO_BASE_PATH = config_handler.get("admin_app", "MINIO_BASE_PATH", "sampledata") def get_minio_client(): try: return Minio( MINIO_ENDPOINT, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=MINIO_USE_SSL ) except Exception as e: logger.error(f"MinIO client init failed: {e}") return None def upload_to_minio(file_content, object_name, content_type="text/markdown"): client = get_minio_client() if not client: return None try: # 确保桶存在 if not client.bucket_exists(MINIO_BUCKET): client.make_bucket(MINIO_BUCKET) # 上传文件 client.put_object( MINIO_BUCKET, object_name, io.BytesIO(file_content), len(file_content), content_type=content_type ) # 返回访问链接 return f"http://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{object_name}" except Exception as e: logger.error(f"Upload to MinIO failed: {e}") return None # MinERU 配置 MINERU_TOKEN = config_handler.get("admin_app", "MINERU_TOKEN", "") API_APPLY = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch") API_BATCH_RESULT = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}") HEADERS = { "Content-Type": "application/json", "Authorization": f"Bearer {MINERU_TOKEN}", } SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"} def get_db_connection(): database_url = config_handler.get("admin_app", "DATABASE_URL", "") if not database_url: logger.error("DATABASE_URL not found in configuration") return None try: parsed = urlparse(database_url) return pymysql.connect( host=parsed.hostname, port=parsed.port or 3306, user=parsed.username, password=parsed.password, database=parsed.path[1:], charset='utf8mb4', autocommit=True ) except Exception as e: logger.error(f"Database connection error: {e}") return None def update_db_status(doc_id, status=None, progress=None, error=None, converted_file_name=None): conn = get_db_connection() if not conn: return try: with conn.cursor() as cursor: updates = [] params = [] if status is not None: updates.append("conversion_status = %s") params.append(status) if progress is not None: updates.append("conversion_progress = %s") params.append(progress) if error is not None: updates.append("conversion_error = %s") params.append(error) if converted_file_name is not None: updates.append("converted_file_name = %s") params.append(converted_file_name) if not updates: return # 同时更新修改时间 updates.append("updated_time = NOW()") sql = f"UPDATE t_document_main SET {', '.join(updates)} WHERE id = %s" params.append(doc_id) cursor.execute(sql, params) except Exception as e: logger.error(f"Update DB failed: {e}") finally: conn.close() def apply_upload_urls(files_meta, model_version="vlm"): payload = { "files": files_meta, "model_version": model_version, } r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60) r.raise_for_status() j = r.json() if j.get("code") != 0: raise RuntimeError(f"apply upload urls failed: {j.get('msg')}") return j["data"]["batch_id"], j["data"]["file_urls"] def upload_files(file_data_list, upload_urls): for data, url in zip(file_data_list, upload_urls): res = requests.put(url, data=data, timeout=300) if res.status_code != 200: raise RuntimeError(f"upload failed to {url}, status={res.status_code}") def poll_batch(batch_id, interval_sec=5, timeout_sec=1800): deadline = time.time() + timeout_sec while True: r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60) r.raise_for_status() j = r.json() if j.get("code") != 0: raise RuntimeError(f"poll failed: {j.get('msg')}") results = j["data"]["extract_result"] states = [it.get("state") for it in results] if all(s in ("done", "failed") for s in states): return results if time.time() > deadline: raise TimeoutError(f"poll timeout for batch_id={batch_id}") time.sleep(interval_sec) def process_document(doc_id, chinese_name, file_url, out_dir): try: # 1. 更新状态:开始转换 update_db_status(doc_id, status=1, progress=10) # 2. 下载原始文件 logger.info(f"Downloading {file_url}...") resp = requests.get(file_url, timeout=60) resp.raise_for_status() file_content = resp.content # 检查文件类型 content_type = resp.headers.get("Content-Type", "").lower() if "text/html" in content_type: raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。") file_ext = Path(urlparse(file_url).path).suffix.lower() if not file_ext: file_ext = ".pdf" # Default file_name = f"{chinese_name}{file_ext}" update_db_status(doc_id, progress=30) # 3. 提交到 MinerU files_meta = [{"name": file_name, "data_id": doc_id}] batch_id, upload_urls = apply_upload_urls(files_meta) upload_files([file_content], upload_urls) update_db_status(doc_id, progress=50) # 4. 轮询结果 results = poll_batch(batch_id) result = results[0] if result.get("state") == "done": zip_url = result.get("full_zip_url") if zip_url: # 5. 下载并处理结果 update_db_status(doc_id, progress=80) zip_resp = requests.get(zip_url, timeout=300) zip_resp.raise_for_status() # 解压并处理结果 with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z: # 查找 .md 文件 md_files = [f for f in z.namelist() if f.endswith(".md")] if md_files: md_content = z.read(md_files[0]) # 构造云端存储路径 object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md" # 上传到 MinIO cloud_url = upload_to_minio(md_content, object_name) if cloud_url: logger.info(f"Uploaded converted file to {cloud_url}") update_db_status(doc_id, status=2, progress=100, converted_file_name=cloud_url) return True else: raise RuntimeError("Failed to upload converted file to MinIO") else: raise RuntimeError("No .md file found in the converted zip") else: raise RuntimeError("No zip URL in result") else: err_msg = result.get("err_msg", "Unknown error") raise RuntimeError(f"MinerU extraction failed: {err_msg}") except Exception as e: logger.error(f"Process failed: {e}") update_db_status(doc_id, status=3, error=str(e)) return False def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"): # 从数据库获取详细信息 - 直接从 t_document_main 获取 conn = get_db_connection() if not conn: logger.error("Database connection failed") return try: with conn.cursor() as cursor: # 优先从 t_document_main 获取 title 和 file_url cursor.execute("SELECT title, file_url FROM t_document_main WHERE id = %s", (doc_id,)) row = cursor.fetchone() if not row or not row[1]: # 如果主表没有 file_url,尝试从子表获取 if not row: logger.warning(f"Document not found: {doc_id}") return title = row[0] # 尝试从子表获取 (兼容旧数据) cursor.execute("SELECT source_type, source_id FROM t_document_main WHERE id = %s", (doc_id,)) st_row = cursor.fetchone() if st_row: source_type, source_id = st_row TABLE_MAP = { "basis": "t_basis_of_preparation", "work": "t_work_of_preparation", "job": "t_job_of_preparation" } table_name = TABLE_MAP.get(source_type) if table_name: # 尝试不同的 url 字段名 url_fields = ['file_url', 'source_url', 'url'] for field in url_fields: try: cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,)) url_row = cursor.fetchone() if url_row and url_row[0]: file_url = url_row[0] process_document(doc_id, title, file_url, out_dir) return except: continue logger.error(f"No file_url found for document: {doc_id}") update_db_status(doc_id, status=3, error="未找到文件链接(file_url)") return title, file_url = row process_document(doc_id, title, file_url, out_dir) finally: conn.close() if __name__ == "__main__": # 示例用法:python miner_u.py import sys if len(sys.argv) > 1: # 这里的参数处理需要微调,因为以前是 python miner_u.py # 现在我们只需要 ,但为了兼容性,我们可以检查参数个数 if len(sys.argv) == 3: # 旧格式: python miner_u.py basis main_cli(sys.argv[2]) else: # 新格式: python miner_u.py main_cli(sys.argv[1]) else: print("Usage: python miner_u.py ")