Просмотр исходного кода

fix: 施工方案milvus插入,添加断点续传功能,清理脏数据

ai02 4 недель назад
Родитель
Commit
eab3020633
2 измененных файлов с 1341 добавлено и 44 удалено
  1. 235 44
      src/app/scripts/plan_info_in_collection.py
  2. 1106 0
      src/app/scripts/plan_info_in_collection_v2.py

+ 235 - 44
src/app/scripts/plan_info_in_collection.py

@@ -1,13 +1,20 @@
 """
 将 JSON 文件中的 parent 和 children 数据插入 Milvus Collection。
 读取每个子文件夹的 JSON,解析 doc、parent、children 数组并构造入库数据。
+
+更新功能:
+- 显示文件上传进度
+- 支持 Ctrl+C 手动停止
+- 断点续传:自动跳过已完整上传的文件
 """
 from __future__ import annotations
 
 import json
+import signal
+import sys
 from datetime import datetime
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Set
 
 from pymilvus import MilvusClient
 
@@ -16,11 +23,14 @@ from app.config.milvus_client import get_milvusclient
 from app.config.setting import settings
 
 # 根目录配置
-ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终施工方案"
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\error"
 
 # 失败汇总JSON保存路径
 FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\plan_collection_failed_report.json"
 
+# 进度保存文件路径(用于断点续传)
+PROGRESS_FILE_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\plan_collection_progress.json"
+
 # Collection 名称
 PARENT_COLLECTION_NAME = "t_rag_kng_construction_plan_parent"
 CHILD_COLLECTION_NAME = "t_rag_kng_construction_plan"
@@ -31,6 +41,48 @@ DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
 # MinIO URL 前缀
 PREFIX = "/plan"
 
+# Milvus text 字段最大长度限制 (按字节计算,UTF-8中文占3字节)
+MAX_TEXT_BYTES = 60000
+
+# 批量插入批次大小(避免gRPC消息过大)
+BATCH_SIZE = 2000
+
+
+def truncate_text_by_bytes(text: str, max_bytes: int = MAX_TEXT_BYTES) -> str:
+    """
+    Truncate ``text`` so that its UTF-8 encoding fits within ``max_bytes`` bytes.
+
+    The cut never splits a multi-byte character: a partial sequence left at the
+    end of the byte slice is dropped.
+
+    Args:
+        text: Original text.
+        max_bytes: Maximum size in UTF-8 bytes; negative values are treated as 0.
+
+    Returns:
+        ``text`` unchanged when it already fits, otherwise the longest prefix
+        whose UTF-8 encoding does not exceed ``max_bytes``.
+    """
+    encoded = text.encode('utf-8')
+    if len(encoded) <= max_bytes:
+        return text
+
+    # Slicing valid UTF-8 can only damage the final character (at most three
+    # dangling bytes), so ignoring decode errors removes exactly that
+    # incomplete tail and nothing else.
+    return encoded[:max(max_bytes, 0)].decode('utf-8', errors='ignore')
+
+# 全局变量用于信号处理
+_should_stop = False
+_progress_data: Dict[str, Any] = {}
+
 # 枚举简写映射(匹配不到统一用 "QT")
 PLAN_CATEGORY_MAP = {
     "超危大方案": "CH",
@@ -154,6 +206,72 @@ def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str, file
     }
 
 
+def load_progress() -> Dict[str, Any]:
+    """
+    Load the resume state stored at ``PROGRESS_FILE_PATH``.
+
+    Returns:
+        The stored progress dict. An empty progress record
+        (``{"completed": [], "total_completed": 0}``) is returned when the
+        file is missing, unreadable, or does not contain a JSON object.
+        A stored ``"completed"`` value that is not a list is reset to ``[]``.
+    """
+    empty: Dict[str, Any] = {"completed": [], "total_completed": 0}
+    try:
+        with open(PROGRESS_FILE_PATH, "r", encoding="utf-8") as f:
+            progress = json.load(f)
+    except FileNotFoundError:
+        # First run: nothing has been recorded yet.
+        return empty
+    except Exception as e:
+        print(f"⚠️ 加载进度文件失败: {e}")
+        return empty
+
+    # Valid JSON is not necessarily a usable record: the caller does
+    # ``progress.get("completed", [])`` and feeds the result to ``set()``.
+    if not isinstance(progress, dict):
+        print(f"⚠️ 加载进度文件失败: 顶层应为 JSON 对象,实际为 {type(progress).__name__}")
+        return empty
+    if not isinstance(progress.get("completed"), list):
+        progress["completed"] = []
+    return progress
+
+
+def save_progress(progress: Dict[str, Any]):
+    """
+    Persist ``progress`` as JSON at ``PROGRESS_FILE_PATH``.
+
+    The data is written to a sibling ``.tmp`` file first and then moved over
+    the real file, so an interruption in the middle of the write cannot leave
+    a truncated progress file behind. Failures are reported on stdout and
+    otherwise ignored (best effort).
+
+    Args:
+        progress: JSON-serializable progress record.
+    """
+    target = Path(PROGRESS_FILE_PATH)
+    tmp_path = target.with_name(target.name + ".tmp")
+    try:
+        with open(tmp_path, "w", encoding="utf-8") as f:
+            json.dump(progress, f, ensure_ascii=False, indent=2)
+        # Only swap the file in once it has been written completely.
+        tmp_path.replace(target)
+    except Exception as e:
+        print(f"⚠️ 保存进度文件失败: {e}")
+
+
+def get_uploaded_doc_ids(client: MilvusClient) -> Set[str]:
+    """
+    Collect the ``document_id`` values already present in the child collection.
+
+    Intended to be called once at start-up. Rows are read through a query
+    iterator (batches of 1000) rather than offset/limit paging.
+
+    Args:
+        client: Milvus client used for the query.
+
+    Returns:
+        The set of non-empty ``document_id`` values found among rows with
+        ``is_deleted == false``. If the query fails part-way, the error is
+        printed and the ids gathered so far are returned.
+    """
+    uploaded_ids: Set[str] = set()
+    try:
+        iterator = client.query_iterator(
+            collection_name=CHILD_COLLECTION_NAME,
+            filter="is_deleted == false",
+            output_fields=["document_id"],
+            batch_size=1000
+        )
+        try:
+            while True:
+                results = iterator.next()
+                if not results:
+                    break
+                uploaded_ids.update(
+                    item["document_id"] for item in results if item.get("document_id")
+                )
+        finally:
+            # Release the iterator even when a batch fetch raises.
+            iterator.close()
+    except Exception as e:
+        print(f"⚠️ 查询已上传文档失败: {e}")
+
+    return uploaded_ids
+
+
+def signal_handler(signum, frame):
+    """
+    SIGINT (Ctrl+C) handler: save the current progress and exit with status 0.
+
+    Only the first signal is acted upon; once the stop flag is set, further
+    invocations return immediately.
+    """
+    global _should_stop
+    if _should_stop:
+        return
+
+    _should_stop = True
+    print("\n\n⚠️ 检测到停止信号,正在保存进度并退出...")
+    print("💡 已处理的文件已记录,下次运行将自动跳过")
+    save_progress(_progress_data)
+    sys.exit(0)
+
+
 def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows: List[Dict[str, Any]], 
                        doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str) -> tuple[int, str | None]:
     """
@@ -179,8 +297,17 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
     file_url = f"{PREFIX}/{doc_id}.md"
     now_ts = int(datetime.now().timestamp())
     
-    # 批量提取所有文本
-    texts = [row.get("text", "") for row in parent_rows]
+    # 批量提取所有文本,并截断超长文本(按字节)
+    texts = []
+    for row in parent_rows:
+        text = row.get("text", "")
+        text_bytes = text.encode('utf-8')
+        if len(text_bytes) > MAX_TEXT_BYTES:
+            original_chars = len(text)
+            text = truncate_text_by_bytes(text, MAX_TEXT_BYTES)
+            print(f"\n⚠️  {folder_name} 发现超长文本({original_chars}字符/{len(text_bytes)}字节),已截断至{len(text.encode('utf-8'))}字节")
+        texts.append(text)
+    
     # 批量生成向量
     vectors = embeddings.embed_documents(texts)
     
@@ -203,12 +330,17 @@ def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows:
         }
         entities.append(entity)
     
+    # 分批插入,避免gRPC消息过大
+    total_inserted = 0
     try:
-        client.insert(collection_name=collection_name, data=entities)
-        return len(entities), None
+        for i in range(0, len(entities), BATCH_SIZE):
+            batch = entities[i:i + BATCH_SIZE]
+            client.insert(collection_name=collection_name, data=batch)
+            total_inserted += len(batch)
+        return total_inserted, None
     except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入 parent 失败: {e}")
-        return 0, str(e)
+        print(f"\n📁 {folder_name} ❌ 插入 parent 失败: {e}")
+        return total_inserted, str(e)
 
 
 def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: List[Dict[str, Any]], 
@@ -236,8 +368,17 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
     file_url = f"{PREFIX}/{doc_id}.md"
     now_ts = int(datetime.now().timestamp())
     
-    # 批量提取所有文本
-    texts = [row.get("text", "") for row in child_rows]
+    # 批量提取所有文本,并截断超长文本(按字节)
+    texts = []
+    for row in child_rows:
+        text = row.get("text", "")
+        text_bytes = text.encode('utf-8')
+        if len(text_bytes) > MAX_TEXT_BYTES:
+            original_chars = len(text)
+            text = truncate_text_by_bytes(text, MAX_TEXT_BYTES)
+            print(f"\n⚠️  {folder_name} 发现超长文本({original_chars}字符/{len(text_bytes)}字节),已截断至{len(text.encode('utf-8'))}字节")
+        texts.append(text)
+    
     # 批量生成向量
     vectors = embeddings.embed_documents(texts)
     
@@ -260,12 +401,17 @@ def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: Li
         }
         entities.append(entity)
     
+    # 分批插入,避免gRPC消息过大
+    total_inserted = 0
     try:
-        client.insert(collection_name=collection_name, data=entities)
-        return len(entities), None
+        for i in range(0, len(entities), BATCH_SIZE):
+            batch = entities[i:i + BATCH_SIZE]
+            client.insert(collection_name=collection_name, data=batch)
+            total_inserted += len(batch)
+        return total_inserted, None
     except Exception as e:
-        print(f"📁 {folder_name} ❌ 插入 children 失败: {e}")
-        return 0, str(e)
+        print(f"\n📁 {folder_name} ❌ 插入 children 失败: {e}")
+        return total_inserted, str(e)
 
 
 def process_folder(root_folder: str | Path) -> Dict[str, Any]:
@@ -278,50 +424,78 @@ def process_folder(root_folder: str | Path) -> Dict[str, Any]:
     Returns:
         统计信息字典
     """
+    global _progress_data, _should_stop
+    
     root_folder = Path(root_folder)
     if not root_folder.is_dir():
         raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
     
     client = get_milvusclient()
     
-    stats = {"success": 0, "failed": 0, "skipped": 0, "parent_rows": 0, "child_rows": 0, "failed_items": []}
+    # 加载进度
+    _progress_data = load_progress()
+    completed_set = set(_progress_data.get("completed", []))
     
-    for subfolder in sorted(root_folder.iterdir()):
-        if not subfolder.is_dir():
-            continue
-        
-        folder_name = subfolder.name
-        
-        # 查找 JSON 文件
+    # 获取所有文件夹列表
+    subfolders = [f for f in sorted(root_folder.iterdir()) if f.is_dir()]
+    total_folders = len(subfolders)
+    
+    # 查询已上传到 Milvus 的文档(启动时只查一次,避免影响速度)
+    print("🔍 正在查询已上传的文档...")
+    uploaded_doc_ids = get_uploaded_doc_ids(client)
+    print(f"📋 发现 {len(uploaded_doc_ids)} 个已上传文档")
+    
+    # 合并本地进度和 Milvus 中的记录
+    skip_set = completed_set | uploaded_doc_ids
+    
+    stats = {
+        "success": 0, 
+        "failed": 0, 
+        "skipped": 0, 
+        "parent_rows": 0, 
+        "child_rows": 0, 
+        "failed_items": []
+    }
+    
+    # 计算实际需要处理的文件夹
+    pending_folders = []
+    for subfolder in subfolders:
         json_files = list(subfolder.glob("*.json"))
         if not json_files:
-            print(f"📁 {folder_name} ⊘ 无JSON文件")
-            stats["skipped"] += 1
             continue
+        try:
+            with open(json_files[0], "r", encoding="utf-8") as f:
+                data = json.load(f)
+            doc_data = data.get("doc")
+            if doc_data and doc_data.get("id"):
+                doc_id = doc_data.get("id")
+                if doc_id not in skip_set:
+                    pending_folders.append((subfolder, doc_id, data))
+                else:
+                    stats["skipped"] += 1
+        except Exception:
+            pass
+    
+    pending_count = len(pending_folders)
+    processed_count = 0
+    
+    print(f"\n📊 进度: 共 {total_folders} 个文件夹,已跳过 {stats['skipped']} 个,待处理 {pending_count} 个")
+    print("-" * 60)
+    
+    for subfolder, doc_id, data in pending_folders:
+        # 检查是否需要停止
+        if _should_stop:
+            print("\n⚠️ 正在停止...")
+            break
         
-        json_path = json_files[0]
+        folder_name = subfolder.name
+        processed_count += 1
         
         try:
-            # 读取 JSON
-            with open(json_path, "r", encoding="utf-8") as f:
-                data = json.load(f)
-            
             doc_data = data.get("doc")
             parent_rows = data.get("parent", [])
             child_rows = data.get("children", [])
             
-            if not doc_data or not doc_data.get("id"):
-                print(f"📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
-                stats["failed_items"].append({
-                    "folder": folder_name,
-                    "error": "JSON格式错误或缺少doc/id",
-                    "reason": "JSON格式错误或缺少doc/id",
-                })
-                stats["failed"] += 1
-                continue
-            
-            doc_id = doc_data.get("id")
-
             # 获取原始文件名(不含扩展名)
             all_files = list(subfolder.glob("*"))
             original_file = None
@@ -356,17 +530,26 @@ def process_folder(root_folder: str | Path) -> Dict[str, Any]:
                 stats["failed"] += 1
             
             if parent_count > 0 or child_count > 0:
-                print(f"📁 {folder_name} ✅ parent: {parent_count}, child: {child_count}")
+                print(f"[{processed_count}/{pending_count}] 📁 {folder_name} ✅ parent: {parent_count}, child: {child_count}")
                 stats["success"] += 1
                 stats["parent_rows"] += parent_count
                 stats["child_rows"] += child_count
+                
+                # 标记为已完成
+                completed_set.add(doc_id)
+                _progress_data["completed"] = list(completed_set)
+                _progress_data["total_completed"] = len(completed_set)
+                
+                # 每处理10个文件保存一次进度(平衡安全性和速度)
+                if processed_count % 10 == 0:
+                    save_progress(_progress_data)
             else:
-                print(f"📁 {folder_name} ⊘ 无数据入库")
+                print(f"[{processed_count}/{pending_count}] 📁 {folder_name} ⊘ 无数据入库")
                 stats["skipped"] += 1
                 
         except Exception as e:
             error_message = str(e)
-            print(f"📁 {folder_name} ❌ {error_message}")
+            print(f"[{processed_count}/{pending_count}] 📁 {folder_name} ❌ {error_message}")
             stats["failed_items"].append({
                 "folder": folder_name,
                 "error": "未知错误",
@@ -374,17 +557,25 @@ def process_folder(root_folder: str | Path) -> Dict[str, Any]:
             })
             stats["failed"] += 1
     
+    # 最终保存进度
+    save_progress(_progress_data)
+    
     return stats
 
 
 def main():
     """主函数"""
+    # 注册信号处理器
+    signal.signal(signal.SIGINT, signal_handler)
+    
     try:
         print(f"🔍 开始导入 Milvus...")
         print(f"📂 根目录: {ROOT_FOLDER}")
         print(f"🔗 Milvus: {settings.MILVUS_HOST}:{settings.MILVUS_PORT}")
         print(f"📊 Parent Collection: {PARENT_COLLECTION_NAME}")
         print(f"📊 Child Collection: {CHILD_COLLECTION_NAME}")
+        print(f"💾 进度文件: {PROGRESS_FILE_PATH}")
+        print("💡 提示: 按 Ctrl+C 可随时停止,下次运行将自动跳过已处理文件")
         print("-" * 60)
         
         stats = process_folder(ROOT_FOLDER)

+ 1106 - 0
src/app/scripts/plan_info_in_collection_v2.py

@@ -0,0 +1,1106 @@
+"""
+将 JSON 文件中的 parent 和 children 数据插入 Milvus Collection。
+读取每个子文件夹的 JSON,解析 doc、parent、children 数组并构造入库数据。
+支持断点续传:保存进度到文件,下次运行自动跳过已处理的文件夹。
+支持 Milvus 数据验证:检查进度文件中已记录的文件夹是否确实存在于 Milvus 中。
+"""
+from __future__ import annotations
+
+import json
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List, Set, Tuple
+
+from pymilvus import MilvusClient
+
+from app.config.embeddings import get_embeddings
+from app.config.milvus_client import get_milvusclient
+from app.config.setting import settings
+
+# Root directory that holds one sub-folder per document
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终施工方案"
+
+# Where the JSON summary of failed items is saved
+FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\plan_collection_failed_report.json"
+
+# Where the resume/progress state is saved
+PROGRESS_FILE_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\plan_collection_progress.json"
+
+# Collection names
+PARENT_COLLECTION_NAME = "t_rag_kng_construction_plan_parent"
+CHILD_COLLECTION_NAME = "t_rag_kng_construction_plan"
+
+# Default creator / modifier user ID
+DEFAULT_USER_ID = "ed6a79d3-0083-4d81-8b48-fc522f686f74"
+
+# MinIO URL prefix
+PREFIX = "/plan"
+
+# Guards for Milvus field size and embedding input size.
+# NOTE(review): TEXT_SAFE_MAX_LENGTH is derived from a token budget but is
+# compared against UTF-8 byte lengths by the splitting helpers in this file —
+# confirm that treating bytes as an upper bound for tokens is intended.
+MILVUS_VARCHAR_MAX_LENGTH = 65535
+EMBEDDING_MAX_INPUT_TOKENS = 16384
+EMBEDDING_TOKEN_SAFETY_RATIO = 0.75
+TEXT_SAFE_MAX_LENGTH = min(
+    MILVUS_VARCHAR_MAX_LENGTH,
+    int(EMBEDDING_MAX_INPUT_TOKENS * EMBEDDING_TOKEN_SAFETY_RATIO),
+)  # 12288
+
+# Batch size for bulk inserts (keeps a single request from growing too large)
+BATCH_SIZE = 2000
+
+# Enum abbreviation maps: display name -> short code.
+# Values that match no key are mapped to "QT" by map_enum's default.
+# Plan category; grade suffixes are listed with both the Unicode Roman
+# numeral characters and their ASCII spellings.
+PLAN_CATEGORY_MAP = {
+    "超危大方案": "CH",
+    "超危大方案较大Ⅱ级": "CH2",
+    "超危大方案较大II级": "CH2",
+    "超危大方案特大Ⅳ级": "CH4",
+    "超危大方案特大IV级": "CH4",
+    "超危大方案一般Ⅰ级": "CH1",
+    "超危大方案一般I级": "CH1",
+    "超危大方案重大Ⅲ级": "CH3",
+    "超危大方案重大III级": "CH3",
+    "危大方案": "WD",
+    "一般方案": "YB",
+    "其他": "QT",
+}
+
+# Level-1 classification codes
+LEVEL_1_CLASSIFICATION_MAP = {
+    "施工方案": "SC",
+    "其他": "QT",
+}
+
+# Level-2 classification codes
+LEVEL_2_CLASSIFICATION_MAP = {
+    "临建工程": "LZ",
+    "路基工程": "LJ",
+    "桥梁工程": "QL",
+    "隧道工程": "SD",
+    "其他": "QT",
+}
+
+# Level-3 classification codes
+LEVEL_3_CLASSIFICATION_MAP = {
+    "TBM施工": "TM",
+    "拌和站安、拆施工": "BH",
+    "不良地质隧道施工": "BL",
+    "常规桥梁": "CG",
+    "挡土墙工程类": "DT",
+    "辅助坑道施工": "FB",
+    "复杂洞口工程施工": "FD",
+    "钢筋加工场安、拆": "GG",
+    "钢栈桥施工": "GZ",
+    "拱桥": "GH",
+    "涵洞工程类": "HD",
+    "滑坡体处理类": "HP",
+    "路堤": "LT",
+    "路堑": "LQ",
+    "深基坑": "JK",
+    "隧道总体施工": "ZT",
+    "特殊结构隧道": "TS",
+    "斜拉桥": "XL",
+    "悬索桥": "XS",
+    "其他": "QT",
+}
+
+# Level-4 classification codes
+LEVEL_4_CLASSIFICATION_MAP = {
+    "挡土墙": "DT",
+    "顶管": "DG",
+    "断层破碎带及软弱围岩": "DL",
+    "钢筋砼箱涵": "GX",
+    "高填路堤": "GT",
+    "抗滑桩": "KH",
+    "软岩大变形隧道": "RY",
+    "上部结构": "SB",
+    "深基坑开挖与支护": "JK",
+    "深挖路堑": "LC",
+    "隧道TBM": "TM",
+    "隧道进洞": "JD",
+    "隧道竖井": "SJ",
+    "隧道斜井": "XJ",
+    "特种设备": "TZ",
+    "瓦斯隧道": "WS",
+    "下部结构": "XB",
+    "小净距隧道": "NJ",
+    "岩爆隧道": "YB",
+    "岩溶隧道": "YR",
+    "涌水突泥隧道": "YN",
+    "桩基础": "ZJ",
+    "其他": "QT",
+}
+
+
+def utf8_len(text: str) -> int:
+    """Return how many bytes ``text`` occupies in UTF-8 (falsy input counts as 0)."""
+    encoded = (text if text else "").encode("utf-8")
+    return len(encoded)
+
+
+def truncate_utf8(text: str, max_bytes: int) -> str:
+    """
+    Cut ``text`` down to at most ``max_bytes`` UTF-8 bytes.
+
+    Falsy input becomes ``""``; other input is converted with ``str()``.
+    A character split by the byte cut is dropped rather than emitted broken.
+    """
+    normalized = str(text or "")
+    encoded = normalized.encode("utf-8")
+    if len(encoded) > max_bytes:
+        return encoded[:max_bytes].decode("utf-8", errors="ignore")
+    return normalized
+
+
+def is_markdown_table_separator_line(line: str) -> bool:
+    """
+    Tell whether ``line`` is a Markdown table delimiter row such as ``|---|:---:|``.
+
+    The line must contain at least one pipe and at least one hyphen, and may
+    otherwise consist only of pipes, colons, hyphens and whitespace. Requiring
+    a hyphen keeps rows made of pipes alone (``|`` or ``| | |``) from being
+    mistaken for a delimiter row.
+
+    Args:
+        line: Candidate line; falsy input is treated as an empty line.
+
+    Returns:
+        ``True`` if the line looks like a delimiter row, else ``False``.
+    """
+    stripped = str(line or "").strip()
+    if "|" not in stripped or "-" not in stripped:
+        return False
+    leftover = stripped.replace("|", "").replace(":", "").replace("-", "").strip()
+    return not leftover
+
+
+def split_content_preserve_table_blocks(content: str) -> List[Tuple[str, str]]:
+    """
+    Partition ``content`` into prose blocks and Markdown-table blocks.
+
+    A table starts at a line containing "|" that is immediately followed by a
+    delimiter row, and extends over the following lines until one is blank or
+    has no "|". All other lines are accumulated as prose.
+
+    Returns:
+        ``(block_type, block_text)`` pairs in document order, with
+        ``block_type`` being ``"text"`` or ``"table"``. Blocks that are empty
+        after stripping are omitted.
+    """
+    all_lines = str(content or "").splitlines()
+    line_count = len(all_lines)
+    result: List[Tuple[str, str]] = []
+    prose_lines: List[str] = []
+
+    def emit_prose() -> None:
+        # Close the prose run collected so far, dropping it when it is blank.
+        joined = "\n".join(prose_lines).strip()
+        prose_lines.clear()
+        if joined:
+            result.append(("text", joined))
+
+    pos = 0
+    while pos < line_count:
+        line = all_lines[pos]
+        follower = all_lines[pos + 1] if pos + 1 < line_count else ""
+        starts_table = "|" in line.strip() and is_markdown_table_separator_line(follower.strip())
+        if not starts_table:
+            prose_lines.append(line)
+            pos += 1
+            continue
+
+        emit_prose()
+        # Header and delimiter row always belong to the table; keep absorbing
+        # rows while they are non-blank and still contain a pipe.
+        end = pos + 2
+        while end < line_count and "|" in all_lines[end].strip():
+            end += 1
+        table_text = "\n".join(all_lines[pos:end]).strip()
+        if table_text:
+            result.append(("table", table_text))
+        pos = end
+
+    emit_prose()
+    return result
+
+
+def split_markdown_table_block(block: str, max_length: int) -> List[str]:
+    """
+    Split a Markdown table into chunks of at most ``max_length`` UTF-8 bytes.
+
+    The table is cut between rows, and every chunk repeats the two header
+    lines (header row + delimiter row) so it remains a valid table on its
+    own. A single row that does not fit next to the header is cut into byte
+    slices, each prefixed with the header. If the header alone reaches
+    ``max_length``, the whole block is cut into plain byte slices instead.
+
+    Args:
+        block: Table text; falsy input yields ``[]``.
+        max_length: Maximum chunk size in UTF-8 bytes.
+
+    Returns:
+        Non-empty, stripped chunks in original order. A block with fewer than
+        two non-blank lines is returned as a single stripped chunk, unsplit.
+    """
+    text = str(block or "")
+
+    def byte_chunks(data: str, limit: int) -> List[str]:
+        # Cut ``data`` into consecutive slices of at most ``limit`` bytes,
+        # always on character boundaries. The cursor advances by the bytes
+        # actually consumed (not by the stripped length), so no content is
+        # emitted twice and the loop always makes progress.
+        limit = max(1, limit)
+        raw = data.encode("utf-8")
+        pieces: List[str] = []
+        start = 0
+        while start < len(raw):
+            end = min(start + limit, len(raw))
+            # UTF-8 continuation bytes look like 0b10xxxxxx; back off until
+            # the cut no longer lands inside a character.
+            while start < end < len(raw) and (raw[end] & 0xC0) == 0x80:
+                end -= 1
+            if end == start:
+                # One character is wider than ``limit``: emit it whole
+                # rather than dropping it.
+                end = start + 1
+                while end < len(raw) and (raw[end] & 0xC0) == 0x80:
+                    end += 1
+            piece = raw[start:end].decode("utf-8").strip()
+            if piece:
+                pieces.append(piece)
+            start = end
+        return pieces
+
+    lines = [line for line in text.splitlines() if line.strip()]
+    if len(lines) < 2:
+        stripped = text.strip()
+        return [stripped] if stripped else []
+
+    header_text = "\n".join(lines[:2])
+    header_len = utf8_len(header_text)
+
+    if header_len >= max_length:
+        # No room for any data next to the header: give up on table structure.
+        return byte_chunks(text, max_length)
+
+    # Bytes left for one row once "header + newline" has been accounted for.
+    row_budget = max_length - header_len - 1
+
+    chunks: List[str] = []
+    current_rows: List[str] = []
+    current_len = header_len  # byte size of header plus the rows gathered so far
+    for row in lines[2:]:
+        row_cost = 1 + utf8_len(row)  # joining newline + row
+        if current_len + row_cost <= max_length:
+            current_rows.append(row)
+            current_len += row_cost
+            continue
+
+        if current_rows:
+            chunks.append(f"{header_text}\n" + "\n".join(current_rows))
+            current_rows = []
+            current_len = header_len
+
+        if header_len + row_cost <= max_length:
+            current_rows = [row]
+            current_len = header_len + row_cost
+        else:
+            chunks.extend(f"{header_text}\n{piece}" for piece in byte_chunks(row, row_budget))
+
+    if current_rows:
+        chunks.append(f"{header_text}\n" + "\n".join(current_rows))
+
+    return [chunk.strip() for chunk in chunks if chunk.strip()]
+
+
+def split_text_for_milvus(text: str, max_length: int = TEXT_SAFE_MAX_LENGTH) -> List[str]:
+    """
+    Split over-long text into chunks of at most ``max_length`` UTF-8 bytes.
+
+    Content that already fits is returned as a single chunk. Otherwise it is
+    divided into prose and Markdown-table blocks:
+
+    * tables are emitted on their own and, when too large, split row-wise by
+      ``split_markdown_table_block``;
+    * prose is split on blank lines, and consecutive paragraphs are packed
+      together (joined by a blank line) while they fit;
+    * a single paragraph larger than ``max_length`` is cut into byte slices on
+      character boundaries.
+
+    Args:
+        text: Text to split; falsy or whitespace-only input yields ``[]``.
+        max_length: Maximum chunk size in UTF-8 bytes.
+
+    Returns:
+        The chunks in original order.
+    """
+    content = str(text or "").strip()
+    if not content:
+        return []
+    if utf8_len(content) <= max_length:
+        return [content]
+
+    chunks: List[str] = []
+    current = ""
+    blocks = split_content_preserve_table_blocks(content)
+    for block_type, block_text in blocks:
+        block_text = block_text.strip()
+        if not block_text:
+            continue
+
+        if block_type == "table":
+            # Tables never share a chunk with prose: flush pending prose first.
+            if current:
+                chunks.append(current)
+                current = ""
+            if utf8_len(block_text) <= max_length:
+                chunks.append(block_text)
+                continue
+            chunks.extend(split_markdown_table_block(block_text, max_length=max_length))
+            continue
+
+        parts = block_text.split("\n\n")
+        for part in parts:
+            part = part.strip()
+            if not part:
+                continue
+            candidate = f"{current}\n\n{part}" if current else part
+            if utf8_len(candidate) <= max_length:
+                current = candidate
+                continue
+            if current:
+                chunks.append(current)
+                current = ""
+            if utf8_len(part) <= max_length:
+                current = part
+                continue
+
+            # The paragraph alone is too large: cut it into byte slices. The
+            # cursor advances by the bytes actually consumed (not by the
+            # stripped length), so nothing is emitted twice and the loop
+            # always makes progress.
+            raw = part.encode("utf-8")
+            step = max(1, max_length)
+            start = 0
+            while start < len(raw):
+                end = min(start + step, len(raw))
+                # UTF-8 continuation bytes look like 0b10xxxxxx; back off
+                # until the cut no longer lands inside a character.
+                while start < end < len(raw) and (raw[end] & 0xC0) == 0x80:
+                    end -= 1
+                if end == start:
+                    # One character is wider than the step: emit it whole.
+                    end = start + 1
+                    while end < len(raw) and (raw[end] & 0xC0) == 0x80:
+                        end += 1
+                sub = raw[start:end].decode("utf-8").strip()
+                if sub:
+                    chunks.append(sub)
+                start = end
+
+    if current:
+        chunks.append(current)
+    return chunks
+
+
+def normalize_rows_for_text_limit(rows: List[Dict[str, Any]], row_type: str, folder_name: str) -> List[Dict[str, Any]]:
+    """
+    Expand rows whose ``text`` is too long into several rows and renumber ``index``.
+
+    Each row's text is passed through ``split_text_for_milvus``; every
+    resulting chunk becomes a copy of the row carrying that chunk (capped at
+    ``MILVUS_VARCHAR_MAX_LENGTH`` bytes) and a fresh sequential ``index``
+    starting at 0. Rows whose text produces no chunk are dropped. A summary
+    line is printed when any row had to be split.
+
+    Args:
+        rows: Source rows; not modified.
+        row_type: Label used in the printed summary.
+        folder_name: Folder label used in the printed summary.
+
+    Returns:
+        The new list of rows.
+    """
+    if not rows:
+        return []
+
+    result: List[Dict[str, Any]] = []
+    extra_rows = 0
+    for source_row in rows:
+        pieces = split_text_for_milvus(source_row.get("text", ""))
+        if not pieces:
+            continue
+        extra_rows += len(pieces) - 1
+        for piece in pieces:
+            clone = dict(source_row)
+            clone["text"] = truncate_utf8(piece, MILVUS_VARCHAR_MAX_LENGTH)
+            # Rows are appended in order, so the next index is the current size.
+            clone["index"] = len(result)
+            result.append(clone)
+
+    if extra_rows:
+        print(f"\n✂️  {folder_name} {row_type} 超长文本切分新增 {extra_rows} 行(UTF-8 + token 保护)")
+    return result
+
+
+def load_progress(progress_file: str) -> Dict[str, Any]:
+    """
+    Load the progress record stored in ``progress_file``.
+
+    Keys missing from the stored record (for instance in a file written by an
+    earlier format) are filled in with empty defaults, so callers can rely on
+    ``completed_folders``, ``failed_folders``, ``verified_folders``, ``stats``
+    and ``last_update`` being present. Keys not known here are kept as-is.
+
+    Args:
+        progress_file: Path of the JSON progress file.
+
+    Returns:
+        The stored record with defaults applied. A fresh empty record is
+        returned when the file does not exist, or (after printing a warning)
+        when it cannot be read or does not contain a JSON object.
+    """
+
+    def empty_progress() -> Dict[str, Any]:
+        # Built anew on every call so callers never share mutable defaults.
+        return {
+            "completed_folders": [],
+            "failed_folders": [],
+            "verified_folders": [],  # folders verified to really exist in Milvus
+            "stats": {
+                "success": 0,
+                "failed": 0,
+                "skipped": 0,
+                "parent_rows": 0,
+                "child_rows": 0,
+            },
+            "last_update": None,
+        }
+
+    try:
+        with open(progress_file, "r", encoding="utf-8") as f:
+            progress = json.load(f)
+        if not isinstance(progress, dict):
+            raise ValueError("进度文件内容不是 JSON 对象")
+    except FileNotFoundError:
+        return empty_progress()
+    except Exception as e:
+        print(f"⚠️ 读取进度文件失败: {e},将重新开始")
+        return empty_progress()
+
+    defaults = empty_progress()
+    for key, value in defaults.items():
+        progress.setdefault(key, value)
+    # Counters may be missing individually, or "stats" may not be a dict at all.
+    if isinstance(progress["stats"], dict):
+        for key, value in defaults["stats"].items():
+            progress["stats"].setdefault(key, value)
+    else:
+        progress["stats"] = defaults["stats"]
+    return progress
+
+
+def save_progress(progress_file: str, progress: Dict[str, Any]) -> None:
+    """
+    Stamp ``progress`` with the current time and write it to ``progress_file``.
+
+    The record is written to ``<progress_file>.tmp`` and then moved over the
+    real file, so a failed or interrupted write leaves the previously saved
+    progress intact. Failures are printed and otherwise ignored (best effort).
+
+    Args:
+        progress_file: Destination path of the JSON progress file.
+        progress: Progress record; its ``"last_update"`` key is overwritten
+            in place with an ISO-8601 timestamp.
+    """
+    progress["last_update"] = datetime.now().isoformat()
+    tmp_file = f"{progress_file}.tmp"
+    try:
+        with open(tmp_file, "w", encoding="utf-8") as f:
+            json.dump(progress, f, ensure_ascii=False, indent=2)
+        # Swap the new file in only after it has been written completely.
+        os.replace(tmp_file, progress_file)
+    except Exception as e:
+        print(f"⚠️ 保存进度文件失败: {e}")
+        # Do not leave a half-written temporary file behind.
+        try:
+            os.remove(tmp_file)
+        except OSError:
+            pass
+
+
+def format_progress(current: int, total: int, folder_name: str = "") -> str:
+    """
+    Render a one-line text progress bar.
+
+    Args:
+        current: Number of items processed so far.
+        total: Total number of items; values <= 0 render an empty bar at 0%.
+        folder_name: Label appended after the counters.
+
+    Returns:
+        ``"[<bar>] current/total (pct%) - folder_name"``. The bar is always
+        exactly 30 cells wide: the filled part is clamped to the bar even
+        when ``current`` is negative or exceeds ``total`` (the printed
+        counters and percentage are not clamped).
+    """
+    bar_length = 30
+    if total > 0:
+        percentage = current / total * 100
+        filled = int(bar_length * current / total)
+    else:
+        percentage = 0
+        filled = 0
+    filled = min(bar_length, max(0, filled))
+    bar = "█" * filled + "░" * (bar_length - filled)
+    return f"[{bar}] {current}/{total} ({percentage:.1f}%) - {folder_name}"
+
+
+def map_enum(value: str | None, mapping: Dict[str, str], default: str = "QT") -> str:
+    """Translate ``value`` through ``mapping``; falsy or unknown values give ``default``."""
+    return mapping.get(value, default) if value else default
+
+
+def build_metadata(doc_data: Dict[str, Any], hierarchy: str, file_url: str, file_name: str) -> Dict[str, Any]:
+    """
+    Assemble the ``metadata`` payload stored with each row.
+
+    Args:
+        doc_data: The ``doc`` object from the JSON file.
+        hierarchy: Document hierarchy information.
+        file_url: File URL.
+        file_name: Original file name.
+
+    Returns:
+        A dict holding the descriptive fields copied from ``doc_data``
+        (``""`` when absent), the given ``hierarchy`` / ``file_url``, the
+        category and classification fields converted to their short codes,
+        and an empty ``plan_type_list``.
+    """
+    copied_fields = (
+        "plan_name",
+        "project_name",
+        "project_section",
+        "compiling_unit",
+        "compiling_date",
+        "plan_summary",
+    )
+    coded_fields = (
+        ("plan_category", PLAN_CATEGORY_MAP),
+        ("level_1_classification", LEVEL_1_CLASSIFICATION_MAP),
+        ("level_2_classification", LEVEL_2_CLASSIFICATION_MAP),
+        ("level_3_classification", LEVEL_3_CLASSIFICATION_MAP),
+        ("level_4_classification", LEVEL_4_CLASSIFICATION_MAP),
+    )
+
+    metadata: Dict[str, Any] = {"file_name": file_name}
+    for field in copied_fields:
+        metadata[field] = doc_data.get(field, "")
+    metadata["hierarchy"] = hierarchy
+    metadata["file_url"] = file_url
+    for field, mapping in coded_fields:
+        metadata[field] = map_enum(doc_data.get(field), mapping)
+    metadata["plan_type_list"] = {}  # empty JSON object
+    return metadata
+
+
+def check_document_in_milvus(client: MilvusClient, doc_id: str) -> tuple[bool, int, int]:
+    """
+    Look up how many rows Milvus holds for ``doc_id`` in each collection.
+
+    Args:
+        client: Milvus client.
+        doc_id: Document ID to look for.
+
+    Returns:
+        ``(exists, parent_count, child_count)`` where the counts are the
+        number of rows returned by the parent / child collection queries and
+        ``exists`` is true when either is non-zero. When a query raises, the
+        error is printed and ``(False, 0, 0)`` is returned.
+        NOTE(review): a failed query is therefore indistinguishable from
+        "document absent" for callers — confirm that is acceptable.
+    """
+    try:
+        doc_filter = f'document_id == "{doc_id}"'
+        counts = []
+        # Parent collection first, then child collection.
+        for collection in (PARENT_COLLECTION_NAME, CHILD_COLLECTION_NAME):
+            rows = client.query(
+                collection_name=collection,
+                filter=doc_filter,
+                output_fields=["document_id"],
+            )
+            counts.append(len(rows) if rows else 0)
+
+        parent_count, child_count = counts
+        return bool(parent_count or child_count), parent_count, child_count
+    except Exception as e:
+        print(f"\n⚠️ 查询 Milvus 失败 (doc_id={doc_id}): {e}")
+        return False, 0, 0
+
+
+def delete_document_by_id(client: MilvusClient, doc_id: str) -> tuple[int, int]:
+    """
+    Delete every row belonging to ``doc_id`` from the parent and child collections.
+
+    Both collections are loaded first; each delete is followed by a flush of
+    that collection.
+
+    Args:
+        client: Milvus client.
+        doc_id: Document ID whose rows are removed.
+
+    Returns:
+        ``(parent_deleted, child_deleted)`` as reported by the delete calls.
+        If a step raises, the error is printed and the counts obtained up to
+        that point are returned, so a parent deletion that already went
+        through is still reported when the child step fails.
+    """
+    parent_deleted = 0
+    child_deleted = 0
+
+    try:
+        doc_filter = f'document_id == "{doc_id}"'
+
+        # Load the collections
+        client.load_collection(collection_name=PARENT_COLLECTION_NAME)
+        client.load_collection(collection_name=CHILD_COLLECTION_NAME)
+
+        # Remove the rows from the parent collection
+        parent_result = client.delete(
+            collection_name=PARENT_COLLECTION_NAME,
+            filter=doc_filter
+        )
+        if parent_result:
+            parent_deleted = parent_result.get('delete_count', 0)
+        client.flush(collection_name=PARENT_COLLECTION_NAME)
+
+        # Remove the rows from the child collection
+        child_result = client.delete(
+            collection_name=CHILD_COLLECTION_NAME,
+            filter=doc_filter
+        )
+        if child_result:
+            child_deleted = child_result.get('delete_count', 0)
+        client.flush(collection_name=CHILD_COLLECTION_NAME)
+    except Exception as e:
+        print(f"\n⚠️ 删除 Milvus 数据失败 (doc_id={doc_id}): {e}")
+
+    return parent_deleted, child_deleted
+
+
+def check_and_clean_failed_files(
+    client: MilvusClient,
+    failed_files: List[Dict[str, Any]],
+    root_folder: Path
+) -> tuple[List[Dict[str, Any]], Set[str], int, int]:
+    """
+    Remove leftover Milvus rows for previously failed items and queue them for re-upload.
+
+    For each failed item the document ID is taken from the item itself
+    (``document_id``, falling back to ``doc_id``) or, failing that, from the
+    first ``*.json`` found in the item's folder under ``root_folder``. If
+    Milvus holds rows for that ID they are deleted. Every item with a known
+    document ID is marked for re-upload whether or not leftovers were found.
+
+    Args:
+        client: Milvus client.
+        failed_files: Failed-item records (``folder`` / ``folder_name``,
+            ``file``, ``document_id`` / ``doc_id``).
+        root_folder: Root directory containing the item folders.
+
+    Returns:
+        ``(failed_items, folders_to_reupload, cleaned_count, total_deleted)``:
+        the failed items (all are kept), the folder/file identifiers to upload
+        again, the number of items that had leftovers, and the total number of
+        rows reported deleted.
+    """
+    if not failed_files:
+        return [], set(), 0, 0
+
+    print(f"\n🔍 检查 {len(failed_files)} 个失败文件的 Milvus 残留数据...")
+
+    cleaned_files = []
+    folders_to_reupload = set()  # folders that must be uploaded again
+    cleaned_count = 0
+    total_deleted = 0
+
+    for idx, failed_item in enumerate(failed_files, 1):
+        # Resolve each field on its own, so an alternate key is honoured even
+        # when some of the primary keys are present.
+        folder_name = failed_item.get("folder") or failed_item.get("folder_name") or ""
+        file_name = failed_item.get("file") or ""
+        doc_id = failed_item.get("document_id") or failed_item.get("doc_id") or ""
+
+        if not doc_id:
+            # Fall back to the doc id recorded in the folder's JSON file.
+            subfolder = root_folder / folder_name if folder_name else None
+            if subfolder and subfolder.exists():
+                json_files = list(subfolder.glob("*.json"))
+                if json_files:
+                    try:
+                        with open(json_files[0], "r", encoding="utf-8") as f:
+                            data = json.load(f)
+                        doc_data = data.get("doc")
+                        if doc_data:
+                            doc_id = doc_data.get("id", "")
+                    except Exception:
+                        # Best effort: an unreadable JSON simply means the id
+                        # stays unknown and the item is skipped below.
+                        pass
+
+        if not doc_id:
+            print(f"\r  检查进度: [{idx}/{len(failed_files)}] {folder_name or file_name} - 无法获取 document_id,跳过", end="")
+            cleaned_files.append(failed_item)
+            continue
+
+        # Check Milvus for leftovers from the failed attempt
+        exists, parent_count, child_count = check_document_in_milvus(client, doc_id)
+        folder_identifier = folder_name or file_name
+
+        if exists:
+            print(f"\r  检查进度: [{idx}/{len(failed_files)}] {folder_identifier} - 发现残留数据(p:{parent_count}, c:{child_count}),正在清理...", end="")
+            deleted_parent, deleted_child = delete_document_by_id(client, doc_id)
+            total_deleted += deleted_parent + deleted_child
+            cleaned_count += 1
+            print(f"\r  检查进度: [{idx}/{len(failed_files)}] {folder_identifier} - ✅ 已清理残留数据(p:{deleted_parent}, c:{deleted_child})")
+        else:
+            print(f"\r  检查进度: [{idx}/{len(failed_files)}] {folder_identifier} - 无残留数据,将重新入库")
+
+        # Keep the item in the failed list until it has been re-ingested
+        cleaned_files.append(failed_item)
+
+        # With or without leftovers, a failed item is uploaded again
+        if folder_identifier:
+            folders_to_reupload.add(folder_identifier)
+
+    print(f"\n📊 清理结果: 清理了 {cleaned_count} 个文件的残留数据,共删除 {total_deleted} 条记录")
+    print(f"🔄 需要重新上传: {len(folders_to_reupload)} 个文件夹")
+    return cleaned_files, folders_to_reupload, cleaned_count, total_deleted
+
+
+def verify_completed_folders(
+    client: MilvusClient,
+    root_folder: Path,
+    completed_folders: Set[str],
+    verified_folders: Set[str]
+) -> tuple[Set[str], Set[str], int]:
+    """
+    Cross-check folders recorded as completed against the data in Milvus.
+
+    Only folders that are completed but not yet verified are inspected.  A
+    folder that no longer exists, has no JSON file, or whose JSON lacks
+    ``doc.id`` is marked verified so it is not inspected again.  A folder
+    whose document is missing from Milvus, or whose check raised, is queued
+    for re-upload.
+
+    Args:
+        client: Milvus client.
+        root_folder: Root directory holding one sub-folder per document.
+        completed_folders: Folder names recorded as completed.
+        verified_folders: Folder names already verified; updated in place.
+
+    Returns:
+        (folders to upload again, the updated ``verified_folders`` set,
+         number of folders confirmed to exist in Milvus)
+    """
+    unverified = completed_folders - verified_folders
+    if not unverified:
+        return set(), verified_folders, 0
+
+    total_to_verify = len(unverified)
+    print(f"\n🔍 需要验证 {total_to_verify} 个已记录文件夹的数据是否存在于 Milvus...")
+
+    need_reupload = set()
+    settled = set()  # folders that need no further verification
+    verified_count = 0
+
+    for position, folder_name in enumerate(sorted(unverified), 1):
+        subfolder = root_folder / folder_name
+        if not subfolder.exists():
+            print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} - 文件夹不存在,跳过", end="")
+            settled.add(folder_name)
+            continue
+
+        candidates = list(subfolder.glob("*.json"))
+        if len(candidates) == 0:
+            print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} - 无JSON文件", end="")
+            settled.add(folder_name)
+            continue
+
+        try:
+            with open(candidates[0], "r", encoding="utf-8") as handle:
+                payload = json.load(handle)
+
+            doc_section = payload.get("doc")
+            doc_id = doc_section.get("id") if doc_section else None
+            if not doc_id:
+                print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} - JSON格式错误", end="")
+                settled.add(folder_name)
+                continue
+
+            found, _, _ = check_document_in_milvus(client, doc_id)
+
+            if not found:
+                print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} ❌ 不存在,需重新上传", end="")
+                need_reupload.add(folder_name)
+            else:
+                print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} ✅ 已存在", end="")
+                settled.add(folder_name)
+                verified_count += 1
+
+        except Exception as e:
+            print(f"\r  验证进度: [{position}/{total_to_verify}] {folder_name} ⚠️ 验证失败: {e}", end="")
+            # Presence is unknown after a failed check; re-upload to be safe.
+            need_reupload.add(folder_name)
+
+    print()  # terminate the \r progress line
+    print(f"\n📊 验证结果:")
+    print(f"   验证通过: {verified_count}")
+    print(f"   需要重新上传: {len(need_reupload)}")
+
+    # The caller's set is extended in place and also handed back.
+    verified_folders |= settled
+
+    return need_reupload, verified_folders, verified_count
+
+
+def _insert_rows(client: MilvusClient, collection_name: str, rows: List[Dict[str, Any]],
+                 doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str,
+                 row_kind: str) -> tuple[int, str | None]:
+    """
+    Shared implementation behind insert_parent_rows / insert_child_rows.
+
+    Normalises the rows for the text limit, embeds their text, builds the
+    Milvus entities and inserts them in batches of BATCH_SIZE.
+
+    Args:
+        client: Milvus client.
+        collection_name: Target collection.
+        rows: Row dicts; ``text``, ``parent_id``, ``index`` and ``hierarchy``
+            are read from each row.
+        doc_data: The ``doc`` object of the JSON file (passed to build_metadata).
+        doc_id: Document id.
+        folder_name: Folder name, used for normalisation and error output.
+        file_name: Original file name (passed to build_metadata).
+        row_kind: "parent" or "children"; forwarded to
+            normalize_rows_for_text_limit and shown in the error message.
+
+    Returns:
+        (number of rows inserted, error message or None).  When a batch fails
+        the count covers the batches inserted before the failure.
+
+    Raises:
+        ValueError: If the embedding call returns a different number of
+            vectors than texts.  Errors of the embedding call itself also
+            propagate; only ``client.insert`` failures are converted into
+            the returned error message.
+    """
+    rows = normalize_rows_for_text_limit(rows, row_kind, folder_name)
+    if not rows:
+        return 0, None
+
+    embeddings = get_embeddings()
+
+    file_url = f"{PREFIX}/{doc_id}.md"
+    now_ts = int(datetime.now().timestamp())
+
+    # Embed all texts in one call.
+    texts = [row.get("text", "") for row in rows]
+    vectors = embeddings.embed_documents(texts)
+    if len(vectors) != len(texts):
+        # Pairing by position would attach vectors to the wrong text.
+        raise ValueError(f"向量数量({len(vectors)})与文本数量({len(texts)})不一致")
+
+    entities = [
+        {
+            "text": text,
+            "dense": vector,
+            "document_id": doc_id,
+            "parent_id": str(row.get("parent_id", "")),
+            "index": row.get("index", 0),
+            "tag_list": "",
+            "permission": {},
+            "metadata": build_metadata(doc_data, row.get("hierarchy", ""), file_url, file_name),
+            "is_deleted": False,
+            "created_by": DEFAULT_USER_ID,
+            "created_time": now_ts,
+            "updated_by": DEFAULT_USER_ID,
+            "updated_time": now_ts,
+        }
+        for row, text, vector in zip(rows, texts, vectors)
+    ]
+
+    # Insert in explicit batches to bound the size of a single request.
+    total_inserted = 0
+    try:
+        for start in range(0, len(entities), BATCH_SIZE):
+            batch = entities[start:start + BATCH_SIZE]
+            client.insert(collection_name=collection_name, data=batch)
+            total_inserted += len(batch)
+    except Exception as e:
+        print(f"\n📁 {folder_name} ❌ 插入 {row_kind} 失败: {e}")
+        return total_inserted, str(e)
+    return total_inserted, None
+
+
+def insert_parent_rows(client: MilvusClient, collection_name: str, parent_rows: List[Dict[str, Any]], 
+                       doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str) -> tuple[int, str | None]:
+    """
+    Insert the ``parent`` rows of one document into Milvus.
+
+    Args:
+        client: Milvus client.
+        collection_name: Target collection.
+        parent_rows: The ``parent`` array of the JSON file.
+        doc_data: The ``doc`` object of the JSON file.
+        doc_id: Document id.
+        folder_name: Folder name (used in error output).
+        file_name: Original file name stored in the metadata.
+
+    Returns:
+        (number of rows inserted, error message or None).  On a failed batch
+        the count covers only the batches inserted before the failure.
+    """
+    return _insert_rows(client, collection_name, parent_rows, doc_data, doc_id,
+                        folder_name, file_name, "parent")
+
+
+def insert_child_rows(client: MilvusClient, collection_name: str, child_rows: List[Dict[str, Any]], 
+                      doc_data: Dict[str, Any], doc_id: str, folder_name: str, file_name: str) -> tuple[int, str | None]:
+    """
+    Insert the ``children`` rows of one document into Milvus.
+
+    Args:
+        client: Milvus client.
+        collection_name: Target collection.
+        child_rows: The ``children`` array of the JSON file.
+        doc_data: The ``doc`` object of the JSON file.
+        doc_id: Document id.
+        folder_name: Folder name (used in error output).
+        file_name: Original file name stored in the metadata.
+
+    Returns:
+        (number of rows inserted, error message or None).  On a failed batch
+        the count covers only the batches inserted before the failure.
+    """
+    return _insert_rows(client, collection_name, child_rows, doc_data, doc_id,
+                        folder_name, file_name, "children")
+
+
+def _without_folder(failed: List[Dict[str, Any]], folder_name: str) -> List[Dict[str, Any]]:
+    """Return the failure records that do not belong to *folder_name*."""
+    return [
+        item for item in failed
+        if item.get("folder") != folder_name and item.get("file") != folder_name
+    ]
+
+
+def _original_file_stem(subfolder: Path, default: str) -> str:
+    """Return the stem of the first file in *subfolder* that is neither .json nor .md, else *default*."""
+    for candidate in subfolder.glob("*"):
+        if candidate.is_file() and candidate.suffix not in (".json", ".md"):
+            return candidate.stem
+    return default
+
+
+def _ingest_folder(client: MilvusClient, subfolder: Path, current_index: int,
+                   total_folders: int) -> tuple[str, int, int, List[Dict[str, Any]]]:
+    """
+    Import the JSON export of a single folder into both collections.
+
+    Args:
+        client: Milvus client.
+        subfolder: Folder to import.
+        current_index: 1-based position used for the progress line.
+        total_folders: Total number of folders, used for the progress line.
+
+    Returns:
+        (status, parent rows inserted, child rows inserted, failure records)
+        where status is "success", "failed" or "skipped" (the matching key in
+        the stats dict).  On "failed" the counts describe rows that may have
+        been written before the error, i.e. residue to clean up.
+    """
+    folder_name = subfolder.name
+
+    def failure(doc_id: str, error: str, reason: str) -> Dict[str, Any]:
+        return {"folder": folder_name, "document_id": doc_id, "error": error, "reason": reason}
+
+    json_files = list(subfolder.glob("*.json"))
+    if not json_files:
+        print(f"\r{format_progress(current_index, total_folders, folder_name + ' (无JSON)')}", end="")
+        return "skipped", 0, 0, []
+
+    # Reset per folder: an error raised before the id is known must not be
+    # attributed to a document handled earlier.
+    doc_id = ""
+    try:
+        with open(json_files[0], "r", encoding="utf-8") as f:
+            data = json.load(f)
+
+        doc_data = data.get("doc")
+        if not doc_data or not doc_data.get("id"):
+            print(f"\n📁 {folder_name} ❌ JSON格式错误或缺少doc/id")
+            return "failed", 0, 0, [failure("", "JSON格式错误或缺少doc/id", "JSON格式错误或缺少doc/id")]
+
+        doc_id = doc_data.get("id")
+        file_name = _original_file_stem(subfolder, folder_name)
+
+        # Children are attempted even when the parent insert reported an error.
+        parent_count, parent_error = insert_parent_rows(
+            client, PARENT_COLLECTION_NAME, data.get("parent", []), doc_data, doc_id, folder_name, file_name
+        )
+        child_count, child_error = insert_child_rows(
+            client, CHILD_COLLECTION_NAME, data.get("children", []), doc_data, doc_id, folder_name, file_name
+        )
+    except Exception as e:
+        error_message = str(e)
+        print(f"\n📁 {folder_name} ❌ {error_message}")
+        return "failed", 0, 0, [failure(doc_id, "未知错误", error_message)]
+
+    failures = []
+    if parent_error:
+        failures.append(failure(doc_id, "parent 入库失败", parent_error))
+    if child_error:
+        failures.append(failure(doc_id, "child 入库失败", child_error))
+    if failures:
+        # Any insert error makes the folder a failure, even if some rows were
+        # written: the record (with doc_id) lets the next run purge and retry.
+        return "failed", parent_count, child_count, failures
+
+    if parent_count > 0 or child_count > 0:
+        label = folder_name + f" ✅ p:{parent_count} c:{child_count}"
+        print(f"\r{format_progress(current_index, total_folders, label)}", end="")
+        return "success", parent_count, child_count, []
+
+    print(f"\r{format_progress(current_index, total_folders, folder_name + ' (无数据)')}", end="")
+    return "skipped", 0, 0, []
+
+
+def process_folder(root_folder: str | Path, progress: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Import every sub-folder of *root_folder* into Milvus, resuming from *progress*.
+
+    Steps:
+      1. Purge Milvus residue of folders recorded as failed and queue them again.
+      2. Verify that folders recorded as completed exist in Milvus; queue the
+         missing ones again.
+      3. Import all pending folders, saving progress whenever the number of
+         completed folders is a multiple of 10 and once more at the end.
+
+    Args:
+        root_folder: Root directory holding one sub-folder per document.
+        progress: Progress dict (keys read: ``completed_folders``,
+            ``verified_folders``, ``failed_folders``, ``stats``,
+            ``last_update``); updated in place and written to
+            PROGRESS_FILE_PATH.
+
+    Returns:
+        Dict with ``success``, ``failed``, ``skipped``, ``parent_rows``,
+        ``child_rows`` and ``failed_items`` (the failure records).
+
+    Raises:
+        NotADirectoryError: If *root_folder* is not a directory.
+    """
+    root_folder = Path(root_folder)
+    if not root_folder.is_dir():
+        raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")
+
+    client = get_milvusclient()
+
+    completed_folders: Set[str] = set(progress.get("completed_folders", []))
+    verified_folders: Set[str] = set(progress.get("verified_folders", []))
+    failed_folders: List[Dict[str, Any]] = list(progress.get("failed_folders", []))
+    # Defaults first so a partial stats dict from an older progress file
+    # cannot cause a KeyError later on.
+    stats: Dict[str, Any] = {
+        "success": 0,
+        "failed": 0,
+        "skipped": 0,
+        "parent_rows": 0,
+        "child_rows": 0,
+        **(progress.get("stats") or {}),
+    }
+
+    def save_checkpoint() -> None:
+        progress["completed_folders"] = list(completed_folders)
+        progress["verified_folders"] = list(verified_folders)
+        progress["failed_folders"] = failed_folders
+        progress["stats"] = stats
+        save_progress(PROGRESS_FILE_PATH, progress)
+
+    def summary() -> Dict[str, Any]:
+        return {
+            "success": stats["success"],
+            "failed": stats["failed"],
+            "skipped": stats["skipped"],
+            "parent_rows": stats["parent_rows"],
+            "child_rows": stats["child_rows"],
+            "failed_items": failed_folders,
+        }
+
+    # Step 1: purge residue of failed folders.  Row totals are not adjusted:
+    # rows of failed folders are never added to parent_rows / child_rows.
+    retry_after_failure: Set[str] = set()
+    if failed_folders:
+        failed_folders, retry_after_failure, _, _ = check_and_clean_failed_files(
+            client, failed_folders, root_folder
+        )
+        progress["failed_folders"] = failed_folders
+        progress["stats"] = stats
+        save_progress(PROGRESS_FILE_PATH, progress)
+
+    # Step 2: verify that completed folders really exist in Milvus.
+    missing_in_milvus, verified_folders, _ = verify_completed_folders(
+        client, root_folder, completed_folders, verified_folders
+    )
+    progress["verified_folders"] = list(verified_folders)
+    save_progress(PROGRESS_FILE_PATH, progress)
+
+    need_reupload = missing_in_milvus | retry_after_failure
+    if need_reupload:
+        completed_folders -= need_reupload
+        # Rough estimate: only folders that were recorded as completed but are
+        # missing from Milvus are assumed to have been counted as successes.
+        stats["success"] = max(0, stats["success"] - len(missing_in_milvus - retry_after_failure))
+        print(f"\n🔄 将重新上传 {len(need_reupload)} 个文件夹")
+
+    all_subfolders = sorted([d for d in root_folder.iterdir() if d.is_dir()])
+    total_folders = len(all_subfolders)
+    pending_count = sum(1 for d in all_subfolders if d.name not in completed_folders)
+
+    print(f"\n📊 统计信息:")
+    print(f"   总文件夹数: {total_folders}")
+    print(f"   已完成且验证通过: {len(completed_folders)}")
+    print(f"   需要重新上传: {len(need_reupload)}")
+    print(f"   待处理: {pending_count}")
+    print(f"   上次更新时间: {progress.get('last_update', '无')}")
+    print("-" * 60)
+
+    if pending_count == 0:
+        print("✅ 所有文件夹已处理完毕!")
+        return summary()
+
+    # Step 3: import every pending folder.
+    current_index = len(completed_folders)
+    for subfolder in all_subfolders:
+        folder_name = subfolder.name
+        if folder_name in completed_folders:
+            continue
+
+        current_index += 1
+
+        # A retried folder starts clean: drop its old failure records and undo
+        # its earlier failure count so a repeated failure is not double-counted.
+        remaining = _without_folder(failed_folders, folder_name)
+        if len(remaining) != len(failed_folders):
+            stats["failed"] = max(0, stats["failed"] - 1)
+        failed_folders = remaining
+
+        status, parent_count, child_count, failures = _ingest_folder(
+            client, subfolder, current_index, total_folders
+        )
+        stats[status] += 1
+        if status == "success":
+            stats["parent_rows"] += parent_count
+            stats["child_rows"] += child_count
+        failed_folders.extend(failures)
+
+        # Failures are marked as handled too, so this run does not loop on
+        # them; the next run retries them via the failure records.
+        completed_folders.add(folder_name)
+        verified_folders.add(folder_name)
+
+        if len(completed_folders) % 10 == 0:
+            save_checkpoint()
+
+    print()  # terminate the \r progress line
+
+    save_checkpoint()
+    return summary()
+
+
+def main():
+    """
+    Script entry point.
+
+    Prints the run configuration, loads the saved progress, runs the import,
+    writes the failure report to FAILED_REPORT_PATH and prints a summary.
+    Any exception is reported on stdout instead of being raised.
+    """
+    rule = "=" * 60
+    try:
+        print("🔍 开始导入 Milvus...")
+        print(f"📂 根目录: {ROOT_FOLDER}")
+        print(f"🔗 Milvus: {settings.MILVUS_HOST}:{settings.MILVUS_PORT}")
+        print(f"📊 Parent Collection: {PARENT_COLLECTION_NAME}")
+        print(f"📊 Child Collection: {CHILD_COLLECTION_NAME}")
+        print(f"💾 进度文件: {PROGRESS_FILE_PATH}")
+
+        # Resume from the progress file.
+        saved_progress = load_progress(PROGRESS_FILE_PATH)
+        summary = process_folder(ROOT_FOLDER, saved_progress)
+
+        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as report_file:
+            json.dump({"failed": summary["failed_items"]}, report_file, ensure_ascii=False, indent=2)
+
+        print("\n" + rule)
+        print(f"✅ 成功: {summary['success']} | ❌ 失败: {summary['failed']} | ⊘ 跳过: {summary['skipped']}")
+        print(f"📊 Parent 行数: {summary['parent_rows']} | Child 行数: {summary['child_rows']}")
+        print(f"❌ 失败汇总JSON: {FAILED_REPORT_PATH}")
+        print(f"💾 进度文件: {PROGRESS_FILE_PATH}")
+        print(rule)
+
+    except Exception as exc:
+        print(f"\n❌ 错误: {str(exc)}")
+
+
+if __name__ == "__main__":
+    main()