import os
import time
import json
import logging
import requests
import pymysql
import zipfile
import io
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from app.base.minio_connection import get_minio_manager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("MinerU")

# Import application configuration.
# NOTE(review): sys.path is extended *after* the app.base import above has
# already succeeded -- presumably only app.core needs the extra '../src'
# entry; confirm against the deployment layout.
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from app.core.config import config_handler

# MinIO configuration
minio_manager = get_minio_manager()
MINIO_BASE_PATH = minio_manager.base_path
def upload_to_minio(file_content, object_name, content_type="text/markdown"):
    """Store *file_content* in MinIO under *object_name*.

    Delegates to the module-level MinIO manager.  Returns whatever the
    manager's upload_file returns on success; on any exception the error
    is logged and None is returned (the failure never propagates).
    """
    try:
        result = minio_manager.upload_file(file_content, object_name, content_type)
    except Exception as e:
        logger.error(f"Upload to MinIO failed: {e}")
        return None
    return result
# MinerU API configuration: token and endpoint URLs come from app config,
# with the public mineru.net v4 endpoints as defaults.
MINERU_TOKEN = config_handler.get("admin_app", "MINERU_TOKEN", "")
API_APPLY = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
# Batch-result endpoint; '{}' is filled with the batch_id via str.format.
API_BATCH_RESULT = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {MINERU_TOKEN}",
}
# File extensions MinerU can convert.
# NOTE(review): not referenced anywhere in this chunk -- presumably used by
# callers elsewhere; confirm before removing.
SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
def get_db_connection():
    """Open a pymysql connection described by the DATABASE_URL config entry.

    The URL is expected in the form ``mysql://user:pass@host:port/dbname``.
    Returns a connection with autocommit enabled, or None when the URL is
    missing or the connection attempt fails (errors are logged, not raised).
    """
    database_url = config_handler.get("admin_app", "DATABASE_URL", "")
    if not database_url:
        logger.error("DATABASE_URL not found in configuration")
        return None
    try:
        # Credentials embedded in a URL are percent-encoded; urlparse does
        # NOT decode them, so unquote is required for passwords containing
        # '@', '/', ':' or literal '%xx' sequences.  unquote is the identity
        # for plain strings, so this is backward compatible.
        from urllib.parse import unquote
        parsed = urlparse(database_url)
        return pymysql.connect(
            host=parsed.hostname,
            port=parsed.port or 3306,
            user=unquote(parsed.username) if parsed.username else None,
            password=unquote(parsed.password) if parsed.password else None,
            database=parsed.path.lstrip('/'),
            charset='utf8mb4',
            autocommit=True
        )
    except Exception as e:
        logger.error(f"Database connection error: {e}")
        return None
def update_db_status(doc_id, status=None, error=None, md_url=None, json_url=None):
    """Persist conversion progress for one t_samp_document_main row.

    Builds an UPDATE from whichever keyword arguments are not None (status
    values used by callers in this file: 1 = converting, 2 = done,
    3 = failed) and always refreshes updated_time.  When json_url or md_url
    was supplied, additionally tries to mirror json_url into the sub-table
    selected by the row's source_type/source_id.

    All database failures are logged and swallowed; this never raises.
    """
    conn = get_db_connection()
    if not conn:
        return
    try:
        with conn.cursor() as cursor:
            updates = []
            params = []
            if status is not None:
                updates.append("conversion_status = %s")
                params.append(status)
            if error is not None:
                updates.append("conversion_error = %s")
                params.append(error)
            if md_url is not None:
                updates.append("md_url = %s")
                params.append(md_url)
            if json_url is not None:
                updates.append("json_url = %s")
                params.append(json_url)

            if not updates:
                return

            # Also refresh the modification timestamp
            updates.append("updated_time = NOW()")

            sql = f"UPDATE t_samp_document_main SET {', '.join(updates)} WHERE id = %s"
            params.append(doc_id)
            cursor.execute(sql, params)

            # If json_url or md_url was updated, sync to the sub-table.
            # NOTE(review): only json_url is actually copied below -- md_url
            # merely triggers the lookup and is never written.  Presumably
            # the sub-tables have no md_url column; confirm before changing.
            if json_url is not None or md_url is not None:
                try:
                    cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
                    row = cursor.fetchone()
                    if row and row[0] and row[1]:
                        source_type, source_id = row[0], row[1]
                        # Maps source_type to the table holding the source record
                        TABLE_MAP = {
                            "basis": "t_samp_standard_base_info",
                            "work": "t_samp_construction_plan_base_info",
                            "job": "t_samp_office_regulations"
                        }
                        table_name = TABLE_MAP.get(source_type)
                        if table_name:
                            sub_updates = []
                            sub_params = []
                            if json_url is not None:
                                sub_updates.append("json_url = %s")
                                sub_params.append(json_url)

                            if sub_updates:
                                sub_sql = f"UPDATE {table_name} SET {', '.join(sub_updates)} WHERE id = %s"
                                sub_params.append(source_id)
                                cursor.execute(sub_sql, sub_params)
                except Exception as e:
                    logger.error(f"Sync URLs to sub-table failed: {e}")
    except Exception as e:
        logger.error(f"Update DB failed: {e}")
    finally:
        conn.close()
def apply_upload_urls(files_meta, model_version="vlm"):
    """Request presigned upload URLs from the MinerU batch API.

    *files_meta* is a list of ``{"name": ..., "data_id": ...}`` dicts.
    Returns a ``(batch_id, file_urls)`` tuple; raises RuntimeError when the
    API answers with a non-zero code, or requests' HTTPError on transport
    failure.
    """
    response = requests.post(
        API_APPLY,
        headers=HEADERS,
        json={"files": files_meta, "model_version": model_version},
        timeout=60,
    )
    response.raise_for_status()
    body = response.json()
    if body.get("code") != 0:
        raise RuntimeError(f"apply upload urls failed: {body.get('msg')}")
    data = body["data"]
    return data["batch_id"], data["file_urls"]
def upload_files(file_data_list, upload_urls):
    """PUT each payload in *file_data_list* to its matching presigned URL.

    Pairs payloads with URLs positionally and stops with a RuntimeError at
    the first upload that does not return HTTP 200.
    """
    for payload, target_url in zip(file_data_list, upload_urls):
        response = requests.put(target_url, data=payload, timeout=300)
        if response.status_code != 200:
            raise RuntimeError(f"upload failed to {target_url}, status={response.status_code}")
def poll_batch(doc_id, batch_id, interval_sec=5, timeout_sec=1800):
    """Poll the MinerU batch-result endpoint until every file finishes.

    Returns the ``extract_result`` list once all entries report a state of
    "done" or "failed".  Raises RuntimeError on an API error code and
    TimeoutError when *timeout_sec* elapses first.  (*doc_id* is accepted
    for caller symmetry but is not used inside this function.)
    """
    deadline = time.time() + timeout_sec
    while True:
        response = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
        response.raise_for_status()
        body = response.json()
        if body.get("code") != 0:
            raise RuntimeError(f"poll failed: {body.get('msg')}")
        extract_results = body["data"]["extract_result"]
        all_finished = all(
            item.get("state") in ("done", "failed") for item in extract_results
        )
        if all_finished:
            return extract_results
        if time.time() > deadline:
            raise TimeoutError(f"poll timeout for batch_id={batch_id}")
        time.sleep(interval_sec)
def process_document(doc_id, chinese_name, file_url, out_dir):
    """Run one document through the full MinerU conversion pipeline.

    Marks the row as converting (status 1), downloads *file_url*, submits
    it to MinerU under *chinese_name* + original extension, polls until the
    batch finishes, then extracts the .md and content_list .json from the
    result zip, uploads both to MinIO, and records their URLs (status 2).
    Any failure marks the row failed (status 3) with the error text.

    NOTE(review): *out_dir* is accepted but never used in this function --
    presumably a leftover from a local-output version; confirm before
    removing it from the signature.
    """
    try:
        # 1. Mark the document as converting
        update_db_status(doc_id, status=1)

        # 2. Download the original file
        logger.info(f"Downloading {file_url}...")
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        file_content = resp.content

        # Reject plain web pages -- MinerU conversion needs a real document
        content_type = resp.headers.get("Content-Type", "").lower()
        if "text/html" in content_type:
            raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")

        file_ext = Path(urlparse(file_url).path).suffix.lower()
        if not file_ext:
            file_ext = ".pdf"  # Default

        file_name = f"{chinese_name}{file_ext}"

        # 3. Submit to MinerU
        files_meta = [{"name": file_name, "data_id": doc_id}]
        batch_id, upload_urls = apply_upload_urls(files_meta)

        upload_files([file_content], upload_urls)

        # 4. Poll for the conversion result
        results = poll_batch(doc_id, batch_id)
        result = results[0]

        if result.get("state") == "done":
            zip_url = result.get("full_zip_url")
            if zip_url:
                # 5. Download and process the result archive
                zip_resp = requests.get(zip_url, timeout=300)
                zip_resp.raise_for_status()

                # Unzip and pick out the converted artifacts
                with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
                    # Locate the .md file
                    md_files = [f for f in z.namelist() if f.endswith(".md")]
                    # Locate the .json files (usually content_list.json)
                    json_files = [f for f in z.namelist() if f.endswith(".json")]

                    md_cloud_url = None
                    json_cloud_url = None

                    if md_files:
                        md_content = z.read(md_files[0])
                        # Build the cloud storage path
                        md_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md"
                        # Upload to MinIO
                        md_cloud_url = upload_to_minio(md_content, md_object_name, content_type="text/markdown")

                    if json_files:
                        # Prefer content_list.json when present
                        json_file = next((f for f in json_files if "content_list" in f), json_files[0])
                        json_content = z.read(json_file)
                        # Build the cloud storage path
                        json_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.json"
                        # Upload to MinIO
                        json_cloud_url = upload_to_minio(json_content, json_object_name, content_type="application/json")

                # 6. Record success and the result URLs
                update_db_status(doc_id, status=2,
                                 md_url=md_cloud_url,
                                 json_url=json_cloud_url)
                logger.info(f"[{doc_id}] Processed successfully. MD: {md_cloud_url}, JSON: {json_cloud_url}")
            else:
                update_db_status(doc_id, status=3, error="Full ZIP URL not found")
        else:
            update_db_status(doc_id, status=3, error=result.get("err_msg", "Conversion failed"))

    except Exception as e:
        logger.exception(f"[{doc_id}] Error processing document: {e}")
        update_db_status(doc_id, status=3, error=str(e))
def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
    """CLI entry point: look up a document's title/file_url and convert it.

    Reads t_samp_document_main for *doc_id*.  When the main row lacks a
    file_url, falls back to the legacy sub-table referenced by
    source_type/source_id, probing several candidate URL column names.
    Marks the document failed (status 3) when no URL can be found.
    """
    conn = get_db_connection()
    if not conn:
        logger.error("Database connection failed")
        return

    try:
        with conn.cursor() as cursor:
            # Prefer title and file_url from the main document table
            cursor.execute("SELECT title, file_url FROM t_samp_document_main WHERE id = %s", (doc_id,))
            row = cursor.fetchone()
            if not row or not row[1]:  # main table has no file_url: try the sub-table
                if not row:
                    logger.warning(f"Document not found: {doc_id}")
                    return

                title = row[0]
                # Legacy-data fallback: resolve the source record
                cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
                st_row = cursor.fetchone()
                if st_row:
                    source_type, source_id = st_row
                    TABLE_MAP = {
                        "basis": "t_samp_standard_base_info",
                        "work": "t_samp_construction_plan_base_info",
                        "job": "t_samp_office_regulations"
                    }
                    table_name = TABLE_MAP.get(source_type)
                    if table_name:
                        # Older tables disagree on the URL column name
                        url_fields = ['file_url', 'source_url', 'url']
                        for field in url_fields:
                            try:
                                cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
                                url_row = cursor.fetchone()
                                if url_row and url_row[0]:
                                    file_url = url_row[0]
                                    process_document(doc_id, title, file_url, out_dir)
                                    return
                            # A missing column raises; try the next candidate.
                            # Narrowed from a bare "except:", which would also
                            # swallow SystemExit/KeyboardInterrupt.
                            except Exception:
                                continue

                logger.error(f"No file_url found for document: {doc_id}")
                update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
                return

            title, file_url = row
            process_document(doc_id, title, file_url, out_dir)
    finally:
        conn.close()
- if __name__ == "__main__":
- # 示例用法:python miner_u.py <doc_id>
- import sys
- if len(sys.argv) > 1:
- # 这里的参数处理需要微调,因为以前是 python miner_u.py <table_type> <doc_id>
- # 现在我们只需要 <doc_id>,但为了兼容性,我们可以检查参数个数
- if len(sys.argv) == 3:
- # 旧格式: python miner_u.py basis <doc_id>
- main_cli(sys.argv[2])
- else:
- # 新格式: python miner_u.py <doc_id>
- main_cli(sys.argv[1])
- else:
- print("Usage: python miner_u.py <doc_id>")