فهرست منبع

fix: 添加UTF-8 字节 + token 保护切分,并保留了表格优先按行切分的逻辑

ai02 4 هفته پیش
والد
کامیت
d221e08ff2
1فایلهای تغییر یافته به همراه696 افزوده شده و 45 حذف شده
  1. 696 45
      src/app/scripts/base_in_collection.py

+ 696 - 45
src/app/scripts/base_in_collection.py

@@ -1,13 +1,22 @@
 """
 将 JSON 文件中的 parent 和 children 数据插入 Milvus Collection。
 读取每个子文件夹的 JSON,解析 doc、parent、children 数组并构造入库数据。
+
+功能特性:
+- 分批插入:解决 GRPC 消息大小限制(默认 64MB)
+- 进度显示:实时显示处理进度
+- 断点续传:支持从上次中断处继续
+- 暂停保存:可暂停并保存进度和错误文件
 """
 from __future__ import annotations
 
 import json
+import signal
+import sys
+import time
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Tuple
 
 from pymilvus import MilvusClient
 
@@ -15,11 +24,16 @@ from app.config.embeddings import get_embeddings
 from app.config.milvus_client import get_milvusclient
 from app.config.setting import settings
 
+# ==================== 配置区域 ====================
+
 # 根目录配置
-ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据\错误"
 
 # 失败汇总JSON保存路径
-FAILED_REPORT_PATH = r"C:\Users\ZengChao\Desktop\base_collection_failed_report.json"
+FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据\错误\base_collection_failed_report.json"
+
+# 进度保存文件路径(断点续传用)
+PROGRESS_FILE_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据\错误\base_collection_progress.json"
 
 # Collection 名称
 PARENT_COLLECTION_NAME = "t_rag_kng_standard_parent"
@@ -31,6 +45,33 @@ DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
 # MinIO URL 前缀
 PREFIX = "/standard"
 
+# 分批插入配置:每批最大行数(根据平均行大小调整,避免超过 64MB GRPC 限制)
+BATCH_SIZE = 50  # 每批插入的最大行数
+
+# GRPC 消息大小限制(字节)- 留有余量
+MAX_GRPC_MESSAGE_SIZE = 60 * 1024 * 1024  # 60MB
+MILVUS_VARCHAR_MAX_LENGTH = 65535
+
+# embedding 模型 token 保护(保守估算)
+# 线上报错显示中文长文本在约 2.8 万字节时仍可超过 16384 tokens。
+# 因此这里采用更保守阈值:按 1 byte ~= 1 token 估算,再乘安全系数,
+# 优先保证 embedding 请求不触发 context length 400。
+EMBEDDING_MAX_INPUT_TOKENS = 16384
+EMBEDDING_TOKEN_SAFETY_RATIO = 0.75
+
+# TEXT_SAFE_MAX_LENGTH 需同时满足:
+# 1. Milvus VARCHAR 字段限制:65535 字节
+# 2. 嵌入模型 token 近似限制:16384 × 1 × 0.75 = 12288 字节
+# 取两者中的较小值,确保先满足 embedding 上限。
+TEXT_SAFE_MAX_LENGTH = min(
+    MILVUS_VARCHAR_MAX_LENGTH,
+    int(EMBEDDING_MAX_INPUT_TOKENS * EMBEDDING_TOKEN_SAFETY_RATIO),
+)  # 12288
+
+# 重试配置
+MAX_RETRY_ATTEMPTS = 3
+RETRY_DELAY_SECONDS = 2
+
 # 字段简写映射
 DOCUMENT_TYPE_MAP = {
     "国家标准": "GB",
@@ -71,6 +112,452 @@ PROFESSIONAL_FIELD_MAP = {
     "其他": "QT",
 }
 
+# ==================== 全局状态 ====================
+
+# 暂停标志
+_pause_requested = False
+
+# 当前进度(用于保存)
+_current_progress = {
+    "processed_folders": [],  # 已处理的文件夹列表
+    "current_folder": None,   # 当前正在处理的文件夹
+    "stats": None,            # 统计信息
+}
+
+
+def signal_handler(signum, frame):
+    """处理暂停信号(Ctrl+C)"""
+    global _pause_requested
+    print("\n\n⚠️  检测到暂停请求,正在保存进度...")
+    _pause_requested = True
+
+
+# 注册信号处理器
+signal.signal(signal.SIGINT, signal_handler)
+signal.signal(signal.SIGTERM, signal_handler)
+
+
+def save_progress():
+    """保存当前进度到文件"""
+    global _current_progress
+    try:
+        with open(PROGRESS_FILE_PATH, "w", encoding="utf-8") as f:
+            json.dump(_current_progress, f, ensure_ascii=False, indent=2)
+        print(f"💾 进度已保存到: {PROGRESS_FILE_PATH}")
+    except Exception as e:
+        print(f"❌ 保存进度失败: {e}")
+
+
+def load_progress() -> Dict[str, Any]:
+    """从文件加载进度"""
+    try:
+        progress_file = Path(PROGRESS_FILE_PATH)
+        if progress_file.exists():
+            with open(progress_file, "r", encoding="utf-8") as f:
+                return json.load(f)
+    except Exception as e:
+        print(f"⚠️  加载进度文件失败: {e}")
+    return {"processed_folders": [], "current_folder": None, "stats": None}
+
+
+def clear_progress():
+    """清除进度文件"""
+    try:
+        progress_file = Path(PROGRESS_FILE_PATH)
+        if progress_file.exists():
+            progress_file.unlink()
+            print(f"🗑️  已清除进度文件")
+    except Exception as e:
+        print(f"⚠️  清除进度文件失败: {e}")
+
+
+def estimate_entity_size(entity: Dict[str, Any]) -> int:
+    """估算单个实体的大小(字节)"""
+    # 粗略估算:JSON 序列化后的字符串长度
+    return len(json.dumps(entity, ensure_ascii=False))
+
+
+def utf8_len(text: str) -> int:
+    """返回字符串的 UTF-8 字节长度。"""
+    return len((text or "").encode("utf-8"))
+
+
+def truncate_utf8(text: str, max_bytes: int) -> str:
+    """
+    按 UTF-8 字节长度安全截断字符串,不截断到半个字符。
+    """
+    content = str(text or "")
+    raw = content.encode("utf-8")
+    if len(raw) <= max_bytes:
+        return content
+    return raw[:max_bytes].decode("utf-8", errors="ignore")
+
+
+def is_markdown_table_block(block: str) -> bool:
+    """判断文本块是否为 Markdown 表格。"""
+    lines = [line.strip() for line in str(block or "").splitlines() if line.strip()]
+    if len(lines) < 2:
+        return False
+
+    header = lines[0]
+    separator = lines[1]
+    return (
+        "|" in header
+        and "|" in separator
+        and set(separator.replace("|", "").replace(":", "").replace("-", "").strip()) == set()
+    )
+
+
+def is_markdown_table_separator_line(line: str) -> bool:
+    """判断是否为 Markdown 表格分隔行(如 |---|:---:|)。"""
+    stripped = str(line or "").strip()
+    if "|" not in stripped:
+        return False
+    core = stripped.replace("|", "").replace(":", "").replace("-", "").strip()
+    return core == ""
+
+
+def split_content_preserve_table_blocks(content: str) -> List[Tuple[str, str]]:
+    """
+    将内容拆为 text/table 块,避免混合段落场景下误切表格。
+
+    Returns:
+        [(block_type, block_text)],block_type 取值 text/table
+    """
+    lines = str(content or "").splitlines()
+    blocks: List[Tuple[str, str]] = []
+    text_buffer: List[str] = []
+    i = 0
+
+    def flush_text_buffer() -> None:
+        if not text_buffer:
+            return
+        text = "\n".join(text_buffer).strip()
+        if text:
+            blocks.append(("text", text))
+        text_buffer.clear()
+
+    while i < len(lines):
+        current = lines[i]
+        next_line = lines[i + 1] if i + 1 < len(lines) else ""
+        current_stripped = current.strip()
+        next_stripped = next_line.strip()
+
+        # 识别表格起始:表头行 + 分隔行
+        if current_stripped and "|" in current_stripped and is_markdown_table_separator_line(next_stripped):
+            flush_text_buffer()
+
+            table_lines = [current, next_line]
+            i += 2
+
+            # 继续收集表格数据行(空行结束;或非管道行结束)
+            while i < len(lines):
+                row = lines[i]
+                row_stripped = row.strip()
+                if not row_stripped:
+                    break
+                if "|" not in row_stripped:
+                    break
+                table_lines.append(row)
+                i += 1
+
+            table_text = "\n".join(table_lines).strip()
+            if table_text:
+                blocks.append(("table", table_text))
+            continue
+
+        text_buffer.append(current)
+        i += 1
+
+    flush_text_buffer()
+    return blocks
+
+
+def split_markdown_table_block(block: str, max_length: int) -> List[str]:
+    """
+    按“行”切分 Markdown 表格,尽量保持表格结构完整。
+    当单行超长时,才会做硬切分兜底。
+    """
+    lines = [line for line in str(block or "").splitlines() if line.strip()]
+    if len(lines) < 2:
+        return [block.strip()] if block.strip() else []
+
+    header_lines = [lines[0], lines[1]]
+    data_lines = lines[2:]
+    header_text = "\n".join(header_lines)
+    header_len = utf8_len(header_text)
+
+    # 表头本身超长,退化为硬切(极端情况)
+    if header_len >= max_length:
+        raw = str(block).encode("utf-8")
+        fallback_chunks: List[str] = []
+        start = 0
+        while start < len(raw):
+            sub = raw[start:start + max_length].decode("utf-8", errors="ignore").strip()
+            if sub:
+                fallback_chunks.append(sub)
+                start += len(sub.encode("utf-8"))
+            else:
+                start += max_length
+        return fallback_chunks
+
+    chunks: List[str] = []
+    current_rows: List[str] = []
+
+    for row in data_lines:
+        candidate_rows = current_rows + [row]
+        candidate = f"{header_text}\n" + "\n".join(candidate_rows)
+        if utf8_len(candidate) <= max_length:
+            current_rows = candidate_rows
+            continue
+
+        if current_rows:
+            chunks.append(f"{header_text}\n" + "\n".join(current_rows))
+            current_rows = []
+
+        # 单行超长时兜底硬切
+        row_with_header = f"{header_text}\n{row}"
+        if utf8_len(row_with_header) <= max_length:
+            current_rows = [row]
+        else:
+            raw = row.encode("utf-8")
+            start = 0
+            while start < len(raw):
+                sub = raw[start:start + max_length - header_len - 1].decode("utf-8", errors="ignore").strip()
+                if sub:
+                    chunks.append(f"{header_text}\n{sub}")
+                    start += len(sub.encode("utf-8"))
+                else:
+                    start += max(1, max_length - header_len - 1)
+
+    if current_rows:
+        chunks.append(f"{header_text}\n" + "\n".join(current_rows))
+
+    return [chunk.strip() for chunk in chunks if chunk.strip()]
+
+
+def split_text_for_milvus(text: str, max_length: int = TEXT_SAFE_MAX_LENGTH) -> List[str]:
+    """将超长文本切分为 Milvus VARCHAR 可接受的片段。"""
+    content = str(text or "").strip()
+    if not content:
+        return []
+    if utf8_len(content) <= max_length:
+        return [content]
+
+    chunks: List[str] = []
+    current = ""
+    blocks = split_content_preserve_table_blocks(content)
+
+    for block_type, block_text in blocks:
+        block_text = block_text.strip()
+        if not block_text:
+            continue
+
+        if block_type == "table":
+            # 先落盘已累计文本,避免与表格拼接后超限
+            if current:
+                chunks.append(current)
+                current = ""
+
+            if utf8_len(block_text) <= max_length:
+                chunks.append(block_text)
+                continue
+
+            table_chunks = split_markdown_table_block(block_text, max_length=max_length)
+            if table_chunks:
+                chunks.extend(table_chunks)
+                continue
+
+        # text 块按段落继续拼接,保持原有逻辑
+        parts = block_text.split("\n\n")
+        for part in parts:
+            part = part.strip()
+            if not part:
+                continue
+            candidate = f"{current}\n\n{part}" if current else part
+            if utf8_len(candidate) <= max_length:
+                current = candidate
+                continue
+
+            if current:
+                chunks.append(current)
+                current = ""
+
+            if utf8_len(part) <= max_length:
+                current = part
+                continue
+
+            # 单段落超过上限时,按字节安全硬切
+            raw = part.encode("utf-8")
+            start = 0
+            while start < len(raw):
+                sub = raw[start:start + max_length].decode("utf-8", errors="ignore").strip()
+                if sub:
+                    chunks.append(sub)
+                    start += len(sub.encode("utf-8"))
+                else:
+                    # 兜底推进,避免极端情况下死循环
+                    start += max_length
+
+    if current:
+        chunks.append(current)
+
+    return chunks
+
+
+def normalize_rows_for_text_limit(rows: List[Dict[str, Any]], row_type: str) -> List[Dict[str, Any]]:
+    """将 text 超长的 parent/child 行自动拆分,并重排 index。"""
+    if not rows:
+        return []
+
+    normalized: List[Dict[str, Any]] = []
+    new_index = 0
+    split_count = 0
+
+    for row in rows:
+        text_chunks = split_text_for_milvus(row.get("text", ""))
+        if not text_chunks:
+            continue
+
+        if len(text_chunks) > 1:
+            split_count += len(text_chunks) - 1
+
+        for chunk in text_chunks:
+            new_row = dict(row)
+            new_row["text"] = truncate_utf8(chunk, MILVUS_VARCHAR_MAX_LENGTH)
+            new_row["index"] = new_index
+            normalized.append(new_row)
+            new_index += 1
+
+    if split_count:
+        print(f"  ✂️  {row_type} 超长文本切分新增 {split_count} 行(VARCHAR 保护)")
+
+    return normalized
+
+
+def split_into_batches(entities: List[Dict[str, Any]], max_batch_size: int = BATCH_SIZE) -> List[List[Dict[str, Any]]]:
+    """
+    将实体列表分批,确保每批不超过最大大小限制
+    
+    Args:
+        entities: 实体列表
+        max_batch_size: 每批最大行数
+    
+    Returns:
+        分批后的实体列表
+    """
+    if not entities:
+        return []
+    
+    batches = []
+    current_batch = []
+    current_batch_size = 0
+    
+    for entity in entities:
+        entity_size = estimate_entity_size(entity)
+        
+        # 如果当前批次已满(按行数或大小),或者加上当前实体会超限
+        if (len(current_batch) >= max_batch_size or 
+            current_batch_size + entity_size > MAX_GRPC_MESSAGE_SIZE):
+            if current_batch:
+                batches.append(current_batch)
+            current_batch = [entity]
+            current_batch_size = entity_size
+        else:
+            current_batch.append(entity)
+            current_batch_size += entity_size
+    
+    # 添加最后一批
+    if current_batch:
+        batches.append(current_batch)
+    
+    return batches
+
+
+def insert_with_retry(
+    client: MilvusClient, 
+    collection_name: str, 
+    entities: List[Dict[str, Any]],
+    folder_name: str,
+    entity_type: str = "数据"
+) -> Tuple[int, str | None]:
+    """
+    带重试机制的插入操作
+    
+    Args:
+        client: Milvus 客户端
+        collection_name: Collection 名称
+        entities: 要插入的实体列表
+        folder_name: 文件夹名称(用于日志)
+        entity_type: 实体类型描述
+    
+    Returns:
+        (成功插入数量, 错误信息)
+    """
+    for attempt in range(MAX_RETRY_ATTEMPTS):
+        try:
+            client.insert(collection_name=collection_name, data=entities)
+            return len(entities), None
+        except Exception as e:
+            error_msg = str(e)
+            print(f"  ⚠️  {folder_name} {entity_type} 插入失败 (尝试 {attempt + 1}/{MAX_RETRY_ATTEMPTS}): {error_msg[:100]}...")
+            
+            if attempt < MAX_RETRY_ATTEMPTS - 1:
+                time.sleep(RETRY_DELAY_SECONDS * (attempt + 1))  # 递增延迟
+            else:
+                return 0, error_msg
+    
+    return 0, "所有重试都失败"
+
+
+def insert_batch_entities(
+    client: MilvusClient,
+    collection_name: str,
+    entities: List[Dict[str, Any]],
+    folder_name: str,
+    entity_type: str = "数据"
+) -> Tuple[int, str | None]:
+    """
+    分批插入实体
+    
+    Args:
+        client: Milvus 客户端
+        collection_name: Collection 名称
+        entities: 要插入的实体列表
+        folder_name: 文件夹名称
+        entity_type: 实体类型描述
+    
+    Returns:
+        (成功插入数量, 错误信息)
+    """
+    if not entities:
+        return 0, None
+    
+    # 分批
+    batches = split_into_batches(entities)
+    total_inserted = 0
+    
+    for batch_idx, batch in enumerate(batches):
+        if _pause_requested:
+            save_progress()
+            print("\n⏸️  已暂停,保存进度后退出")
+            sys.exit(0)
+        
+        inserted, error = insert_with_retry(
+            client, collection_name, batch, folder_name, entity_type
+        )
+        
+        if error:
+            return total_inserted, f"批次 {batch_idx + 1}/{len(batches)} 失败: {error}"
+        
+        total_inserted += inserted
+        
+        # 显示进度
+        if len(batches) > 1:
+            print(f"  📦 {folder_name} {entity_type} 批次 {batch_idx + 1}/{len(batches)} 完成 ({len(batch)} 行)")
+    
+    return total_inserted, None
+
 
 def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str, file_name: str) -> Dict[str, Any]:
     """
@@ -93,22 +580,29 @@ def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str, file
     professional_field = PROFESSIONAL_FIELD_MAP.get(professional_field_raw, professional_field_raw)
     
     return {
-        "file_name": file_name,  # 原始文件名
-        "chinese_name": doc_data.get("chinese_name", ""),  # 中文名称
+        "file_name": file_name,
+        "chinese_name": doc_data.get("chinese_name", ""),
         "standard_number": doc_data.get("standard_number", ""),
         "issuing_authority": doc_data.get("issuing_authority", ""),
-        "document_type": document_type,  # 简写
-        "professional_field": professional_field,  # 简写
+        "document_type": document_type,
+        "professional_field": professional_field,
         "hierarchy": hierarchy,
         "file_url": file_url,
-        "plan_type_list": {}  # 空 JSON
+        "plan_type_list": {}
     }
 
 
-def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows: List[Dict[str, Any]], 
-                       doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str) -> tuple[int, str | None]:
+def insert_parent_rows(
+    client: MilvusClient, 
+    collection_name: str, 
+    parent_rows: List[Dict[str, Any]], 
+    doc_data: Dict[str, Any], 
+    doc_id: str, 
+    folder_name: str, 
+    file_name: str
+) -> Tuple[int, str | None]:
     """
-    插入 parent 数据到 Milvus。
+    插入 parent 数据到 Milvus(支持分批)
     
     Args:
         client: Milvus 客户端
@@ -117,10 +611,12 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
         doc_data: doc 数据
         doc_id: 文档 ID
         folder_name: 文件夹名称
+        file_name: 原始文件名
     
     Returns:
-        插入的行数
+        (插入的行数, 错误信息)
     """
+    parent_rows = normalize_rows_for_text_limit(parent_rows, "parent")
     if not parent_rows:
         return 0, None
     
@@ -154,18 +650,21 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
         }
         entities.append(entity)
     
-    try:
-        client.insert(collection_name=collection_name, data=entities)
-        return len(entities), None
-    except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入 parent 失败: {e}")
-        return 0, str(e)
+    # 分批插入
+    return insert_batch_entities(client, collection_name, entities, folder_name, "parent")
 
 
-def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: List[Dict[str, Any]], 
-                      doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str) -> tuple[int, str | None]:
+def insert_child_rows(
+    client: MilvusClient, 
+    collection_name: str, 
+    child_rows: List[Dict[str, Any]], 
+    doc_data: Dict[str, Any], 
+    doc_id: str, 
+    folder_name: str, 
+    file_name: str
+) -> Tuple[int, str | None]:
     """
-    插入 children 数据到 Milvus。
+    插入 children 数据到 Milvus(支持分批)
     
     Args:
         client: Milvus 客户端
@@ -174,10 +673,12 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
         doc_data: doc 数据
         doc_id: 文档 ID
         folder_name: 文件夹名称
+        file_name: 原始文件名
     
     Returns:
-        插入的行数
+        (插入的行数, 错误信息)
     """
+    child_rows = normalize_rows_for_text_limit(child_rows, "children")
     if not child_rows:
         return 0, None
     
@@ -211,43 +712,107 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
         }
         entities.append(entity)
     
+    # 分批插入
+    return insert_batch_entities(client, collection_name, entities, folder_name, "children")
+
+
+def cleanup_document_data(
+    client: MilvusClient,
+    parent_collection_name: str,
+    child_collection_name: str,
+    doc_id: str,
+    folder_name: str,
+) -> str | None:
+    """
+    按 document_id 清理父子集合旧数据,保证幂等重入。
+
+    Returns:
+        错误信息;成功返回 None
+    """
+    escaped_doc_id = str(doc_id or "").replace("\\", "\\\\").replace("'", "\\'")
+    if not escaped_doc_id:
+        return "doc_id 为空,无法执行清理"
+
+    filter_expr = f"document_id == '{escaped_doc_id}'"
+
     try:
-        client.insert(collection_name=collection_name, data=entities)
-        return len(entities), None
+        client.delete(collection_name=parent_collection_name, filter=filter_expr)
+        client.delete(collection_name=child_collection_name, filter=filter_expr)
+        print(f"  🧹 {folder_name} 已清理 document_id={doc_id} 的历史数据")
+        return None
     except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入 children 失败: {e}")
-        return 0, str(e)
+        return f"清理历史数据失败: {e}"
 
 
-def process_folder(root_folder: str | Path) -> Dict[str, Any]:
+def process_folder(
+    root_folder: str | Path,
+    resume: bool = True
+) -> Dict[str, Any]:
     """
     处理文件夹结构,导入 Milvus。
     
     Args:
         root_folder: 根目录路径
+        resume: 是否从上次进度恢复
     
     Returns:
         统计信息字典
     """
+    global _current_progress
+    
     root_folder = Path(root_folder)
     if not root_folder.is_dir():
         raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
     
     client = get_milvusclient()
     
-    stats = {"success": 0, "failed": 0, "skipped": 0, "parent_rows": 0, "child_rows": 0, "failed_items": []}
+    # 加载进度(如果恢复模式)
+    progress = load_progress() if resume else {"processed_folders": [], "current_folder": None, "stats": None}
+    processed_folders = set(progress.get("processed_folders", []))
     
-    for subfolder in sorted(root_folder.iterdir()):
-        if not subfolder.is_dir():
-            continue
+    # 获取所有子文件夹
+    all_subfolders = [f for f in root_folder.iterdir() if f.is_dir()]
+    all_subfolders.sort()
+    
+    # 过滤已处理的
+    subfolders_to_process = [f for f in all_subfolders if f.name not in processed_folders]
+    
+    total_folders = len(all_subfolders)
+    remaining_folders = len(subfolders_to_process)
+    
+    print(f"📊 总文件夹数: {total_folders}, 已处理: {total_folders - remaining_folders}, 待处理: {remaining_folders}")
+    print("-" * 60)
+    
+    stats = {
+        "success": progress.get("stats", {}).get("success", 0) if progress.get("stats") else 0,
+        "failed": progress.get("stats", {}).get("failed", 0) if progress.get("stats") else 0,
+        "skipped": progress.get("stats", {}).get("skipped", 0) if progress.get("stats") else 0,
+        "parent_rows": progress.get("stats", {}).get("parent_rows", 0) if progress.get("stats") else 0,
+        "child_rows": progress.get("stats", {}).get("child_rows", 0) if progress.get("stats") else 0,
+        "failed_items": progress.get("stats", {}).get("failed_items", []) if progress.get("stats") else [],
+    }
+    
+    for idx, subfolder in enumerate(subfolders_to_process):
+        # 检查暂停请求
+        if _pause_requested:
+            save_progress()
+            print("\n⏸️  已暂停,保存进度后退出")
+            sys.exit(0)
         
         folder_name = subfolder.name
+        progress_pct = ((idx + total_folders - remaining_folders + 1) / total_folders) * 100
+        
+        # 更新当前进度
+        _current_progress["current_folder"] = folder_name
+        _current_progress["stats"] = stats
         
         # 查找 JSON 文件
         json_files = list(subfolder.glob("*.json"))
         if not json_files:
-            print(f"📁 {folder_name} ⊘ 无JSON文件")
+            print(f"[{idx + total_folders - remaining_folders + 1}/{total_folders} {progress_pct:.1f}%] 📁 {folder_name} ⊘ 无JSON文件")
             stats["skipped"] += 1
+            processed_folders.add(folder_name)
+            _current_progress["processed_folders"] = list(processed_folders)
             continue
         
         json_path = json_files[0]
@@ -262,13 +827,15 @@ def process_folder(root_folder: str | Path) -> Dict[str, Any]:
             child_rows = data.get("children", [])
             
             if not doc_data or not doc_data.get("id"):
-                print(f"📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
+                print(f"[{idx + total_folders - remaining_folders + 1}/{total_folders} {progress_pct:.1f}%] 📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
                 stats["failed_items"].append({
                     "folder": folder_name,
                     "error": "JSON格式错误或缺少doc/id",
                     "reason": "JSON格式错误或缺少doc/id",
                 })
                 stats["failed"] += 1
+                processed_folders.add(folder_name)
+                _current_progress["processed_folders"] = list(processed_folders)
                 continue
             
             doc_id = doc_data.get("id")
@@ -283,66 +850,146 @@ def process_folder(root_folder: str | Path) -> Dict[str, Any]:
             
             file_name = original_file.stem if original_file else folder_name
             
+            # 显示处理信息
+            print(f"[{idx + total_folders - remaining_folders + 1}/{total_folders} {progress_pct:.1f}%] 📁 {folder_name} (parent: {len(parent_rows)}, child: {len(child_rows)})")
+
+            # 每个文档先清理再写入(幂等)
+            cleanup_error = cleanup_document_data(
+                client,
+                PARENT_COLLECTION_NAME,
+                CHILD_COLLECTION_NAME,
+                doc_id,
+                folder_name,
+            )
+            if cleanup_error:
+                stats["failed_items"].append({
+                    "folder": folder_name,
+                    "error": "文档清理失败",
+                    "reason": cleanup_error,
+                })
+                stats["failed"] += 1
+                processed_folders.add(folder_name)
+                _current_progress["processed_folders"] = list(processed_folders)
+                _current_progress["stats"] = stats
+                continue
+            
             # 插入 parent 和 children
             parent_count, parent_error = insert_parent_rows(
                 client, PARENT_COLLECTION_NAME, parent_rows, doc_data, doc_id, folder_name, file_name
             )
-            child_count, child_error = insert_child_rows(
-                client, CHILD_COLLECTION_NAME, child_rows, doc_data, doc_id, folder_name, file_name
-            )
+            
+            if _pause_requested:
+                save_progress()
+                print("\n⏸️  已暂停,保存进度后退出")
+                sys.exit(0)
 
+            # 父失败则不插子,避免脏数据
+            child_count = 0
+            child_error = None
+            if parent_error:
+                child_error = "parent 入库失败,已跳过 child 入库"
+            else:
+                child_count, child_error = insert_child_rows(
+                    client, CHILD_COLLECTION_NAME, child_rows, doc_data, doc_id, folder_name, file_name
+                )
+            
+            has_error = False
             if parent_error:
                 stats["failed_items"].append({
                     "folder": folder_name,
                     "error": "parent 入库失败",
                     "reason": parent_error,
                 })
-                stats["failed"] += 1
+                has_error = True
+            
             if child_error:
                 stats["failed_items"].append({
                     "folder": folder_name,
                     "error": "child 入库失败",
                     "reason": child_error,
                 })
-                stats["failed"] += 1
+                has_error = True
             
-            if parent_count > 0 or child_count > 0:
-                print(f"📁 {folder_name} ✅ parent: {parent_count}, child: {child_count}")
+            if has_error:
+                stats["failed"] += 1
+            elif parent_count > 0 or child_count > 0:
+                print(f"  ✅ parent: {parent_count}, child: {child_count}")
                 stats["success"] += 1
                 stats["parent_rows"] += parent_count
                 stats["child_rows"] += child_count
             else:
-                print(f"📁 {folder_name} ⊘ 无数据入库")
+                print(f"  ⊘ 无数据入库")
                 stats["skipped"] += 1
+            
+            # 标记为已处理
+            processed_folders.add(folder_name)
+            _current_progress["processed_folders"] = list(processed_folders)
+            _current_progress["stats"] = stats
+            
+            # 每处理10个文件夹保存一次进度
+            if (idx + 1) % 10 == 0:
+                save_progress()
                 
         except Exception as e:
             error_message = str(e)
-            print(f"📁 {folder_name} ❌ {error_message}")
+            print(f"[{idx + total_folders - remaining_folders + 1}/{total_folders} {progress_pct:.1f}%] 📁 {folder_name} ❌ {error_message[:100]}")
             stats["failed_items"].append({
                 "folder": folder_name,
                 "error": "未知错误",
                 "reason": error_message,
             })
             stats["failed"] += 1
+            processed_folders.add(folder_name)
+            _current_progress["processed_folders"] = list(processed_folders)
     
     return stats
 
 
 def main():
     """主函数"""
+    global _current_progress
+    
+    # 检查是否有待恢复的进度
+    progress = load_progress()
+    resume = False
+    
+    if progress.get("processed_folders"):
+        print("=" * 60)
+        print("💾 发现未完成的进度记录")
+        print(f"   已处理文件夹数: {len(progress.get('processed_folders', []))}")
+        print(f"   上次处理: {progress.get('current_folder', 'N/A')}")
+        print("=" * 60)
+        
+        # 询问是否恢复
+        user_input = input("是否从上次进度继续? [Y/n]: ").strip().lower()
+        if user_input in ("", "y", "yes"):
+            resume = True
+            print("🔄 将从上次进度继续...")
+        else:
+            print("🆕 重新开始处理...")
+            clear_progress()
+            progress = {"processed_folders": [], "current_folder": None, "stats": None}
+    
+    _current_progress = progress if resume else {"processed_folders": [], "current_folder": None, "stats": None}
+    
     try:
         print(f"🔍 开始导入 Milvus...")
         print(f"📂 根目录: {ROOT_FOLDER}")
         print(f"🔗 Milvus: {settings.MILVUS_HOST}:{settings.MILVUS_PORT}")
         print(f"📊 Parent Collection: {PARENT_COLLECTION_NAME}")
         print(f"📊 Child Collection: {CHILD_COLLECTION_NAME}")
+        print(f"📦 分批大小: {BATCH_SIZE} 行/批")
         print("-" * 60)
         
-        stats = process_folder(ROOT_FOLDER)
-
+        stats = process_folder(ROOT_FOLDER, resume=resume)
+        
+        # 保存失败报告
         with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
             json.dump({"failed": stats["failed_items"]}, f, ensure_ascii=False, indent=2)
         
+        # 处理完成,清除进度文件
+        clear_progress()
+        
         print("\n" + "=" * 60)
         print(f"✅ 成功: {stats['success']} | ❌ 失败: {stats['failed']} | ⊘ 跳过: {stats['skipped']}")
         print(f"📊 Parent 行数: {stats['parent_rows']} | Child 行数: {stats['child_rows']}")
@@ -350,7 +997,11 @@ def main():
         print("=" * 60)
         
     except Exception as e:
-        print(f"❌ 错误: {str(e)}")
+        # 发生错误时保存进度
+        save_progress()
+        print(f"\n❌ 错误: {str(e)}")
+        print(f"💾 进度已保存,可重新运行脚本恢复")
+        raise
 
 
 if __name__ == "__main__":