"""MinerU conversion worker.

Downloads a source document referenced in ``t_samp_document_main``, submits it
to the MinerU batch-extract API, polls for completion, uploads the resulting
Markdown/JSON artifacts to MinIO, and records the outcome back in the database.
"""

import os
import time
import json
import logging
import requests
import pymysql
import zipfile
import io
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from app.base.minio_connection import get_minio_manager

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("MinerU")

# Make the project "src" directory importable before loading project config.
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from app.core.config import config_handler

# MinIO configuration
minio_manager = get_minio_manager()
MINIO_BASE_PATH = minio_manager.base_path


def upload_to_minio(file_content, object_name, content_type="text/markdown"):
    """Upload raw bytes to MinIO.

    Returns the value produced by ``minio_manager.upload_file`` (used below as
    a cloud URL), or ``None`` if the upload raised.
    """
    try:
        return minio_manager.upload_file(file_content, object_name, content_type)
    except Exception as e:
        logger.error(f"Upload to MinIO failed: {e}")
        return None


# MinerU API configuration
MINERU_TOKEN = config_handler.get("admin_app", "MINERU_TOKEN", "")
API_APPLY = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
API_BATCH_RESULT = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {MINERU_TOKEN}",
}
SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}

# Maps t_samp_document_main.source_type to the sub-table holding the source
# record. Shared by update_db_status() and main_cli() (was duplicated inline).
SOURCE_TABLE_MAP = {
    "basis": "t_samp_standard_base_info",
    "work": "t_samp_construction_plan_base_info",
    "job": "t_samp_office_regulations",
}


def get_db_connection():
    """Open a pymysql connection from the configured DATABASE_URL.

    Returns ``None`` (after logging) when the URL is missing or the connection
    fails; callers must check for ``None``. Connections are autocommit.
    """
    database_url = config_handler.get("admin_app", "DATABASE_URL", "")
    if not database_url:
        logger.error("DATABASE_URL not found in configuration")
        return None
    try:
        parsed = urlparse(database_url)
        return pymysql.connect(
            host=parsed.hostname,
            port=parsed.port or 3306,
            user=parsed.username,
            password=parsed.password,
            database=parsed.path[1:],  # strip leading "/" from the URL path
            charset='utf8mb4',
            autocommit=True
        )
    except Exception as e:
        logger.error(f"Database connection error: {e}")
        return None


def update_db_status(doc_id, status=None, error=None, md_url=None, json_url=None):
    """Update conversion fields on t_samp_document_main for ``doc_id``.

    Only the keyword arguments that are not ``None`` are written; ``updated_time``
    is always refreshed when any field changes. When json_url/md_url are set, the
    json_url is also synced to the source sub-table resolved via SOURCE_TABLE_MAP.
    All failures are logged, never raised (best-effort status reporting).
    """
    conn = get_db_connection()
    if not conn:
        return
    try:
        with conn.cursor() as cursor:
            updates = []
            params = []
            if status is not None:
                updates.append("conversion_status = %s")
                params.append(status)
            if error is not None:
                updates.append("conversion_error = %s")
                params.append(error)
            if md_url is not None:
                updates.append("md_url = %s")
                params.append(md_url)
            if json_url is not None:
                updates.append("json_url = %s")
                params.append(json_url)
            if not updates:
                return
            # Also refresh the modification timestamp.
            updates.append("updated_time = NOW()")
            sql = f"UPDATE t_samp_document_main SET {', '.join(updates)} WHERE id = %s"
            params.append(doc_id)
            cursor.execute(sql, params)
            # If json_url or md_url changed, sync to the source sub-table.
            # NOTE(review): only json_url is actually propagated below — the
            # sub-tables apparently have no md_url column; confirm before adding.
            if json_url is not None or md_url is not None:
                try:
                    cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
                    row = cursor.fetchone()
                    if row and row[0] and row[1]:
                        source_type, source_id = row[0], row[1]
                        table_name = SOURCE_TABLE_MAP.get(source_type)
                        if table_name:
                            sub_updates = []
                            sub_params = []
                            if json_url is not None:
                                sub_updates.append("json_url = %s")
                                sub_params.append(json_url)
                            if sub_updates:
                                sub_sql = f"UPDATE {table_name} SET {', '.join(sub_updates)} WHERE id = %s"
                                sub_params.append(source_id)
                                cursor.execute(sub_sql, sub_params)
                except Exception as e:
                    logger.error(f"Sync URLs to sub-table failed: {e}")
    except Exception as e:
        logger.error(f"Update DB failed: {e}")
    finally:
        conn.close()


def apply_upload_urls(files_meta, model_version="vlm"):
    """Request presigned upload URLs from the MinerU batch API.

    ``files_meta`` is a list of dicts like ``{"name": ..., "data_id": ...}``.
    Returns ``(batch_id, file_urls)``. Raises RuntimeError when the API
    responds with a non-zero code, or requests' HTTPError on transport errors.
    """
    payload = {
        "files": files_meta,
        "model_version": model_version,
    }
    r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
    r.raise_for_status()
    j = r.json()
    if j.get("code") != 0:
        raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
    return j["data"]["batch_id"], j["data"]["file_urls"]


def upload_files(file_data_list, upload_urls):
    """PUT each file's bytes to its presigned URL, pairwise.

    Raises RuntimeError on any non-200 response. Extra entries in the longer
    of the two lists are silently ignored (zip semantics).
    """
    for data, url in zip(file_data_list, upload_urls):
        res = requests.put(url, data=data, timeout=300)
        if res.status_code != 200:
            raise RuntimeError(f"upload failed to {url}, status={res.status_code}")


def poll_batch(doc_id, batch_id, interval_sec=5, timeout_sec=1800):
    """Poll the MinerU batch-result endpoint until every file is terminal.

    Returns the ``extract_result`` list once all states are "done"/"failed".
    Raises RuntimeError on API error codes and TimeoutError after
    ``timeout_sec``. (``doc_id`` is currently unused; kept for the call site.)
    """
    deadline = time.time() + timeout_sec
    while True:
        r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
        r.raise_for_status()
        j = r.json()
        if j.get("code") != 0:
            raise RuntimeError(f"poll failed: {j.get('msg')}")
        results = j["data"]["extract_result"]
        states = [it.get("state") for it in results]
        if all(s in ("done", "failed") for s in states):
            return results
        if time.time() > deadline:
            raise TimeoutError(f"poll timeout for batch_id={batch_id}")
        time.sleep(interval_sec)


def process_document(doc_id, chinese_name, file_url, out_dir):
    """Run the full conversion pipeline for one document.

    Downloads ``file_url``, submits it to MinerU, waits for the result, uploads
    the extracted .md/.json to MinIO, and records status in the DB.
    Status codes written: 1 = converting, 2 = success, 3 = failed
    (inferred from usage here — confirm against the schema).
    ``out_dir`` is currently unused; kept for the call sites.
    """
    try:
        # 1. Mark conversion as started.
        update_db_status(doc_id, status=1)

        # 2. Download the original file.
        logger.info(f"Downloading {file_url}...")
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        file_content = resp.content

        # Reject web pages — MinerU conversion is for document files only.
        content_type = resp.headers.get("Content-Type", "").lower()
        if "text/html" in content_type:
            raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")

        file_ext = Path(urlparse(file_url).path).suffix.lower()
        if not file_ext:
            file_ext = ".pdf"  # Default
        file_name = f"{chinese_name}{file_ext}"

        # 3. Submit to MinerU.
        files_meta = [{"name": file_name, "data_id": doc_id}]
        batch_id, upload_urls = apply_upload_urls(files_meta)
        upload_files([file_content], upload_urls)

        # 4. Poll for the result.
        results = poll_batch(doc_id, batch_id)
        result = results[0]

        if result.get("state") == "done":
            zip_url = result.get("full_zip_url")
            if zip_url:
                # 5. Download and unpack the result archive.
                zip_resp = requests.get(zip_url, timeout=300)
                zip_resp.raise_for_status()

                with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
                    # Locate the .md and .json members (usually content_list.json).
                    md_files = [f for f in z.namelist() if f.endswith(".md")]
                    json_files = [f for f in z.namelist() if f.endswith(".json")]

                    md_cloud_url = None
                    json_cloud_url = None

                    if md_files:
                        md_content = z.read(md_files[0])
                        # Build the cloud storage path, bucketed by date.
                        md_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md"
                        md_cloud_url = upload_to_minio(md_content, md_object_name, content_type="text/markdown")

                    if json_files:
                        # Prefer content_list.json over any other .json member.
                        json_file = next((f for f in json_files if "content_list" in f), json_files[0])
                        json_content = z.read(json_file)
                        json_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.json"
                        json_cloud_url = upload_to_minio(json_content, json_object_name, content_type="application/json")

                # 6. Record success (URLs may be None if a MinIO upload failed).
                update_db_status(doc_id, status=2, md_url=md_cloud_url, json_url=json_cloud_url)
                logger.info(f"[{doc_id}] Processed successfully. MD: {md_cloud_url}, JSON: {json_cloud_url}")
            else:
                update_db_status(doc_id, status=3, error="Full ZIP URL not found")
        else:
            update_db_status(doc_id, status=3, error=result.get("err_msg", "Conversion failed"))

    except Exception as e:
        logger.exception(f"[{doc_id}] Error processing document: {e}")
        update_db_status(doc_id, status=3, error=str(e))


def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
    """Entry point: resolve title/file_url for ``doc_id`` and process it.

    Looks up t_samp_document_main first; when the main row has no file_url,
    falls back to the source sub-table (legacy data), probing several possible
    URL column names.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Database connection failed")
        return
    try:
        with conn.cursor() as cursor:
            # Prefer title and file_url straight from the main table.
            cursor.execute("SELECT title, file_url FROM t_samp_document_main WHERE id = %s", (doc_id,))
            row = cursor.fetchone()
            if not row or not row[1]:
                if not row:
                    logger.warning(f"Document not found: {doc_id}")
                    return
                title = row[0]
                # Legacy fallback: resolve the URL from the source sub-table.
                cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
                st_row = cursor.fetchone()
                if st_row:
                    source_type, source_id = st_row
                    table_name = SOURCE_TABLE_MAP.get(source_type)
                    if table_name:
                        # Sub-tables differ in which column holds the URL; probe
                        # candidates and use the first that exists and is set.
                        url_fields = ['file_url', 'source_url', 'url']
                        for field in url_fields:
                            try:
                                cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
                                url_row = cursor.fetchone()
                                if url_row and url_row[0]:
                                    file_url = url_row[0]
                                    process_document(doc_id, title, file_url, out_dir)
                                    return
                            except Exception as e:
                                # Expected when the column doesn't exist in
                                # this table — try the next candidate.
                                logger.debug(f"Column probe {table_name}.{field} failed: {e}")
                                continue
                logger.error(f"No file_url found for document: {doc_id}")
                update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
                return
            title, file_url = row
            process_document(doc_id, title, file_url, out_dir)
    finally:
        conn.close()


if __name__ == "__main__":
    # Usage: python miner_u.py <doc_id>
    import sys
    if len(sys.argv) > 1:
        # Backward compatibility: the old CLI form was
        # "python miner_u.py <source_type> <doc_id>"; only <doc_id> is needed now.
        if len(sys.argv) == 3:
            # Old format: python miner_u.py <source_type> <doc_id>
            main_cli(sys.argv[2])
        else:
            # New format: python miner_u.py <doc_id>
            main_cli(sys.argv[1])
    else:
        print("Usage: python miner_u.py <doc_id>")