miner_u.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import os
  2. import time
  3. import json
  4. import logging
  5. import requests
  6. import pymysql
  7. import zipfile
  8. import io
  9. from datetime import datetime
  10. from pathlib import Path
  11. from urllib.parse import urlparse
  12. from minio import Minio
  # Configure root logging for this script and create its named logger.
  _LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
  logger = logging.getLogger("MinerU")
  # Load the shared configuration handler. It lives in the project's "src"
  # directory, which sits next to this script's folder (<script dir>/../src),
  # so that directory is put at the front of the import path first.
  import sys
  _SRC_DIR = os.path.join(os.path.dirname(__file__), '..', 'src')
  sys.path.insert(0, _SRC_DIR)
  from app.core.config import config_handler
  # --- MinIO object-storage settings, read from the "admin_app" config section ---
  # NOTE(review): the fallbacks are a LAN address and MinIO's stock
  # "minioadmin" credentials; confirm real values are always configured.
  _MINIO_SECTION = "admin_app"
  MINIO_ENDPOINT = config_handler.get(_MINIO_SECTION, "MINIO_ENDPOINT", "192.168.91.15:19000")
  MINIO_ACCESS_KEY = config_handler.get(_MINIO_SECTION, "MINIO_ACCESS_KEY", "minioadmin")
  MINIO_SECRET_KEY = config_handler.get(_MINIO_SECTION, "MINIO_SECRET_KEY", "minioadmin")
  MINIO_BUCKET = config_handler.get(_MINIO_SECTION, "MINIO_BUCKET_NAME", "aidata")
  MINIO_USE_SSL = config_handler.get_bool(_MINIO_SECTION, "MINIO_USE_SSL", False)
  # Key prefix under which converted files are stored inside the bucket.
  MINIO_BASE_PATH = config_handler.get(_MINIO_SECTION, "MINIO_BASE_PATH", "sampledata")
  def get_minio_client():
      """Create a MinIO client from the module-level MINIO_* settings.

      Returns:
          A ``Minio`` instance, or ``None`` if constructing it raised (the
          error is logged, not propagated).
      """
      client = None
      try:
          client = Minio(
              MINIO_ENDPOINT,
              access_key=MINIO_ACCESS_KEY,
              secret_key=MINIO_SECRET_KEY,
              secure=MINIO_USE_SSL,
          )
      except Exception as exc:
          logger.error(f"MinIO client init failed: {exc}")
      return client
  def upload_to_minio(file_content, object_name, content_type="text/markdown"):
      """Upload ``file_content`` to ``MINIO_BUCKET`` under ``object_name``.

      The bucket is created first if it does not exist yet.

      Args:
          file_content: Payload as ``bytes``; a ``str`` is encoded as UTF-8.
          object_name: Object key inside the bucket.
          content_type: MIME type stored with the object.

      Returns:
          A link of the form ``<scheme>://<endpoint>/<bucket>/<object_name>``.
          The scheme is ``https`` when ``MINIO_USE_SSL`` is set, otherwise
          ``http``. Returns ``None`` if no client could be created or the
          upload failed; failures are logged, not raised.
      """
      client = get_minio_client()
      if not client:
          return None
      if isinstance(file_content, str):
          # put_object needs a byte stream plus its exact byte length.
          file_content = file_content.encode("utf-8")
      try:
          # Make sure the target bucket exists.
          if not client.bucket_exists(MINIO_BUCKET):
              client.make_bucket(MINIO_BUCKET)
          client.put_object(
              MINIO_BUCKET,
              object_name,
              io.BytesIO(file_content),
              len(file_content),
              content_type=content_type,
          )
      except Exception as e:
          logger.error(f"Upload to MinIO failed: {e}")
          return None
      # Advertise the same scheme the client connects with; a hard-coded
      # http:// link is wrong when the server is TLS-only (MINIO_USE_SSL).
      scheme = "https" if MINIO_USE_SSL else "http"
      return f"{scheme}://{MINIO_ENDPOINT}/{MINIO_BUCKET}/{object_name}"
  # --- MinerU cloud API settings, read from the "admin_app" config section ---
  _MINERU_SECTION = "admin_app"
  MINERU_TOKEN = config_handler.get(_MINERU_SECTION, "MINERU_TOKEN", "")
  # Endpoint that returns a batch id plus upload URLs for the submitted files.
  API_APPLY = config_handler.get(_MINERU_SECTION, "MINERU_API_APPLY", "https://mineru.net/api/v4/file-urls/batch")
  # Batch-result endpoint; "{}" is filled with the batch id via str.format().
  API_BATCH_RESULT = config_handler.get(_MINERU_SECTION, "MINERU_API_BATCH_RESULT", "https://mineru.net/api/v4/extract-results/batch/{}")
  # Headers sent with every MinerU API call: JSON body and bearer-token auth.
  HEADERS = {"Content-Type": "application/json", "Authorization": f"Bearer {MINERU_TOKEN}"}
  # File extensions considered convertible.
  # NOTE(review): not referenced by any code visible in this file; confirm
  # whether process_document is meant to validate against it.
  SUPPORTED_SUFFIX = {".pdf", ".doc", ".docx", ".ppt", ".pptx", ".png", ".jpg", ".jpeg", ".html"}
  def get_db_connection():
      """Open a MySQL connection described by the ``DATABASE_URL`` setting.

      The URL is expected to look like ``scheme://user:password@host:port/db``.
      The port defaults to 3306. The connection uses ``utf8mb4`` and
      autocommit.

      Returns:
          A ``pymysql`` connection, or ``None`` if the setting is missing or
          connecting failed (the reason is logged).
      """
      from urllib.parse import unquote

      database_url = config_handler.get("admin_app", "DATABASE_URL", "")
      if not database_url:
          logger.error("DATABASE_URL not found in configuration")
          return None
      try:
          parsed = urlparse(database_url)
          # Credentials embedded in a URL must be percent-encoded (e.g. "p%40ss"
          # for "p@ss"). urlparse leaves them encoded, so decode them before
          # handing them to the driver. None (absent) values pass through as-is.
          user = unquote(parsed.username) if parsed.username else parsed.username
          password = unquote(parsed.password) if parsed.password else parsed.password
          return pymysql.connect(
              host=parsed.hostname,
              port=parsed.port or 3306,
              user=user,
              password=password,
              database=parsed.path[1:],  # drop the leading "/" of "/dbname"
              charset='utf8mb4',
              autocommit=True,
          )
      except Exception as e:
          logger.error(f"Database connection error: {e}")
          return None
  def update_db_status(doc_id, status=None, progress=None, error=None, converted_file_name=None):
      """Update the conversion-tracking columns of one ``t_document_main`` row.

      Only arguments that are not ``None`` are written. ``updated_time`` is
      set to ``NOW()`` alongside them. If every argument is ``None``, nothing
      happens and no database connection is opened. Database errors are
      logged rather than raised.

      Args:
          doc_id: Primary key of the row to update.
          status: New ``conversion_status`` value.
          progress: New ``conversion_progress`` value.
          error: New ``conversion_error`` text.
          converted_file_name: New ``converted_file_name`` value.
      """
      # (column, value) pairs for the fields the caller actually supplied.
      candidates = (
          ("conversion_status", status),
          ("conversion_progress", progress),
          ("conversion_error", error),
          ("converted_file_name", converted_file_name),
      )
      assignments = [(column, value) for column, value in candidates if value is not None]
      if not assignments:
          # Nothing to write: don't open a connection just to close it.
          return
      # Column names are fixed literals above; all values are bound as parameters.
      set_clause = ", ".join(f"{column} = %s" for column, _ in assignments)
      sql = f"UPDATE t_document_main SET {set_clause}, updated_time = NOW() WHERE id = %s"
      params = [value for _, value in assignments] + [doc_id]

      conn = get_db_connection()
      if not conn:
          return
      try:
          with conn.cursor() as cursor:
              cursor.execute(sql, params)
      except Exception as e:
          logger.exception(f"Update DB failed: {e}")
      finally:
          conn.close()
  def apply_upload_urls(files_meta, model_version="vlm"):
      """Open a MinerU extraction batch and obtain upload URLs for its files.

      Args:
          files_meta: List of per-file dicts sent as the ``files`` field.
          model_version: Value sent as the ``model_version`` field.

      Returns:
          ``(batch_id, file_urls)`` taken from the response's ``data`` object.

      Raises:
          requests.HTTPError: if the HTTP status is an error.
          RuntimeError: if the API reports a non-zero ``code``.
      """
      response = requests.post(
          API_APPLY,
          headers=HEADERS,
          json={"files": files_meta, "model_version": model_version},
          timeout=60,
      )
      response.raise_for_status()
      body = response.json()
      if body.get("code") != 0:
          raise RuntimeError(f"apply upload urls failed: {body.get('msg')}")
      data = body["data"]
      return data["batch_id"], data["file_urls"]
  def upload_files(file_data_list, upload_urls):
      """PUT each payload to its matching upload URL, pairing them by position.

      Args:
          file_data_list: Sequence of raw payloads (e.g. ``bytes``).
          upload_urls: Sequence of URLs, one per payload, in the same order.

      Raises:
          RuntimeError: if the two sequences differ in length, or if any
              upload does not answer with HTTP 200.
      """
      file_data_list = list(file_data_list)
      upload_urls = list(upload_urls)
      # zip() would silently drop the surplus of the longer sequence, leaving
      # files un-uploaded without any error, so reject a mismatch up front.
      if len(file_data_list) != len(upload_urls):
          raise RuntimeError(
              f"upload url count mismatch: {len(file_data_list)} files, "
              f"{len(upload_urls)} urls"
          )
      for data, url in zip(file_data_list, upload_urls):
          res = requests.put(url, data=data, timeout=300)
          if res.status_code != 200:
              raise RuntimeError(f"upload failed to {url}, status={res.status_code}")
  def poll_batch(batch_id, interval_sec=5, timeout_sec=1800):
      """Poll MinerU until every file of ``batch_id`` reaches a final state.

      Args:
          batch_id: Batch identifier returned by ``apply_upload_urls``.
          interval_sec: Seconds to sleep between polls.
          timeout_sec: Overall time budget before giving up.

      Returns:
          The non-empty ``extract_result`` list, once every entry's ``state``
          is ``"done"`` or ``"failed"``.

      Raises:
          requests.HTTPError: if a poll request returns an error status.
          RuntimeError: if the API reports a non-zero ``code``.
          TimeoutError: if the batch is not finished within ``timeout_sec``.
      """
      deadline = time.time() + timeout_sec
      url = API_BATCH_RESULT.format(batch_id)
      while True:
          r = requests.get(url, headers=HEADERS, timeout=60)
          r.raise_for_status()
          j = r.json()
          if j.get("code") != 0:
              raise RuntimeError(f"poll failed: {j.get('msg')}")
          results = (j.get("data") or {}).get("extract_result") or []
          # all() is True for an empty list, but an empty or missing result set
          # is not a finished batch; keep polling rather than returning [].
          if results and all(it.get("state") in ("done", "failed") for it in results):
              return results
          if time.time() > deadline:
              raise TimeoutError(f"poll timeout for batch_id={batch_id}")
          time.sleep(interval_sec)
  def process_document(doc_id, chinese_name, file_url, out_dir):
      """Convert one document to Markdown with MinerU and store it in MinIO.

      Progress is written to ``t_document_main`` along the way:

      1. status 1 / progress 10 - conversion started.
      2. Download the source file; progress 30.
      3. Submit it to MinerU; progress 50 after upload.
      4. Poll for the result; progress 80 before fetching the result zip.
      5. Upload the first ``.md`` from the zip to MinIO; status 2 / progress 100.

      Any failure is logged and recorded as status 3 with the error text.

      Args:
          doc_id: Primary key of the ``t_document_main`` row.
          chinese_name: Document title, used as the file name sent to MinerU.
              Falls back to ``doc_id`` when empty or ``None``.
          file_url: URL the original file is downloaded from.
          out_dir: Not used by this function; kept for interface compatibility.

      Returns:
          ``True`` on success, ``False`` on any failure.
      """
      try:
          # 1. Mark the conversion as started.
          update_db_status(doc_id, status=1, progress=10)

          # 2. Download the original file.
          logger.info(f"Downloading {file_url}...")
          resp = requests.get(file_url, timeout=60)
          resp.raise_for_status()
          file_content = resp.content
          # An HTML response means the link points at a web page, not a file.
          content_type = resp.headers.get("Content-Type", "").lower()
          if "text/html" in content_type:
              raise RuntimeError("不支持对网页链接进行转换,请直接查看原链接。")
          # Extension comes from the URL path; assume PDF when there is none.
          file_ext = Path(urlparse(file_url).path).suffix.lower() or ".pdf"
          # Avoid names like "None.pdf" or ".pdf" when the title is missing.
          base_name = chinese_name or str(doc_id)
          file_name = f"{base_name}{file_ext}"
          update_db_status(doc_id, progress=30)

          # 3. Submit the file to MinerU.
          files_meta = [{"name": file_name, "data_id": doc_id}]
          batch_id, upload_urls = apply_upload_urls(files_meta)
          upload_files([file_content], upload_urls)
          update_db_status(doc_id, progress=50)

          # 4. Wait for the extraction result of our single file.
          results = poll_batch(batch_id)
          if not results:
              raise RuntimeError("MinerU returned no extraction result")
          result = results[0]
          if result.get("state") != "done":
              err_msg = result.get("err_msg", "Unknown error")
              raise RuntimeError(f"MinerU extraction failed: {err_msg}")
          zip_url = result.get("full_zip_url")
          if not zip_url:
              raise RuntimeError("No zip URL in result")

          # 5. Download the result archive and take the first Markdown file.
          update_db_status(doc_id, progress=80)
          zip_resp = requests.get(zip_url, timeout=300)
          zip_resp.raise_for_status()
          with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as z:
              md_files = [f for f in z.namelist() if f.endswith(".md")]
              if not md_files:
                  raise RuntimeError("No .md file found in the converted zip")
              md_content = z.read(md_files[0])

          # Store it as <base>/converted/<YYYYMMDD>/<doc_id>.md in MinIO.
          object_name = f"{MINIO_BASE_PATH}/converted/{datetime.now().strftime('%Y%m%d')}/{doc_id}.md"
          cloud_url = upload_to_minio(md_content, object_name)
          if not cloud_url:
              raise RuntimeError("Failed to upload converted file to MinIO")
          logger.info(f"Uploaded converted file to {cloud_url}")
          update_db_status(doc_id, status=2, progress=100, converted_file_name=cloud_url)
          return True
      except Exception as e:
          logger.exception(f"Process failed: {e}")
          # Some exceptions stringify to ""; record at least the exception type.
          update_db_status(doc_id, status=3, error=str(e) or type(e).__name__)
          return False
  def _lookup_legacy_file_url(cursor, doc_id):
      """Return the file URL from the row's legacy source table, or ``None``.

      Older rows of ``t_document_main`` have no ``file_url``. Their URL lives
      in a per-type table identified by ``source_type`` / ``source_id``.
      """
      cursor.execute("SELECT source_type, source_id FROM t_document_main WHERE id = %s", (doc_id,))
      st_row = cursor.fetchone()
      if not st_row:
          return None
      source_type, source_id = st_row
      table_name = {
          "basis": "t_samp_standard_base_info",
          "work": "t_samp_construction_plan_base_info",
          "job": "t_samp_office_regulations",
      }.get(source_type)
      if not table_name:
          return None
      # The legacy tables don't agree on the URL column name, so try each
      # candidate. Table and column names come only from the fixed literals
      # here, never from input, so formatting them into the SQL is safe.
      for field in ("file_url", "source_url", "url"):
          try:
              cursor.execute(f"SELECT {field} FROM {table_name} WHERE id = %s", (source_id,))
          except pymysql.MySQLError:
              # Most likely this column does not exist in the table.
              continue
          url_row = cursor.fetchone()
          if url_row and url_row[0]:
              return url_row[0]
      return None


  def main_cli(doc_id, out_dir=r"d:\UGit\MinerU"):
      """Resolve a document's title and source URL, then convert it.

      The title and ``file_url`` are read from ``t_document_main``. If
      ``file_url`` is empty, the legacy per-type source tables are consulted.
      The database connection is released before the (potentially long)
      conversion starts. If no URL can be found, the row is marked as
      failed (status 3).

      Args:
          doc_id: Primary key of the ``t_document_main`` row.
          out_dir: Passed through to ``process_document``.
      """
      conn = get_db_connection()
      if not conn:
          logger.error("Database connection failed")
          return
      try:
          with conn.cursor() as cursor:
              # Prefer title and file_url from the main table.
              cursor.execute("SELECT title, file_url FROM t_document_main WHERE id = %s", (doc_id,))
              row = cursor.fetchone()
              if not row:
                  logger.warning(f"Document not found: {doc_id}")
                  return
              title, file_url = row
              if not file_url:
                  # Compatibility with old rows that lack file_url.
                  file_url = _lookup_legacy_file_url(cursor, doc_id)
      finally:
          # Don't keep an idle connection open for the whole conversion.
          conn.close()

      if not file_url:
          logger.error(f"No file_url found for document: {doc_id}")
          update_db_status(doc_id, status=3, error="未找到文件链接(file_url)")
          return
      # Runs outside any try/except, so Ctrl+C / SystemExit are no longer
      # swallowed and turned into a retry with the next URL column.
      process_document(doc_id, title, file_url, out_dir)
  if __name__ == "__main__":
      # Usage: python miner_u.py <doc_id>
      # The legacy form "python miner_u.py <table_type> <doc_id>" is still
      # accepted; its table_type argument is ignored and the second value is
      # used as the document id.
      cli_args = sys.argv[1:]
      if not cli_args:
          print("Usage: python miner_u.py <doc_id>")
      elif len(cli_args) == 2:
          main_cli(cli_args[1])
      else:
          main_cli(cli_args[0])