Procházet zdrojové kódy

dev:t_kngs_construction_plan_collections创建和导入milvus

ai02 před 3 týdny
rodič
revize
2d50ea275b

+ 25 - 8
src/app/scripts/t_kngs_construction_plan_collections_create.py

@@ -223,18 +223,26 @@ def create_index(client, collection_name: str):
     )
 
 
-def ensure_collection(collection_name: str, create_idx: bool = True):
+def ensure_collection(collection_name: str, create_idx: bool = True, auto_load: bool = True):
     """
-    确保Collection存在,不存在则创建
+    确保Collection存在,不存在则创建,并自动加载到内存
     
     Args:
         collection_name: Collection名称
         create_idx: 是否创建索引
+        auto_load: 创建后是否自动加载到内存
     """
     client = get_milvusclient()
     
     if client.has_collection(collection_name=collection_name):
         print(f"ℹ️ Collection已存在: {collection_name}")
+        # 即使已存在也尝试加载
+        if auto_load:
+            try:
+                client.load_collection(collection_name=collection_name)
+                print(f"✅ Collection已加载到内存: {collection_name}")
+            except Exception as e:
+                print(f"⚠️ Collection加载失败(可能已加载): {e}")
         return False
     
     print(f"📝 创建Schema: {collection_name}")
@@ -251,6 +259,15 @@ def ensure_collection(collection_name: str, create_idx: bool = True):
         print(f"🔍 创建索引: {collection_name}")
         create_index(client, collection_name)
     
+    # 自动加载到内存
+    if auto_load:
+        print(f"📂 加载Collection到内存: {collection_name}")
+        try:
+            client.load_collection(collection_name=collection_name)
+            print(f"✅ Collection加载完成: {collection_name}")
+        except Exception as e:
+            print(f"⚠️ Collection加载失败: {e}")
+    
     print(f"✅ Collection创建完成: {collection_name}")
     return True
 
@@ -262,19 +279,19 @@ def main():
     print("=" * 60)
     
     try:
-        # 创建父表
+        # 创建父表(自动加载)
         print("\n📋 创建父表...")
-        parent_created = ensure_collection(PARENT_COLLECTION_NAME)
+        parent_created = ensure_collection(PARENT_COLLECTION_NAME, auto_load=True)
         
-        # 创建子表
+        # 创建子表(自动加载)
         print("\n📋 创建子表...")
-        child_created = ensure_collection(CHILD_COLLECTION_NAME)
+        child_created = ensure_collection(CHILD_COLLECTION_NAME, auto_load=True)
         
         print("\n" + "=" * 60)
         if parent_created or child_created:
-            print("✅ 所有Collection创建完成!")
+            print("✅ 所有Collection创建并加载完成!")
         else:
-            print("ℹ️ 所有Collection已存在,无需创建")
+            print("ℹ️ 所有Collection已存在并加载")
         print("=" * 60)
         
     except Exception as e:

+ 777 - 0
src/app/scripts/t_kngs_construction_plan_collections_import.py

@@ -0,0 +1,777 @@
+"""
+施工方案知识结构体系数据导入脚本
+将JSON文件数据切分后插入Milvus的父子表中
+
+增强功能:
+- 支持断点续传(保存处理进度)
+- 输出已上传文件清单
+- 自动生成parent_id建立父子关联
+- 自动重试机制(处理Milvus集群节点不稳定问题)
+"""
+from __future__ import annotations
+
+import hashlib
+import json
+import os
+import re
+import time
+from datetime import datetime
+from functools import wraps
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+
+from pymilvus import MilvusClient
+from pymilvus.exceptions import MilvusException
+
+from app.config.embeddings import get_embeddings
+from app.config.milvus_client import get_milvusclient
+from app.config.setting import settings
+
+T = TypeVar('T')
+
+
def retry_on_milvus_node_error(
    max_retries: int = 3,
    delay: float = 2.0,
    backoff: float = 2.0
) -> Callable:
    """
    Decorator: automatically retry when Milvus reports a node-mismatch error.

    Only ``MilvusException`` instances whose message contains
    "node not match" or "expectedNodeID" are retried, with exponential
    backoff. Any other exception propagates immediately with its original
    traceback.

    Args:
        max_retries: maximum number of retries after the first attempt
        delay: initial retry delay in seconds
        backoff: multiplier applied to the delay after each retry
    """
    def decorator(func: "Callable[..., T]") -> "Callable[..., T]":
        @wraps(func)
        def wrapper(*args, **kwargs) -> "T":
            current_delay = delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except MilvusException as e:
                    message = str(e)
                    retryable = (
                        "node not match" in message
                        or "expectedNodeID" in message
                    )
                    if not retryable or attempt >= max_retries:
                        # Non-retryable error, or retries exhausted: re-raise
                        # with the original traceback intact.
                        raise
                    print(f"⚠️ Milvus节点不匹配,{current_delay}秒后重试({attempt + 1}/{max_retries})...")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
+
# ==================== Configuration ====================

# Folder containing the JSON export files to import
JSON_FOLDER = r"E:\WeChat Files\wxid_rymkhe638gt022\FileStorage\File\2026-03\1\your_output_folder"

# Collection names (parent/child pair)
PARENT_COLLECTION_NAME = "t_kngs_construction_plan_parent"
CHILD_COLLECTION_NAME = "t_kngs_construction_plan_child"

# Dense vector dimension (adjust to match the embedding model in use)
DENSE_DIM = 4096

# Default creator/updater user id for inserted rows
DEFAULT_USER_ID = "system"

# Output path of the failed-import report
FAILED_REPORT_PATH = "construction_plan_import_failed_report.json"

# Progress file path (used for resumable imports)
PROGRESS_FILE_PATH = "construction_plan_import_progress.json"

# Output path of the uploaded-files manifest
UPLOADED_FILES_PATH = "construction_plan_import_uploaded_files.json"

# Batch size for bulk inserts
# NOTE(review): declared but not referenced by the visible code — confirm
# it is meant to bound the embedding/insert batch in insert_to_collection.
BATCH_SIZE = 100

# Whether to persist processing progress (resumable imports)
SAVE_PROGRESS = True
+
+
+# ==================== 进度管理功能 ====================
+
def load_progress() -> Dict[str, Any]:
    """Load the saved import progress, falling back to an empty state.

    Returns a fresh empty-state dict when the progress file is missing or
    unreadable; parse/IO problems are reported but never raised.
    """
    empty_state: Dict[str, Any] = {
        "processed_files": [],
        "failed_files": [],
        "last_run": None,
        "total_processed": 0,
    }

    if not os.path.exists(PROGRESS_FILE_PATH):
        return empty_state

    try:
        with open(PROGRESS_FILE_PATH, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        print(f"⚠️ 加载进度文件失败: {e}")
        return empty_state
+
+
def save_progress(progress: Dict[str, Any]):
    """Persist the import progress to PROGRESS_FILE_PATH (best effort).

    No-op when SAVE_PROGRESS is disabled. Stamps ``last_run`` with the
    current time before writing; write failures are reported, not raised.
    """
    if not SAVE_PROGRESS:
        return

    progress["last_run"] = datetime.now().isoformat()
    try:
        with open(PROGRESS_FILE_PATH, "w", encoding="utf-8") as f:
            json.dump(progress, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print(f"⚠️ 保存进度文件失败: {e}")
+
+
def is_file_processed(file_name: str, doc_id: str, progress: Dict[str, Any]) -> bool:
    """Return True if this file/doc pair was already imported in a prior run."""
    return f"{file_name}:{doc_id}" in progress.get("processed_files", [])
+
+
def mark_file_processed(file_name: str, doc_id: str, progress: Dict[str, Any]):
    """Record the file/doc pair as done and bump the processed counter.

    Idempotent: marking an already-recorded pair again changes nothing.
    """
    key = f"{file_name}:{doc_id}"
    processed = progress.setdefault("processed_files", [])
    if key in processed:
        return
    processed.append(key)
    progress["total_processed"] = progress.get("total_processed", 0) + 1
+
+
def mark_file_failed(file_name: str, doc_id: str, error: str, progress: Dict[str, Any]):
    """Append a timestamped failure entry to the progress record."""
    progress.setdefault("failed_files", []).append({
        "file": file_name,
        "doc_id": doc_id,
        "error": error,
        "timestamp": datetime.now().isoformat(),
    })
+
+
+# ==================== 重复检测功能 ====================
+
@retry_on_milvus_node_error(max_retries=3, delay=2.0)
def _load_collection_with_retry(client: MilvusClient, collection_name: str):
    """Load a collection into memory, retrying on node-mismatch errors.

    NOTE(review): not referenced by the visible code in this file — confirm
    it is used elsewhere or wire it into the import flow.
    """
    client.load_collection(collection_name=collection_name)
+
+
@retry_on_milvus_node_error(max_retries=3, delay=2.0)
def _query_with_retry(client: MilvusClient, **kwargs):
    """Run client.query(**kwargs), retrying on node-mismatch errors.

    NOTE(review): not referenced by the visible code in this file.
    """
    return client.query(**kwargs)
+
+
+# ==================== 数据解析和处理 ====================
+
def parse_section_label(section_label: str) -> Tuple[str, str, str]:
    """
    Split a section label into up to three chapter levels.

    Args:
        section_label: e.g. "9 验收要求->9.2 验收内容"

    Returns:
        (chapter_level_1, chapter_level_2, chapter_level_3); missing levels
        come back as empty strings, extra levels beyond three are ignored.
    """
    if not section_label:
        return "", "", ""

    levels = [part.strip() for part in section_label.split("->")]
    levels += ["", "", ""]  # pad so indexing 0..2 is always safe
    return levels[0], levels[1], levels[2]
+
+
def extract_file_info(source_file: str) -> Dict[str, str]:
    """
    Extract file information from a source-file path.

    Handles both Windows ("\\") and POSIX ("/") separators regardless of
    the host OS — the exported JSON embeds Windows-style paths, which
    ``os.path.basename`` would not split when running on Linux — and strips
    a trailing ".pdf" extension case-insensitively.

    Args:
        source_file: full path of the source file

    Returns:
        dict with "file_name" (plan_type is intentionally left blank upstream)
    """
    if not source_file:
        return {"file_name": ""}

    # Split on either separator explicitly instead of os.path.basename,
    # which only understands the host OS's separator.
    file_name = re.split(r"[\\/]", source_file)[-1]

    # Remove the .pdf extension regardless of case (".pdf", ".PDF", ...).
    if file_name.lower().endswith(".pdf"):
        file_name = file_name[:-4]

    # plan_type is deliberately not populated here.
    return {"file_name": file_name}
+
+
+
def build_metadata(
    chunk_id: str,
    section_label: str,
    element_tag: Dict[str, Any],
    category_info: Dict[str, str],
    source_file: str
) -> Dict[str, Any]:
    """
    Assemble the metadata payload stored alongside each Milvus row.

    Args:
        chunk_id: chunk identifier
        section_label: section label string
        element_tag: element tag info (page, serial_number)
        category_info: category mapping (primary/secondary/tertiary)
        source_file: source file path

    Returns:
        metadata dict
    """
    metadata: Dict[str, Any] = {
        "chunk_id": chunk_id,
        "section_label": section_label,
        "page": element_tag.get("page", 0),
        "serial_number": element_tag.get("serial_number", ""),
        "source_file": source_file,
    }
    # Map category_info levels onto the *_category keys, defaulting to "".
    for target_key, source_key in (
        ("primary_category", "primary"),
        ("secondary_category", "secondary"),
        ("tertiary_category", "tertiary"),
    ):
        metadata[target_key] = category_info.get(source_key, "")
    return metadata
+
+
+# ==================== 数据插入 ====================
+
def insert_to_collection(
    client: MilvusClient,
    collection_name: str,
    records: List[Dict[str, Any]],
    embeddings: Any
) -> Tuple[int, Optional[str]]:
    """
    Embed and bulk-insert records into a Milvus collection.

    Records are processed in batches of BATCH_SIZE so that one large file
    cannot produce an oversized embedding request or Milvus insert payload
    (BATCH_SIZE was previously declared at module level but never used).

    Args:
        client: Milvus client
        collection_name: target collection name
        records: records to insert; each must contain a "text" key
        embeddings: embedding model providing embed_documents(texts)

    Returns:
        (rows inserted, error message or None). On error the count reflects
        rows already inserted by earlier batches; callers treat a non-None
        error as failure.
    """
    if not records:
        return 0, None

    inserted = 0
    try:
        # One timestamp for the whole call keeps created/updated times consistent.
        now_ts = int(datetime.now().timestamp())

        for start in range(0, len(records), BATCH_SIZE):
            batch = records[start:start + BATCH_SIZE]

            # Embed only the current batch of texts.
            vectors = embeddings.embed_documents([r["text"] for r in batch])

            entities = []
            for i, record in enumerate(batch):
                entities.append({
                    "text": record["text"],
                    "dense": vectors[i],
                    "document_id": record.get("document_id", ""),
                    "parent_id": record.get("parent_id", ""),
                    "index": record.get("index", 0),
                    "tag_list": record.get("tag_list", ""),
                    "permission": record.get("permission", {}),
                    "metadata": record.get("metadata", {}),
                    "file_name": record.get("file_name", ""),
                    "plan_type": record.get("plan_type", ""),  # defaults to empty
                    "file_url": record.get("file_url", ""),
                    "chapter_title": record.get("chapter_title", ""),
                    "chapter_level_1": record.get("chapter_level_1", ""),
                    "chapter_level_2": record.get("chapter_level_2", ""),
                    "chapter_level_3": record.get("chapter_level_3", ""),
                    "is_deleted": False,
                    "created_by": record.get("created_by", DEFAULT_USER_ID),
                    "created_time": now_ts,
                    "updated_by": record.get("updated_by", DEFAULT_USER_ID),
                    "updated_time": now_ts,
                })

            client.insert(collection_name=collection_name, data=entities)
            inserted += len(entities)

        return inserted, None

    except Exception as e:
        # Report partial progress alongside the error message.
        return inserted, str(e)
+
+
+# ==================== 文件清单输出 ====================
+
def save_uploaded_files_report(stats: Dict[str, Any], output_path: str):
    """
    Write the uploaded-files manifest as a JSON report.

    Args:
        stats: aggregated statistics dict from the folder run
        output_path: destination path for the JSON report
    """
    success_items = stats.get('success_items', [])
    skipped_items = stats.get('skipped_items', [])
    failed_items = stats.get('failed_items', [])

    # Per-file success entries, each stamped with the generation time.
    success_entries = []
    for item in success_items:
        success_entries.append({
            "file": item.get('file', ''),
            "parent_count": item.get('parent_count', 0),
            "child_count": item.get('child_count', 0),
            "timestamp": datetime.now().isoformat()
        })

    skipped_entries = [
        {"file": item.get('file', ''), "reason": item.get('reason', '已处理过')}
        for item in skipped_items
    ]

    report = {
        "generated_at": datetime.now().isoformat(),
        "summary": {
            "total_files": stats.get('total', 0),
            "success_count": stats.get('success', 0),
            "skipped_count": stats.get('skipped', 0),
            "failed_count": stats.get('failed', 0),
            "total_parent_rows": stats.get('total_parent', 0),
            "total_child_rows": stats.get('total_child', 0)
        },
        "uploaded_files": {
            "success": success_entries,
            "skipped": skipped_entries,
            "failed": failed_items
        },
        "statistics": {
            "success_files": len(success_items),
            "skipped_files": len(skipped_items),
            "failed_files": len(failed_items)
        }
    }

    try:
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        print(f"\n✅ 已上传文件清单已保存: {output_path}")
    except Exception as e:
        print(f"\n⚠️ 保存已上传文件清单失败: {e}")
+
+
+# ==================== 主处理流程 ====================
+
def make_parent_id(chunk_id: str, file_name: str) -> int:
    """
    Derive a stable 63-bit integer parent id from a file name and chunk id.

    The id is reproducible across runs (sha1-based), so re-importing the
    same chunk always yields the same parent/child linkage.

    Args:
        chunk_id: unique chunk identifier
        file_name: owning file name

    Returns:
        non-negative integer that fits in a signed 64-bit field
    """
    digest = hashlib.sha1(f"{file_name}|{chunk_id}".encode("utf-8")).hexdigest()
    # Keep 16 hex chars (64 bits) and clear the sign bit -> range [0, 2**63).
    return int(digest[:16], 16) & ((1 << 63) - 1)
+
+
def process_json_file(
    json_path: Path,
    client: MilvusClient,
    embeddings: Any,
    progress: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Process a single JSON file: split its chunks into parent and child
    records and insert them into the two Milvus collections.

    Each chunk yields one parent row (the full content) plus one child row
    per non-empty tag (child text == tag value), linked through a stable
    hash-based parent_id.

    NOTE(review): the keys "matedata" and "oringin_content" are the (sic)
    spellings produced by the upstream JSON export — do not "fix" them here.

    Args:
        json_path: JSON file path
        client: Milvus client
        embeddings: embedding model
        progress: mutable progress dict used for resume support

    Returns:
        result dict: file, parent_count, child_count, success, error,
        status ("success" | "skipped" | "failed"), optional "reason".
    """
    result = {
        "file": str(json_path.name),
        "parent_count": 0,
        "child_count": 0,
        "success": False,
        "error": None,
        "status": "unknown"
    }

    try:
        # Read and parse the JSON payload.
        with open(json_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        if not isinstance(data, dict):
            result["error"] = "JSON格式错误:根对象应为字典"
            result["status"] = "failed"
            return result

        # Use the first chunk's ids for the resume/duplicate check.
        first_chunk = None
        for chunk_key, chunk_data in data.items():
            if isinstance(chunk_data, dict):
                first_chunk = chunk_data
                break

        if not first_chunk:
            result["error"] = "JSON中没有有效的chunk数据"
            result["status"] = "skipped"
            return result

        matedata = first_chunk.get("matedata", {})
        doc_id = matedata.get("chunk_id", "")
        source_file = matedata.get("source_file", "")
        file_info = extract_file_info(source_file)
        # NOTE(review): file_info/file_name are recomputed per chunk below;
        # this value is never read afterwards.
        file_name = file_info["file_name"]

        # [Resume] Skip files already recorded in the progress file.
        if is_file_processed(json_path.name, doc_id, progress):
            result["status"] = "skipped"
            result["reason"] = "进度文件中已记录"
            return result

        parent_records = []
        child_records = []
        parent_index = 0  # NOTE(review): incremented but never read — parent "index" is fixed to 0

        # Build parent/child records for every chunk.
        for chunk_key, chunk_data in data.items():
            if not isinstance(chunk_data, dict):
                continue

            tags = chunk_data.get("tags", {})
            content = chunk_data.get("oringin_content", "")
            matedata = chunk_data.get("matedata", {})
            category_info = chunk_data.get("category_info", {})

            # document_id / file_url come from the chunk's "matedata" section.
            chunk_doc_id = matedata.get("document_id", "")
            chunk_file_url = matedata.get("file_url", "")

            chunk_id = matedata.get("chunk_id", "")
            section_label = matedata.get("section_label", "")
            element_tag = matedata.get("element_tag", {})
            source_file = matedata.get("source_file", "")

            # Chapter levels parsed from section_label ("a->b->c").
            level_1, level_2, level_3 = parse_section_label(section_label)

            # Chapter levels from category_info take precedence when present.
            cat_level_1 = category_info.get("primary", "")
            cat_level_2 = category_info.get("secondary", "")
            cat_level_3 = category_info.get("tertiary", "")

            final_level_1 = cat_level_1 if cat_level_1 else level_1
            final_level_2 = cat_level_2 if cat_level_2 else level_2
            final_level_3 = cat_level_3 if cat_level_3 else level_3

            # File name derived from this chunk's own source_file.
            file_info = extract_file_info(source_file)

            # Comma-joined list of all non-empty tag values.
            tag_list = ",".join([v for v in tags.values() if v]) if tags else ""

            # Shared metadata payload (includes the category mapping).
            metadata = build_metadata(
                chunk_id=chunk_id,
                section_label=section_label,
                element_tag=element_tag,
                category_info=category_info,
                source_file=source_file
            )

            # Prefer the export's document_id; fall back to chunk_id.
            document_id = chunk_doc_id if chunk_doc_id else chunk_id

            # file_url as exported (may be empty).
            file_url = chunk_file_url if chunk_file_url else ""

            # Stable hash-based id linking the parent row to its children.
            parent_id = make_parent_id(chunk_id, file_info["file_name"])

            # === Parent row ===
            parent_record = {
                "text": content,
                "document_id": document_id,
                "parent_id": str(parent_id),
                "index": 0,  # parent index intentionally left as 0
                "tag_list": tag_list,
                "permission": {},
                "metadata": metadata,
                "file_name": file_info["file_name"],
                "plan_type": "",  # plan_type intentionally left blank
                "file_url": file_url,
                "chapter_title": section_label,
                "chapter_level_1": final_level_1,
                "chapter_level_2": final_level_2,
                "chapter_level_3": final_level_3,
            }
            parent_records.append(parent_record)
            parent_index += 1

            # === Child rows ===
            # One child row per tag: both text and tag_list are the tag value,
            # so the number of child rows equals the number of non-empty tags.
            sorted_tag_items = sorted(tags.items(), key=lambda x: x[0])

            # Drop empty/whitespace-only tags.
            valid_tags = [(k, v.strip()) for k, v in sorted_tag_items if v and v.strip()]

            for child_idx, (tag_key, tag_value) in enumerate(valid_tags):
                child_record = {
                    "text": tag_value,  # child text is the tag value itself
                    "document_id": document_id,
                    "parent_id": str(parent_id),
                    "index": child_idx + 1,  # 1-based child index (1..N)
                    "tag_list": tag_value,  # single tag value
                    "permission": {},
                    "metadata": {
                        **metadata,
                        "fragment_tag_key": tag_key,  # originating key, e.g. "tag_0"
                        "fragment_tag_value": tag_value,
                        "fragment_index": child_idx + 1,  # 1-based
                        "total_fragments": len(valid_tags),
                    },
                    "file_name": file_info["file_name"],
                    "plan_type": "",  # plan_type intentionally left blank
                    "file_url": file_url,
                    "chapter_title": section_label,
                    "chapter_level_1": final_level_1,
                    "chapter_level_2": final_level_2,
                    "chapter_level_3": final_level_3,
                }
                child_records.append(child_record)

        # Bulk-insert parent rows first; abort the file on error.
        if parent_records:
            count, error = insert_to_collection(
                client, PARENT_COLLECTION_NAME, parent_records, embeddings
            )
            if error:
                result["error"] = f"父表插入失败: {error}"
                result["status"] = "failed"
                return result
            result["parent_count"] = count

        # Then bulk-insert child rows.
        if child_records:
            count, error = insert_to_collection(
                client, CHILD_COLLECTION_NAME, child_records, embeddings
            )
            if error:
                result["error"] = f"子表插入失败: {error}"
                result["status"] = "failed"
                return result
            result["child_count"] = count

        result["success"] = True
        result["status"] = "success"
        mark_file_processed(json_path.name, doc_id, progress)

    except json.JSONDecodeError as e:
        result["error"] = f"JSON解析错误: {str(e)}"
        result["status"] = "failed"
    except Exception as e:
        result["error"] = f"处理异常: {str(e)}"
        result["status"] = "failed"

    return result
+
+
def process_folder(folder_path: str | Path, progress: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process every *.json file in a folder (non-recursive).

    Args:
        folder_path: folder containing the JSON export files
        progress: mutable progress dict (resume support), updated in place
            and persisted every 10 files

    Returns:
        stats dict: total/success/failed/skipped counts, parent/child row
        totals, and per-file success/skipped/failed item lists

    Raises:
        FileNotFoundError: folder does not exist
        NotADirectoryError: path is not a directory
        RuntimeError: embedding model could not be obtained
    """
    folder_path = Path(folder_path)

    if not folder_path.exists():
        raise FileNotFoundError(f"文件夹不存在: {folder_path}")

    if not folder_path.is_dir():
        raise NotADirectoryError(f"不是有效的文件夹: {folder_path}")

    client = get_milvusclient()

    try:
        embeddings = get_embeddings()
    except Exception as e:
        raise RuntimeError(f"获取embeddings失败: {e}")

    # Collect top-level JSON files only (no recursion).
    json_files = list(folder_path.glob("*.json"))

    print(f"🔍 找到 {len(json_files)} 个JSON文件")
    print("-" * 60)

    stats = {
        "total": len(json_files),
        "success": 0,
        "failed": 0,
        "skipped": 0,
        "total_parent": 0,
        "total_child": 0,
        "failed_items": [],
        "success_items": [],
        "skipped_items": []
    }

    for i, json_file in enumerate(json_files, 1):
        print(f"[{i}/{len(json_files)}] 处理: {json_file.name}", end=" ")

        result = process_json_file(
            json_file, client, embeddings, progress
        )

        if result["status"] == "success":
            print(f"✅ parent: {result['parent_count']}, child: {result['child_count']}")
            stats["success"] += 1
            stats["total_parent"] += result["parent_count"]
            stats["total_child"] += result["child_count"]
            stats["success_items"].append({
                "file": result["file"],
                "parent_count": result["parent_count"],
                "child_count": result["child_count"]
            })
        elif result["status"] == "skipped":
            print(f"⏭️ {result.get('reason', '已处理过')}")
            stats["skipped"] += 1
            stats["skipped_items"].append({
                "file": result["file"],
                "reason": result.get("reason", "")
            })
        else:
            print(f"❌ {result['error']}")
            stats["failed"] += 1
            stats["failed_items"].append({
                "file": result["file"],
                "error": result["error"]
            })
            # doc_id is unknown at this level, hence the empty string.
            mark_file_failed(json_file.name, "", result["error"], progress)

        # Persist progress every 10 files so an interrupt loses little work.
        if i % 10 == 0:
            save_progress(progress)

    return stats
+
+
def print_summary(stats: Dict[str, Any], progress: Dict[str, Any]):
    """Print an end-of-run summary: this run's stats plus cumulative progress."""
    success_items = stats.get('success_items') or []
    failed_items = stats.get('failed_items') or []

    print("\n" + "=" * 60)
    print("📈 处理结果统计")
    print("=" * 60)
    print(f"📁 总文件数: {stats['total']}")
    print(f"✅ 成功: {stats['success']}")
    print(f"❌ 失败: {stats['failed']}")
    print(f"⏭️ 跳过(已处理): {stats['skipped']}")
    print(f"📊 父表记录: {stats['total_parent']}")
    print(f"📊 子表记录: {stats['total_child']}")

    if success_items:
        print("\n✅ 成功上传列表(前10条):")
        for entry in success_items[:10]:
            print(f"   - {entry['file']} (parent: {entry['parent_count']}, child: {entry['child_count']})")
        if len(success_items) > 10:
            print(f"   ... 等共 {len(success_items)} 条")

    if failed_items:
        print("\n❌ 失败列表(前5条):")
        for entry in failed_items[:5]:
            print(f"   - {entry['file']}: {entry['error']}")
        if len(failed_items) > 5:
            print(f"   ... 等共 {len(failed_items)} 条")

    print(f"\n💾 累计处理: {progress.get('total_processed', 0)} 个文件")
    print(f"❌ 累计失败: {len(progress.get('failed_files', []))} 个文件")
    print("=" * 60)
+
+
+# ==================== 主函数 ====================
+
def main():
    """Entry point: import all JSON files with resume support and reports."""
    print("=" * 60)
    print("施工方案知识结构体系数据导入工具 - 断点续传版")
    print("=" * 60)
    print(f"📂 JSON文件夹: {JSON_FOLDER}")
    print(f"🔗 Milvus: {settings.MILVUS_HOST}:{settings.MILVUS_PORT}/{settings.MILVUS_DB}")
    print(f"📊 父表: {PARENT_COLLECTION_NAME}")
    print(f"📊 子表: {CHILD_COLLECTION_NAME}")
    print(f"💾 保存进度: {'是' if SAVE_PROGRESS else '否'}")
    print("-" * 60)

    try:
        # Load saved progress (resume support).
        progress = load_progress()
        if progress.get("processed_files"):
            print(f"📋 发现历史进度: {len(progress['processed_files'])} 个文件已处理")

        # Process every JSON file in the configured folder.
        print("\n📋 步骤1: 处理JSON文件...")
        stats = process_folder(JSON_FOLDER, progress)

        # Persist the final progress state.
        save_progress(progress)

        # Write the failed-items report.
        with open(FAILED_REPORT_PATH, "w", encoding="utf-8") as f:
            json.dump(stats["failed_items"], f, ensure_ascii=False, indent=2)

        # Write the uploaded-files manifest.
        save_uploaded_files_report(stats, UPLOADED_FILES_PATH)

        # Print the run summary.
        print_summary(stats, progress)

        print(f"\n📝 失败报告: {FAILED_REPORT_PATH}")
        print(f"📄 已上传文件清单: {UPLOADED_FILES_PATH}")
        if SAVE_PROGRESS:
            print(f"💾 进度文件: {PROGRESS_FILE_PATH}")
        print("=" * 60)

    except Exception as e:
        # Top-level guard: report and dump the traceback instead of crashing.
        print(f"\n❌ 程序异常: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()