|
|
@@ -1,15 +1,7 @@
|
|
|
import os
|
|
|
-import time
|
|
|
-import json
|
|
|
+import sys
|
|
|
import logging
|
|
|
-import requests
|
|
|
-import pymysql
|
|
|
-import zipfile
|
|
|
-import io
|
|
|
-from datetime import datetime
|
|
|
-from pathlib import Path
|
|
|
from urllib.parse import urlparse
|
|
|
-from app.base.minio_connection import get_minio_manager
|
|
|
|
|
|
# 配置日志
|
|
|
logging.basicConfig(
|
|
|
@@ -18,294 +10,75 @@ logging.basicConfig(
|
|
|
)
|
|
|
logger = logging.getLogger("MinerU")
|
|
|
|
|
|
-# 导入配置
|
|
|
-import sys
|
|
|
+# 导入配置和管理器
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
|
-from app.core.config import config_handler
|
|
|
-
|
|
|
-# MinIO 配置
|
|
|
-minio_manager = get_minio_manager()
|
|
|
-MINIO_BASE_PATH = minio_manager.base_path
|
|
|
-
|
|
|
-def upload_to_minio(file_content, object_name, content_type="text/markdown"):
|
|
|
- try:
|
|
|
- return minio_manager.upload_file(file_content, object_name, content_type)
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Upload to MinIO failed: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
-# MinERU 配置
|
|
|
-MINERU_TOKEN = config_handler.get("admin_app", "MINERU_TOKEN", "")
|
|
|
-API_APPLY = config_handler.get("admin_app", "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
|
|
|
-API_BATCH_RESULT = config_handler.get("admin_app", "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
|
|
|
-
|
|
|
-HEADERS = {
|
|
|
- "Content-Type": "application/json",
|
|
|
- "Authorization": f"Bearer {MINERU_TOKEN}",
|
|
|
-}
|
|
|
-
|
|
|
-SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
|
|
|
-
|
|
|
-def get_db_connection():
|
|
|
- database_url = config_handler.get("admin_app", "DATABASE_URL", "")
|
|
|
- if not database_url:
|
|
|
- logger.error("DATABASE_URL not found in configuration")
|
|
|
- return None
|
|
|
- try:
|
|
|
- parsed = urlparse(database_url)
|
|
|
- return pymysql.connect(
|
|
|
- host=parsed.hostname,
|
|
|
- port=parsed.port or 3306,
|
|
|
- user=parsed.username,
|
|
|
- password=parsed.password,
|
|
|
- database=parsed.path[1:],
|
|
|
- charset='utf8mb4',
|
|
|
- autocommit=True
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Database connection error: {e}")
|
|
|
- return None
|
|
|
-
|
|
|
-def update_db_status(doc_id, status=None, error=None, md_url=None, json_url=None):
|
|
|
+from app.base.mineru_connection import get_mineru_manager
|
|
|
+from app.base.async_mysql_connection import get_db_connection
|
|
|
+
|
|
|
+def main_cli(doc_id):
|
|
|
+ """
|
|
|
+ MinerU 命令行入口,供后台进程调用
|
|
|
+ """
|
|
|
+ manager = get_mineru_manager()
|
|
|
conn = get_db_connection()
|
|
|
if not conn:
|
|
|
+ logger.error("数据库连接失败")
|
|
|
return
|
|
|
+
|
|
|
try:
|
|
|
with conn.cursor() as cursor:
|
|
|
- updates = []
|
|
|
- params = []
|
|
|
- if status is not None:
|
|
|
- updates.append("conversion_status = %s")
|
|
|
- params.append(status)
|
|
|
- if error is not None:
|
|
|
- updates.append("conversion_error = %s")
|
|
|
- params.append(error)
|
|
|
- if md_url is not None:
|
|
|
- updates.append("md_url = %s")
|
|
|
- params.append(md_url)
|
|
|
- if json_url is not None:
|
|
|
- updates.append("json_url = %s")
|
|
|
- params.append(json_url)
|
|
|
-
|
|
|
- if not updates:
|
|
|
+ # 1. 获取文档基本信息
|
|
|
+ cursor.execute("SELECT title, file_url, source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
|
|
|
+ row = cursor.fetchone()
|
|
|
+ if not row:
|
|
|
+ logger.warning(f"文档不存在: {doc_id}")
|
|
|
return
|
|
|
|
|
|
- # 同时更新修改时间
|
|
|
- updates.append("updated_time = NOW()")
|
|
|
-
|
|
|
- sql = f"UPDATE t_samp_document_main SET {', '.join(updates)} WHERE id = %s"
|
|
|
- params.append(doc_id)
|
|
|
- cursor.execute(sql, params)
|
|
|
-
|
|
|
- # 如果更新了 json_url 或 md_url,同步更新到子表
|
|
|
- if json_url is not None or md_url is not None:
|
|
|
- try:
|
|
|
- cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
|
|
|
- row = cursor.fetchone()
|
|
|
- if row and row[0] and row[1]:
|
|
|
- source_type, source_id = row[0], row[1]
|
|
|
- TABLE_MAP = {
|
|
|
- "basis": "t_samp_standard_base_info",
|
|
|
- "work": "t_samp_construction_plan_base_info",
|
|
|
- "job": "t_samp_office_regulations"
|
|
|
- }
|
|
|
- table_name = TABLE_MAP.get(source_type)
|
|
|
- if table_name:
|
|
|
- sub_updates = []
|
|
|
- sub_params = []
|
|
|
- if json_url is not None:
|
|
|
- sub_updates.append("json_url = %s")
|
|
|
- sub_params.append(json_url)
|
|
|
-
|
|
|
- if sub_updates:
|
|
|
- sub_sql = f"UPDATE {table_name} SET {', '.join(sub_updates)} WHERE id = %s"
|
|
|
- sub_params.append(source_id)
|
|
|
- cursor.execute(sub_sql, sub_params)
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Sync URLs to sub-table failed: {e}")
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"Update DB failed: {e}")
|
|
|
- finally:
|
|
|
- conn.close()
|
|
|
-
|
|
|
-def apply_upload_urls(files_meta, model_version="vlm"):
|
|
|
- payload = {
|
|
|
- "files": files_meta,
|
|
|
- "model_version": model_version,
|
|
|
- }
|
|
|
- r = requests.post(API_APPLY, headers=HEADERS, json=payload, timeout=60)
|
|
|
- r.raise_for_status()
|
|
|
- j = r.json()
|
|
|
- if j.get("code") != 0:
|
|
|
- raise RuntimeError(f"apply upload urls failed: {j.get('msg')}")
|
|
|
- return j["data"]["batch_id"], j["data"]["file_urls"]
|
|
|
-
|
|
|
-def upload_files(file_data_list, upload_urls):
|
|
|
- for data, url in zip(file_data_list, upload_urls):
|
|
|
- res = requests.put(url, data=data, timeout=300)
|
|
|
- if res.status_code != 200:
|
|
|
- raise RuntimeError(f"upload failed to {url}, status={res.status_code}")
|
|
|
-
|
|
|
-def poll_batch(doc_id, batch_id, interval_sec=5, timeout_sec=1800):
|
|
|
- deadline = time.time() + timeout_sec
|
|
|
- while True:
|
|
|
- r = requests.get(API_BATCH_RESULT.format(batch_id), headers=HEADERS, timeout=60)
|
|
|
- r.raise_for_status()
|
|
|
- j = r.json()
|
|
|
- if j.get("code") != 0:
|
|
|
- raise RuntimeError(f"poll failed: {j.get('msg')}")
|
|
|
- results = j["data"]["extract_result"]
|
|
|
- states = [it.get("state") for it in results]
|
|
|
-
|
|
|
- if all(s in ("done", "failed") for s in states):
|
|
|
- return results
|
|
|
-
|
|
|
- if time.time() > deadline:
|
|
|
- raise TimeoutError(f"poll timeout for batch_id={batch_id}")
|
|
|
- time.sleep(interval_sec)
|
|
|
-
|
|
|
-def process_document(doc_id, chinese_name, file_url, out_dir):
|
|
|
- try:
|
|
|
- # 1. 更新状态:开始转换
|
|
|
- update_db_status(doc_id, status=1)
|
|
|
-
|
|
|
- # 2. 下载原始文件
|
|
|
- logger.info(f"Downloading {file_url}...")
|
|
|
- resp = requests.get(file_url, timeout=60)
|
|
|
- resp.raise_for_status()
|
|
|
- file_content = resp.content
|
|
|
-
|
|
|
- # 检查文件类型
|
|
|
- content_type = resp.headers.get("Content-Type", "").lower()
|
|
|
- if "text/html" in content_type:
|
|
|
- raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")
|
|
|
-
|
|
|
- file_ext = Path(urlparse(file_url).path).suffix.lower()
|
|
|
- if not file_ext:
|
|
|
- file_ext = ".pdf" # Default
|
|
|
+ title = row['title']
|
|
|
+ file_url = row['file_url']
|
|
|
+ source_type = row['source_type']
|
|
|
+ source_id = row['source_id']
|
|
|
+
|
|
|
+ # 2. 如果主表没有 file_url,尝试从子表获取 (兼容逻辑)
|
|
|
+ if not file_url and source_type and source_id:
|
|
|
+ TABLE_MAP = {
|
|
|
+ "basis": "t_samp_standard_base_info",
|
|
|
+ "work": "t_samp_construction_plan_base_info",
|
|
|
+ "job": "t_samp_office_regulations"
|
|
|
+ }
|
|
|
+ table_name = TABLE_MAP.get(source_type)
|
|
|
+ if table_name:
|
|
|
+ url_fields = ['file_url', 'source_url', 'url']
|
|
|
+ for field in url_fields:
|
|
|
+ try:
|
|
|
+ cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
|
|
|
+ url_row = cursor.fetchone()
|
|
|
+ if url_row and url_row[field]:
|
|
|
+ file_url = url_row[field]
|
|
|
+ break
|
|
|
+ except:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not file_url:
|
|
|
+ logger.error(f"未找到文件链接: {doc_id}")
|
|
|
+ manager.update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
|
|
|
+ return
|
|
|
|
|
|
- file_name = f"{chinese_name}{file_ext}"
|
|
|
-
|
|
|
- # 3. 提交到 MinerU
|
|
|
- files_meta = [{"name": file_name, "data_id": doc_id}]
|
|
|
- batch_id, upload_urls = apply_upload_urls(files_meta)
|
|
|
-
|
|
|
- upload_files([file_content], upload_urls)
|
|
|
-
|
|
|
- # 4. 轮询结果
|
|
|
- results = poll_batch(doc_id, batch_id)
|
|
|
- result = results[0]
|
|
|
-
|
|
|
- if result.get("state") == "done":
|
|
|
- zip_url = result.get("full_zip_url")
|
|
|
- if zip_url:
|
|
|
- # 5. 下载并处理结果
|
|
|
- zip_resp = requests.get(zip_url, timeout=300)
|
|
|
- zip_resp.raise_for_status()
|
|
|
-
|
|
|
- # 解压并处理结果
|
|
|
- with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
|
|
|
- # 查找 .md 文件
|
|
|
- md_files = [f for f in z.namelist() if f.endswith(".md")]
|
|
|
- # 查找 .json 文件 (通常是 content_list.json)
|
|
|
- json_files = [f for f in z.namelist() if f.endswith(".json")]
|
|
|
-
|
|
|
- md_cloud_url = None
|
|
|
- json_cloud_url = None
|
|
|
-
|
|
|
- if md_files:
|
|
|
- md_content = z.read(md_files[0])
|
|
|
- # 构造云端存储路径
|
|
|
- md_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md"
|
|
|
- # 上传到 MinIO
|
|
|
- md_cloud_url = upload_to_minio(md_content, md_object_name, content_type="text/markdown")
|
|
|
-
|
|
|
- if json_files:
|
|
|
- # 优先取 content_list.json
|
|
|
- json_file = next((f for f in json_files if "content_list" in f), json_files[0])
|
|
|
- json_content = z.read(json_file)
|
|
|
- # 构造云端存储路径
|
|
|
- json_object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.json"
|
|
|
- # 上传到 MinIO
|
|
|
- json_cloud_url = upload_to_minio(json_content, json_object_name, content_type="application/json")
|
|
|
-
|
|
|
- # 6. 更新数据库
|
|
|
- update_db_status(doc_id, status=2,
|
|
|
- md_url=md_cloud_url,
|
|
|
- json_url=json_cloud_url)
|
|
|
- logger.info(f"[{doc_id}] Processed successfully. MD: {md_cloud_url}, JSON: {json_cloud_url}")
|
|
|
- else:
|
|
|
- update_db_status(doc_id, status=3, error="Full ZIP URL not found")
|
|
|
- else:
|
|
|
- update_db_status(doc_id, status=3, error=result.get("err_msg", "Conversion failed"))
|
|
|
+ # 3. 调用管理器执行转换
|
|
|
+ logger.info(f"开始处理文档 [{doc_id}]: {title}")
|
|
|
+ manager.process_document(doc_id, title, file_url)
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.exception(f"[{doc_id}] Error processing document: {e}")
|
|
|
- update_db_status(doc_id, status=3, error=str(e))
|
|
|
-
|
|
|
-def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
|
|
|
- # 从数据库获取详细信息 - 直接从 t_samp_document_main 获取
|
|
|
- conn = get_db_connection()
|
|
|
- if not conn:
|
|
|
- logger.error("Database connection failed")
|
|
|
- return
|
|
|
-
|
|
|
- try:
|
|
|
- with conn.cursor() as cursor:
|
|
|
- # 优先从 t_samp_document_main 获取 title 和 file_url
|
|
|
- cursor.execute("SELECT title, file_url FROM t_samp_document_main WHERE id = %s", (doc_id,))
|
|
|
- row = cursor.fetchone()
|
|
|
- if not row or not row[1]: # 如果主表没有 file_url,尝试从子表获取
|
|
|
- if not row:
|
|
|
- logger.warning(f"Document not found: {doc_id}")
|
|
|
- return
|
|
|
-
|
|
|
- title = row[0]
|
|
|
- # 尝试从子表获取 (兼容旧数据)
|
|
|
- cursor.execute("SELECT source_type, source_id FROM t_samp_document_main WHERE id = %s", (doc_id,))
|
|
|
- st_row = cursor.fetchone()
|
|
|
- if st_row:
|
|
|
- source_type, source_id = st_row
|
|
|
- TABLE_MAP = {
|
|
|
- "basis": "t_samp_standard_base_info",
|
|
|
- "work": "t_samp_construction_plan_base_info",
|
|
|
- "job": "t_samp_office_regulations"
|
|
|
- }
|
|
|
- table_name = TABLE_MAP.get(source_type)
|
|
|
- if table_name:
|
|
|
- # 尝试不同的 url 字段名
|
|
|
- url_fields = ['file_url', 'source_url', 'url']
|
|
|
- for field in url_fields:
|
|
|
- try:
|
|
|
- cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
|
|
|
- url_row = cursor.fetchone()
|
|
|
- if url_row and url_row[0]:
|
|
|
- file_url = url_row[0]
|
|
|
- process_document(doc_id, title, file_url, out_dir)
|
|
|
- return
|
|
|
- except:
|
|
|
- continue
|
|
|
-
|
|
|
- logger.error(f"No file_url found for document: {doc_id}")
|
|
|
- update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
|
|
|
- return
|
|
|
-
|
|
|
- title, file_url = row
|
|
|
- process_document(doc_id, title, file_url, out_dir)
|
|
|
+ logger.exception(f"处理文档 [{doc_id}] 时发生未捕获异常: {e}")
|
|
|
+ manager.update_db_status(doc_id, status=3, error=str(e))
|
|
|
finally:
|
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
- # 示例用法:python miner_u.py <doc_id>
|
|
|
- import sys
|
|
|
if len(sys.argv) > 1:
|
|
|
- # 这里的参数处理需要微调,因为以前是 python miner_u.py <table_type> <doc_id>
|
|
|
- # 现在我们只需要 <doc_id>,但为了兼容性,我们可以检查参数个数
|
|
|
- if len(sys.argv) == 3:
|
|
|
- # 旧格式: python miner_u.py basis <doc_id>
|
|
|
- main_cli(sys.argv[2])
|
|
|
- else:
|
|
|
- # 新格式: python miner_u.py <doc_id>
|
|
|
- main_cli(sys.argv[1])
|
|
|
+ # 兼容旧格式: python miner_u.py <table_type> <doc_id>
|
|
|
+ # 兼容新格式: python miner_u.py <doc_id>
|
|
|
+ target_id = sys.argv[-1]
|
|
|
+ main_cli(target_id)
|
|
|
else:
|
|
|
print("Usage: python miner_u.py <doc_id>")
|