Просмотр исходного кода

feat: 编制依据MD文档实体抽取入库脚本

ai02 4 недели назад
Родитель
Коммит
947164d56c
1 изменённый файл: 932 добавлено и 0 удалено
  1. 932 0
      src/app/scripts/plan_entity_import_engine.py

+ 932 - 0
src/app/scripts/plan_entity_import_engine.py

@@ -0,0 +1,932 @@
+"""
+建筑工程编制依据MD文档实体抽取入库脚本
+
+功能说明:
+    读取建筑工程编制依据类MD文档,抽取工程实体(标准名称、标准编号、专业术语),
+    生成向量并批量写入Milvus的first_bfp_collection_entity Collection。
+
+作者: 数据治理工程师
+版本: 1.0.0
+"""
+from __future__ import annotations
+
+import json
+import os
+import re
+import sys
+import uuid
+import warnings
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Tuple
+
+import numpy as np
+
+# Silence every warning category for the rest of the process.
+# NOTE(review): this also hides deprecation/runtime warnings raised by dependencies - confirm that is intended.
+warnings.filterwarnings("ignore")
+
+# ==================== 配置区 ====================
+
+class Config:
+    """Single place that holds every tunable setting of this script."""
+
+    # --- Data source -------------------------------------------------
+    # Directory that holds the Markdown documents (relative or absolute).
+    # Only .md files sitting directly inside its immediate sub-folders are read.
+    MD_SOURCE_DIR: str = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"
+
+    # --- Milvus ------------------------------------------------------
+    COLLECTION_NAME: str = "first_bfp_collection_entity"
+    VECTOR_DIM: int = 4096
+
+    # --- Batching ----------------------------------------------------
+    BATCH_SIZE: int = 100          # rows per Milvus insert call
+    VECTOR_BATCH_SIZE: int = 32    # texts per embedding request
+
+    # --- Entity extraction -------------------------------------------
+    MAX_ENTITIES_PER_DOC: int = 200
+    MIN_ENTITY_LENGTH: int = 2
+    MAX_ENTITY_LENGTH: int = 50
+
+    # --- Logging and checkpointing -----------------------------------
+    LOG_DIR: str = "./logs"
+    ERROR_LOG: str = "./logs/import_errors.log"
+    CHECKPOINT_FILE: str = "./logs/import_checkpoint.json"
+
+    # --- Progress display --------------------------------------------
+    SHOW_PROGRESS: bool = True
+
+
+# ==================== 全局单例管理器 ====================
+
+class SingletonManager:
+    """Lazily created resources shared by the whole process.
+
+    Each resource is built on first request and cached on the class, so
+    later calls return the same object.
+    """
+
+    _embeddings_client = None
+    _jieba_initialized = False
+    _milvus_client = None
+
+    @classmethod
+    def get_embeddings_client(cls):
+        """Return the cached embedding client, creating it on first use.
+
+        Raises:
+            Exception: whatever the project factory raises; the error is
+                printed and re-raised unchanged.
+        """
+        if cls._embeddings_client is None:
+            print("正在初始化Embedding客户端...")
+            try:
+                from app.config.embeddings import get_embeddings
+                cls._embeddings_client = get_embeddings()
+                print("Embedding客户端初始化完成")
+            except Exception as e:
+                print(f"Embedding客户端初始化失败: {e}")
+                raise
+        return cls._embeddings_client
+
+    @classmethod
+    def init_jieba(cls):
+        """Register the domain vocabulary with jieba (done once).
+
+        Returns:
+            True when jieba is ready, False when jieba is not installed.
+        """
+        if cls._jieba_initialized:
+            return True
+
+        try:
+            import jieba
+        except ImportError:
+            # The message announces a fallback and EntityExtractor.extract
+            # already skips term extraction when jieba is missing, so report
+            # the situation instead of aborting the whole import.
+            print("jieba未安装,将使用基础分词")
+            return False
+
+        domain_words = [
+            '混凝土结构', '钢筋混凝土', '预应力混凝土', '框架结构',
+            '剪力墙', '基础工程', '地基处理', '桩基础', '基坑支护',
+            '施工方案', '安全检查', '质量验收', '技术规范',
+            '延性构件', '构造细节', '抗震设计', '荷载组合'
+        ]
+        # A high frequency keeps these multi-character terms from being split.
+        for word in domain_words:
+            jieba.add_word(word, freq=1000)
+
+        cls._jieba_initialized = True
+        print("jieba分词初始化完成")
+        return True
+
+    @classmethod
+    def get_milvus_client(cls):
+        """Return the cached Milvus client, creating it on first use.
+
+        Raises:
+            Exception: whatever the project factory raises; the error is
+                printed and re-raised unchanged.
+        """
+        if cls._milvus_client is None:
+            try:
+                from app.config.milvus_client import get_milvusclient
+                cls._milvus_client = get_milvusclient()
+                print("Milvus客户端初始化完成")
+            except Exception as e:
+                print(f"Milvus客户端初始化失败: {e}")
+                raise
+        return cls._milvus_client
+
+
+# ==================== 数据模型 ====================
+
+@dataclass
+class Entity:
+    """One extracted entity together with where it was found."""
+    text: str                 # surface form; trimmed in __post_init__
+    entity_type: str
+    source_sentence: str
+    position: int
+    sentence_title: str = ""
+
+    def __post_init__(self):
+        """Trim quotes, CJK punctuation, newlines and tabs from both ends of ``text``."""
+        edge_chars = '\'"、,。;:!?\n\t'
+        trimmed = self.text.strip(edge_chars)
+        self.text = trimmed
+
+
+@dataclass
+class Document:
+    """A Markdown document after cleaning, plus the entities found in it."""
+    file_name: str
+    content: str                 # cleaned text
+    raw_sentences: List[str]     # sentences split from ``content``
+    # Starts empty; every instance gets its own list.
+    entities: List[Entity] = field(default_factory=list)
+
+
+# ==================== MD标题提取器 ====================
+
+class MDTitleExtractor:
+    """Looks up the Markdown heading that governs a character offset."""
+
+    @staticmethod
+    def find_sentence_title(md_content: str, position: int) -> str:
+        """Return the heading text in effect at ``position``.
+
+        Headings on lines up to and including the line that holds
+        ``position`` are visited in order. A heading replaces the current
+        answer only when its level is the same as, or shallower than (fewer
+        ``#``), the level already held. Emphasis markers (``*`` and ``_``)
+        are removed from the returned text. An empty string is returned when
+        no heading qualifies.
+        """
+        heading_re = re.compile(r'^(#{1,6})\s+(.+)$')
+        target_line = md_content[:position].count('\n')
+
+        chosen = ""
+        chosen_level = 999
+        # Only lines at or before the target line can contribute a heading.
+        for raw_line in md_content.split('\n')[:target_line + 1]:
+            found = heading_re.match(raw_line.strip())
+            if found is None:
+                continue
+            depth = len(found.group(1))
+            if depth > chosen_level:
+                continue
+            chosen = re.sub(r'\*\*|__|\*|_', '', found.group(2).strip())
+            chosen_level = depth
+
+        return chosen
+
+
+# ==================== 文本清理工具 ====================
+
+class TextCleaner:
+    """Helpers that turn Markdown into plain text and split it into sentences."""
+
+    # Substitutions applied in order by clean_markdown: (pattern, replacement).
+    _MARKDOWN_RULES = (
+        (re.compile(r'```[\s\S]*?```'), ' '),                    # fenced code
+        (re.compile(r'`[^`]*`'), ' '),                           # inline code
+        (re.compile(r'<[^>]+>'), ' '),                           # HTML tags
+        # Images are removed BEFORE links: the link rule also matches the
+        # "[alt](url)" tail of "![alt](url)" and would leave "!alt" behind.
+        (re.compile(r'!\[[^\]]*\]\([^\)]+\)'), ' '),             # images
+        (re.compile(r'\[([^\]]+)\]\([^\)]+\)'), r'\1'),          # links -> label
+        (re.compile(r'^#+\s*', re.MULTILINE), ' '),              # heading marks
+        (re.compile(r'\*\*([^*]+)\*\*'), r'\1'),                 # **bold**
+        (re.compile(r'__([^_]+)__'), r'\1'),                     # __bold__
+        (re.compile(r'\*([^*]+)\*'), r'\1'),                     # *italic*
+        (re.compile(r'_([^_]+)_'), r'\1'),                       # _italic_
+        (re.compile(r'~~([^~]+)~~'), r'\1'),                     # ~~strike~~
+        (re.compile(r'\|'), ' '),                                # table pipes
+        (re.compile(r'^[\s\-:]+$', re.MULTILINE), ' '),          # table rules
+        (re.compile(r'^[\s]*[-*+][\s]+', re.MULTILINE), ' '),    # bullet marks
+        (re.compile(r'^[\s]*\d+\.[\s]+', re.MULTILINE), ' '),    # "1." marks
+        (re.compile(r'\n+'), '\n'),                              # blank lines
+        (re.compile(r'[ \t]+'), ' '),                            # space runs
+    )
+
+    @staticmethod
+    def clean_markdown(text: str) -> str:
+        """Strip Markdown/HTML markup from ``text`` and normalise whitespace.
+
+        Returns an empty string for empty input; otherwise the text with the
+        rules in ``_MARKDOWN_RULES`` applied in order, trimmed at both ends.
+        """
+        if not text:
+            return ""
+
+        for pattern, replacement in TextCleaner._MARKDOWN_RULES:
+            text = pattern.sub(replacement, text)
+
+        return text.strip()
+
+    @staticmethod
+    def split_sentences(text: str) -> List[str]:
+        """Split ``text`` into sentences at 。!?; and newlines.
+
+        The terminating punctuation stays attached to its sentence. Pieces
+        shorter than 10 characters after trimming are dropped.
+        """
+        if not text:
+            return []
+
+        pattern = r'([。!?;\n]+)'
+        # The capturing group makes re.split return the delimiters as well.
+        parts = re.split(pattern, text)
+
+        sentences = []
+        current = ""
+
+        for part in parts:
+            if re.match(pattern, part):
+                # Delimiter run: close the sentence collected so far.
+                current += part
+                if len(current.strip()) >= 10:
+                    sentences.append(current.strip())
+                current = ""
+            else:
+                current = part
+
+        # Trailing text that has no terminating punctuation.
+        if current.strip() and len(current.strip()) >= 10:
+            sentences.append(current.strip())
+
+        return sentences
+
+
+# ==================== 实体抽取器 ====================
+
+class EntityExtractor:
+    """Extracts standard names, standard codes and domain terms from a document."""
+
+    # Text enclosed in Chinese book-title marks, 2 to 100 characters long.
+    STANDARD_NAME_PATTERN = re.compile(r'《([^》]{2,100}?)》')
+
+    # One pattern per standard-code family (GB, JTG, JTJ, JGJ, ...).
+    STANDARD_CODE_PATTERNS = [
+        re.compile(r'GB\s*/?T?\s*\d+[\./\-]?\d*(?:[-–—]\d{4})?', re.IGNORECASE),
+        re.compile(r'JTG\s*[TD]?\s*\d+[\./\-]?\d*[a-zA-Z]?', re.IGNORECASE),
+        re.compile(r'JTJ\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'JGJ\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'CJJ\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'TB\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'SL\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'DL\s*/?T?\s*\d+[\./\-]?\d*', re.IGNORECASE),
+        re.compile(r'CECS\s*\d+[::]?\d*', re.IGNORECASE),
+        re.compile(r'建标\s*\d+[\./\-]?\d*', re.IGNORECASE),
+    ]
+
+    # Exact strings that are never accepted as entities.
+    STOP_WORDS = {
+        '本规范', '本规定', '本标准', '本章', '本节', '本条',
+        '例如', '如下', '所示', '所述', '上述', '下列',
+        '第一', '第二', '第三', '之一', '之二',
+    }
+
+    # Everyday words; see _is_daily_word for how they are applied.
+    DAILY_WORDS = {
+        '东西', '地方', '事情', '情况', '问题', '部分', '方面',
+        '原因', '结果', '方式', '方法', '过程', '阶段', '时期',
+        '进行', '完成', '实现', '开展', '推进', '落实', '执行',
+        '实施', '采取', '采用', '使用', '利用', '应用',
+        '重要', '主要', '基本', '一般', '特殊', '特定', '具体',
+        '相应', '有关', '相关', '其他', '其余', '必要', '可能',
+        '非常', '十分', '特别', '比较', '相当', '很', '最', '更',
+        '一些', '一点', '许多', '很多', '大量', '部分', '各种',
+        '关于', '对于', '根据', '按照', '通过', '经过', '随着',
+        '目前', '当前', '以前', '以后', '之后', '之前', '当时',
+        '现在', '今天', '明天', '昨天', '近期', '长期',
+        '这里', '那里', '哪里', '到处', '处处',
+        '我们', '你们', '他们', '大家', '有人', '人们',
+        '是否', '能否', '可否', '会不会', '能不能',
+    }
+
+    @classmethod
+    def extract(cls, doc: Document, raw_content: str = "") -> List[Entity]:
+        """Extract entities from a cleaned document.
+
+        Three passes run in order: names inside book-title marks, standard
+        codes (whitespace removed, upper-cased), then noun phrases assembled
+        from jieba part-of-speech tags. Entities are de-duplicated
+        case-insensitively, sorted by position and capped at
+        ``Config.MAX_ENTITIES_PER_DOC``.
+
+        Args:
+            doc: document whose ``content`` is cleaned text and whose
+                ``raw_sentences`` were split from that text.
+            raw_content: the original Markdown, used only to look up the
+                heading of each entity; when empty, titles stay blank.
+
+        Returns:
+            The extracted entities in ascending ``position`` order.
+        """
+        entities = []
+        seen_texts: Set[str] = set()
+        content = doc.content
+        sentences = doc.raw_sentences
+
+        def find_source_sentence(text: str, position: int) -> str:
+            # Prefer the sentence that literally contains the surface text.
+            # Sentences are stripped and short ones are dropped, so summed
+            # sentence lengths do not line up with offsets into ``content``.
+            if text:
+                for sent in sentences:
+                    if text in sent:
+                        return sent
+            # Fallback: the approximate offset walk.
+            current_pos = 0
+            for sent in sentences:
+                sent_len = len(sent)
+                if current_pos <= position < current_pos + sent_len:
+                    return sent
+                current_pos += sent_len
+            if not sentences:
+                return ""
+            return sentences[min(max(position, 0) // 100, len(sentences) - 1)]
+
+        def get_title(surface: str, clean_position: int) -> str:
+            if not raw_content:
+                return ""
+            # ``clean_position`` indexes the cleaned text, which is shorter
+            # than the raw Markdown; locate the surface form in the raw text
+            # so the heading lookup uses a matching offset.
+            raw_position = raw_content.find(surface) if surface else -1
+            if raw_position < 0:
+                raw_position = clean_position
+            return MDTitleExtractor.find_sentence_title(raw_content, raw_position)
+
+        # 1. Standard names.
+        for match in cls.STANDARD_NAME_PATTERN.finditer(content):
+            name = match.group(1).strip()
+            if not cls._is_valid_entity(name) or cls._is_daily_word(name):
+                continue
+            full_name = f"《{name}》"
+            key = full_name.lower()
+            if key in seen_texts:
+                continue
+            seen_texts.add(key)
+            surface = match.group(0)
+            entities.append(Entity(
+                text=full_name,
+                entity_type="standard_name",
+                source_sentence=find_source_sentence(surface, match.start()),
+                position=match.start(),
+                sentence_title=get_title(surface, match.start())
+            ))
+
+        # 2. Standard codes.
+        for pattern in cls.STANDARD_CODE_PATTERNS:
+            for match in pattern.finditer(content):
+                surface = match.group(0)
+                code_upper = re.sub(r'\s+', '', surface.strip()).upper()
+                # Same lower-case key as the other passes, so one set
+                # de-duplicates all three entity kinds consistently.
+                key = code_upper.lower()
+                if key in seen_texts:
+                    continue
+                seen_texts.add(key)
+                entities.append(Entity(
+                    text=code_upper,
+                    entity_type="standard_number",
+                    source_sentence=find_source_sentence(surface, match.start()),
+                    position=match.start(),
+                    sentence_title=get_title(surface, match.start())
+                ))
+
+        # 3. Domain terms via jieba part-of-speech tagging.
+        try:
+            import jieba.posseg as pseg
+        except ImportError:
+            print("jieba未安装,跳过专业术语抽取")
+        else:
+            tokens = list(pseg.cut(content))
+
+            # Running character offset of every token, so a phrase position
+            # is a character index rather than a token index.
+            offsets = []
+            cursor = 0
+            for token in tokens:
+                offsets.append(cursor)
+                cursor += len(token.word)
+
+            i = 0
+            while i < len(tokens):
+                word = tokens[i].word
+                flag = tokens[i].flag
+
+                # A phrase starts on a tag beginning with "n" or "j".
+                if not (flag.startswith(('n', 'j')) and len(word) >= Config.MIN_ENTITY_LENGTH):
+                    i += 1
+                    continue
+
+                # Absorb following tokens tagged n*/v*/a* of 2+ characters.
+                phrase = [word]
+                j = i + 1
+                while j < len(tokens):
+                    nxt = tokens[j]
+                    if nxt.flag.startswith(('n', 'v', 'a')) and len(nxt.word) >= 2:
+                        phrase.append(nxt.word)
+                        j += 1
+                    else:
+                        break
+
+                phrase_text = ''.join(phrase)
+                key = phrase_text.lower()
+
+                if (Config.MIN_ENTITY_LENGTH <= len(phrase_text) <= Config.MAX_ENTITY_LENGTH and
+                        cls._is_valid_entity(phrase_text) and
+                        not cls._is_daily_word(phrase_text) and
+                        key not in seen_texts):
+
+                    seen_texts.add(key)
+                    position = content.find(phrase_text, offsets[i])
+                    if position < 0:
+                        position = offsets[i]
+                    entities.append(Entity(
+                        text=phrase_text,
+                        entity_type="technical_term",
+                        source_sentence=find_source_sentence(phrase_text, position),
+                        position=position,
+                        sentence_title=get_title(phrase_text, position)
+                    ))
+
+                # j is always at least i + 1, so this always advances.
+                i = j
+
+        entities.sort(key=lambda x: x.position)
+        return entities[:Config.MAX_ENTITIES_PER_DOC]
+
+    @classmethod
+    def _is_valid_entity(cls, text: str) -> bool:
+        """Return False for stop words, pure digits, pure punctuation/whitespace or too-short text."""
+        if text in cls.STOP_WORDS:
+            return False
+
+        if text.isdigit():
+            return False
+
+        import string
+        punctuation = string.punctuation + '。,、;:!?()【】《》〈〉「」『』〔〕[]{}'
+        if all(c in punctuation or c.isspace() for c in text):
+            return False
+
+        if len(text.strip()) < Config.MIN_ENTITY_LENGTH:
+            return False
+
+        return True
+
+    @classmethod
+    def _is_daily_word(cls, text: str) -> bool:
+        """Return True when ``text`` is an everyday word.
+
+        That is the case when the trimmed text is in ``DAILY_WORDS``, or when
+        it is at most 4 characters long and contains one of those words.
+        """
+        text = text.strip()
+
+        if text in cls.DAILY_WORDS:
+            return True
+
+        if len(text) <= 4:
+            return any(word in text for word in cls.DAILY_WORDS)
+
+        return False
+
+
+# ==================== 向量生成器 ====================
+
+class VectorGenerator:
+    """Turns texts into fixed-length embedding vectors."""
+
+    @staticmethod
+    def generate(texts: List[str]) -> List[List[float]]:
+        """Embed ``texts`` in batches of ``Config.VECTOR_BATCH_SIZE``.
+
+        The result always has exactly one vector per input text, in input
+        order. When a batch cannot be embedded (the client raises, or returns
+        a different number of embeddings than texts), every text of that
+        batch gets an all-zero vector and the error is printed.
+
+        NOTE(review): zero vectors are still written to the collection by the
+        caller - confirm the index metric tolerates them.
+        """
+        if not texts:
+            return []
+
+        client = SingletonManager.get_embeddings_client()
+        vectors: List[List[float]] = []
+
+        for start in range(0, len(texts), Config.VECTOR_BATCH_SIZE):
+            batch = texts[start:start + Config.VECTOR_BATCH_SIZE]
+
+            try:
+                embeddings = client.embed_documents(batch)
+                # Build the whole batch first: appending one by one and then
+                # zero-filling on a mid-batch error would leave more vectors
+                # than texts and shift every later vector.
+                batch_vectors = [
+                    VectorGenerator._adjust_dimension(emb, Config.VECTOR_DIM)
+                    for emb in embeddings
+                ]
+                if len(batch_vectors) != len(batch):
+                    raise ValueError(
+                        f"expected {len(batch)} embeddings, got {len(batch_vectors)}"
+                    )
+            except Exception as e:
+                print(f"向量生成失败: {e}")
+                batch_vectors = [[0.0] * Config.VECTOR_DIM for _ in batch]
+
+            vectors.extend(batch_vectors)
+
+        return vectors
+
+    @staticmethod
+    def _adjust_dimension(vector: List[float], target_dim: int) -> List[float]:
+        """Zero-pad or truncate ``vector`` to exactly ``target_dim`` items."""
+        if not isinstance(vector, list):
+            # Accept tuples/arrays as well; "+" below needs a real list.
+            vector = list(vector)
+
+        current_dim = len(vector)
+
+        if current_dim < target_dim:
+            vector = vector + [0.0] * (target_dim - current_dim)
+        elif current_dim > target_dim:
+            vector = vector[:target_dim]
+
+        return vector
+
+
+# ==================== Background提取器 ====================
+
+class DocumentBackgroundExtractor:
+    """Regex-based extractor of document-level background facts."""
+
+    # Each entry is (regex, label); the label prefixes the extracted text.
+    PUBLISH_PATTERNS = [
+        (r'由\s*([^,。\n]{2,30}?)\s*(?:发布|制定|颁发|出台|批准)', '发布单位'),
+        (r'([^,。\n]{2,30}?)\s*(?:发布|制定|颁发|出台)', '发布单位'),
+    ]
+
+    # A lazy quantifier followed only by an optional "》" always stops at its
+    # minimum length, so open-ended captures below are greedy and bounded by
+    # "》" or sentence punctuation instead.
+    REPLACE_PATTERNS = [
+        (r'代替\s*《?([^》,。;\n]{2,50})》?', '代替标准'),
+        (r'被\s*《?([^》]{2,50}?)》?\s*代替', '被标准代替'),
+        (r'(?:自\s*[\d年月日\-]+\s*起)?\s*废止', '已废止'),
+        (r'原\s*([^\s]+)\s*同时废止', '原标准废止'),
+        (r'自\s*(\d{4}年\d{1,2}月\d{1,2}日|\d{4}-\d{2}-\d{2})\s*起\s*废止', '废止日期'),
+    ]
+
+    MANAGE_PATTERNS = [
+        (r'(?:主编单位|主编部门)[::]\s*([^,。\n]{2,50})', '主编单位'),
+        (r'(?:参编单位|参编部门)[::]\s*([^,。\n]{2,50})', '参编单位'),
+        (r'(?:解释单位|技术归口)[::]\s*([^,。\n]{2,50})', '解释单位'),
+        (r'由\s*([^,。]{2,30})\s*负责解释', '负责解释'),
+        (r'批准部门[::]\s*([^,。\n]{2,50})', '批准部门'),
+        (r'归口单位[::]\s*([^,。\n]{2,50})', '归口单位'),
+    ]
+
+    REFERENCE_PATTERNS = [
+        (r'应符合\s*《?([^》]{2,50}?)》?\s*(?:GB|JTG|JTJ|JGJ|CJJ)[/T]?\s*\d+[\-]?\d*\s*的?规定', '应符合'),
+        (r'应遵守\s*《?([^》,。;\n]{2,50})》?', '应遵守'),
+        (r'参照\s*《?([^》,。;\n]{2,50})》?', '参照'),
+        (r'引用\s*《?([^》,。;\n]{2,50})》?', '引用'),
+        (r'依据\s*《?([^》,。;\n]{2,50})》?', '依据'),
+    ]
+
+    SCOPE_PATTERNS = [
+        (r'适用(?:于)?\s*([^。,]{3,30}?)(?:的)?\s*(?:设计|施工|验收|检测|勘察|监理)', '适用阶段'),
+        (r'适用(?:于)?\s*([^。,]{3,30}?)\s*工程', '适用工程'),
+        (r'适用(?:于)?\s*(新建|改建|扩建)\s*([^。,]{3,20})', '适用类型'),
+        (r'(公路|桥梁|隧道|路基|路面|涵洞|交通工程)\s*(?:的)?\s*(?:设计|施工|验收)', '工程类型'),
+        (r'适用(?:于)?\s*(全国|各省|自治区|直辖市|华北地区|华东地区|华南地区|西南地区)', '适用地区'),
+    ]
+
+    # Labels from REPLACE_PATTERNS that describe abolition rather than
+    # replacement; they are filed under the '废止状态' bucket.
+    _ABOLISH_LABELS = frozenset({'已废止', '原标准废止', '废止日期'})
+
+    @classmethod
+    def extract(cls, content: str) -> Dict[str, List[str]]:
+        """Collect background facts from ``content``.
+
+        Returns:
+            A dict with the keys '发布关系', '替代关系', '管理关系',
+            '引用关系', '废止状态' and '适用范围'. Each value holds at most
+            three distinct "label:text" strings in order of discovery.
+        """
+        backgrounds = {
+            '发布关系': [],
+            '替代关系': [],
+            '管理关系': [],
+            '引用关系': [],
+            '废止状态': [],
+            '适用范围': [],
+        }
+
+        grouped = (
+            (cls.PUBLISH_PATTERNS, '发布关系'),
+            (cls.REPLACE_PATTERNS, '替代关系'),
+            (cls.MANAGE_PATTERNS, '管理关系'),
+            (cls.REFERENCE_PATTERNS, '引用关系'),
+        )
+        for patterns, default_bucket in grouped:
+            for pattern, rel_type in patterns:
+                bucket = '废止状态' if rel_type in cls._ABOLISH_LABELS else default_bucket
+                for match in re.finditer(pattern, content):
+                    # Patterns without a capture group contribute the whole match.
+                    raw_target = match.group(1) if match.groups() else match.group(0)
+                    target = raw_target.strip()
+                    if target:
+                        backgrounds[bucket].append(f"{rel_type}:{target}")
+
+        for pattern, scope_type in cls.SCOPE_PATTERNS:
+            for match in re.findall(pattern, content):
+                # findall yields a tuple when the pattern has several groups.
+                scope_text = ''.join(match) if isinstance(match, tuple) else match
+                if scope_text:
+                    backgrounds['适用范围'].append(f"{scope_type}:{scope_text.strip()}")
+
+        # De-duplicate while keeping order, then keep the first three.
+        for key in backgrounds:
+            backgrounds[key] = list(dict.fromkeys(backgrounds[key]))[:3]
+
+        return backgrounds
+
+
+class BackgroundMatcher:
+    """Assembles the background strings attached to one entity."""
+
+    MAX_BACKGROUNDS = 5
+
+    @classmethod
+    def match(cls, entity: Entity, sentences: List[str], doc_backgrounds: Dict[str, List[str]]) -> List[str]:
+        """Build up to ``MAX_BACKGROUNDS`` distinct background strings for ``entity``.
+
+        Sentences come first: up to two that contain the entity text; failing
+        that (and only for texts longer than 5 characters) up to two that
+        contain its first 5 characters; failing that the entity's own source
+        sentence. Document-level facts follow, one per category, with the
+        publish/replace/abolish categories reserved for standard names and
+        standard numbers.
+        """
+        def sentences_containing(needle: str) -> List[str]:
+            # First two distinct, non-empty (after trimming) sentences with ``needle``.
+            hits: List[str] = []
+            for sent in sentences:
+                if needle not in sent:
+                    continue
+                trimmed = sent.strip()
+                if trimmed and trimmed not in hits:
+                    hits.append(trimmed)
+                    if len(hits) >= 2:
+                        break
+            return hits
+
+        needle = entity.text
+        picked = sentences_containing(needle)
+        if not picked and len(needle) > 5:
+            picked = sentences_containing(needle[:5])
+        if not picked and entity.source_sentence:
+            picked.append(entity.source_sentence)
+
+        collected = list(picked)
+
+        if entity.entity_type in ('standard_name', 'standard_number'):
+            for category in ('发布关系', '替代关系', '废止状态'):
+                collected.extend(doc_backgrounds.get(category, [])[:1])
+
+        # Remaining categories are added only while there is still room.
+        for category in ('适用范围', '管理关系', '引用关系'):
+            if len(collected) < cls.MAX_BACKGROUNDS:
+                collected.extend(doc_backgrounds.get(category, [])[:1])
+
+        result: List[str] = []
+        for item in collected:
+            if not item or item in result:
+                continue
+            result.append(item)
+            if len(result) >= cls.MAX_BACKGROUNDS:
+                break
+
+        return result
+
+
+# ==================== 日志与断点管理 ====================
+
+class Logger:
+    """Minimal append-only error log."""
+
+    @staticmethod
+    def setup():
+        """Create ``Config.LOG_DIR`` (and missing parents) if it does not exist."""
+        Path(Config.LOG_DIR).mkdir(parents=True, exist_ok=True)
+
+    @staticmethod
+    def error(file_name: str, error_msg: str):
+        """Append one timestamped line for ``file_name`` to ``Config.ERROR_LOG``.
+
+        Logging is best-effort and never raises. When the entry cannot be
+        written, the reason and the entry itself go to stderr so they are not
+        lost silently.
+        """
+        from datetime import datetime
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        log_entry = f"[{timestamp}] {file_name}: {error_msg}\n"
+
+        try:
+            log_path = Path(Config.ERROR_LOG)
+            # error() may be called before setup(); make sure the folder exists.
+            log_path.parent.mkdir(parents=True, exist_ok=True)
+            with open(log_path, "a", encoding="utf-8") as f:
+                f.write(log_entry)
+        except Exception as exc:  # a logging failure must not abort the import
+            print(f"无法写入错误日志 {Config.ERROR_LOG}: {exc}", file=sys.stderr)
+            print(log_entry, end="", file=sys.stderr)
+
+
+class CheckpointManager:
+    """Persists the set of already processed file names between runs."""
+
+    @staticmethod
+    def load() -> Set[str]:
+        """Return the processed file names stored in ``Config.CHECKPOINT_FILE``.
+
+        An empty set is returned when the file does not exist or cannot be
+        read or parsed; in the latter case a warning is printed to stderr.
+        """
+        checkpoint_file = Path(Config.CHECKPOINT_FILE)
+        if not checkpoint_file.exists():
+            return set()
+
+        try:
+            with open(checkpoint_file, "r", encoding="utf-8") as f:
+                data = json.load(f)
+            return set(data.get("processed_files", []))
+        except (OSError, ValueError, AttributeError, TypeError) as exc:
+            # ValueError covers malformed JSON; AttributeError/TypeError cover
+            # a well-formed file with an unexpected structure.
+            print(f"断点文件读取失败,将从头开始: {exc}", file=sys.stderr)
+            return set()
+
+    @staticmethod
+    def save(processed_files: Set[str]):
+        """Write ``processed_files`` to ``Config.CHECKPOINT_FILE``.
+
+        The data is written to a sibling ".tmp" file and then moved over the
+        real file with ``os.replace``, so an interrupted save leaves the
+        previous checkpoint intact. Saving is best-effort: a failure is
+        reported on stderr and does not raise.
+        """
+        checkpoint_file = Path(Config.CHECKPOINT_FILE)
+        tmp_file = checkpoint_file.with_name(checkpoint_file.name + ".tmp")
+
+        try:
+            checkpoint_file.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(tmp_file, "w", encoding="utf-8") as f:
+                json.dump({
+                    "processed_files": sorted(processed_files),
+                    "total": len(processed_files)
+                }, f, ensure_ascii=False)
+
+            os.replace(tmp_file, checkpoint_file)
+        except (OSError, TypeError, ValueError) as exc:
+            print(f"断点文件保存失败: {exc}", file=sys.stderr)
+
+
+# ==================== 核心处理类 ====================
+
+class MdEntityImporter:
+    """Reads Markdown files, extracts entities and writes them to Milvus.
+
+    NOTE(review): files are tracked by bare file name, so two files with the
+    same name in different sub-folders are treated as one - confirm names are
+    unique across the source tree.
+    """
+
+    def __init__(self):
+        """Load the checkpoint, prepare the log folder and initialise jieba."""
+        self.processed_count = 0
+        self.error_count = 0
+        self.total_entities = 0
+        self.processed_files: Set[str] = CheckpointManager.load()
+
+        Logger.setup()
+        SingletonManager.init_jieba()
+
+        print(f"断点续传:已处理 {len(self.processed_files)} 个文件")
+
+    def process_file(self, md_path: Path) -> Optional[List[Dict]]:
+        """Turn one Markdown file into Milvus rows.
+
+        Returns:
+            A list of rows (keys: text, uuid, file, title, background, dense).
+            The list is empty when the file was already processed, is blank,
+            or yields no entities. ``None`` is returned when processing
+            failed; the error is logged and ``error_count`` incremented.
+        """
+        file_name = md_path.name
+
+        if file_name in self.processed_files:
+            return []
+
+        try:
+            with open(md_path, "r", encoding="utf-8") as f:
+                raw_content = f.read()
+
+            if not raw_content.strip():
+                return []
+
+            clean_content = TextCleaner.clean_markdown(raw_content)
+            sentences = TextCleaner.split_sentences(clean_content)
+
+            doc = Document(
+                file_name=file_name,
+                content=clean_content,
+                raw_sentences=sentences
+            )
+
+            doc.entities = EntityExtractor.extract(doc, raw_content)
+            doc_backgrounds = DocumentBackgroundExtractor.extract(clean_content)
+
+            if not doc.entities:
+                self.processed_files.add(file_name)
+                return []
+
+            rows = []
+            entity_texts = []
+            file_seen_texts: Set[str] = set()
+
+            for entity in doc.entities:
+                key = entity.text.lower()
+                if key in file_seen_texts:
+                    continue
+                file_seen_texts.add(key)
+
+                backgrounds = BackgroundMatcher.match(entity, sentences, doc_backgrounds)
+
+                # Fall back to the file name when no heading was found.
+                title = entity.sentence_title if entity.sentence_title else file_name
+
+                rows.append({
+                    "text": entity.text,
+                    "uuid": str(uuid.uuid4()),
+                    "file": file_name,
+                    "title": title,
+                    "background": json.dumps(backgrounds, ensure_ascii=False),
+                })
+                entity_texts.append(entity.text)
+
+            vectors = VectorGenerator.generate(entity_texts)
+            # A count mismatch would pair entities with the wrong vectors.
+            if len(vectors) != len(rows):
+                raise ValueError(
+                    f"向量数量({len(vectors)})与实体数量({len(rows)})不一致"
+                )
+
+            for row, vector in zip(rows, vectors):
+                row["dense"] = vector
+
+            self.processed_files.add(file_name)
+            return rows
+
+        except Exception as e:
+            error_msg = str(e)
+            Logger.error(file_name, error_msg)
+            self.error_count += 1
+            print(f"处理失败: {error_msg[:50]}")
+            return None
+
+    def batch_insert_to_milvus(self, client, rows: List[Dict]) -> Tuple[int, int]:
+        """Insert ``rows`` in chunks of ``Config.BATCH_SIZE``.
+
+        Files that own rows in a failed chunk are removed from
+        ``processed_files`` and written to the error log, so the next run
+        retries them instead of skipping them.
+
+        NOTE(review): a retried file whose other chunks succeeded is inserted
+        again with new uuids - confirm duplicates are acceptable or clean
+        them up by file before re-running.
+
+        Returns:
+            ``(success_count, fail_count)`` measured in rows.
+        """
+        if not rows:
+            return 0, 0
+
+        success_count = 0
+        fail_count = 0
+
+        for start in range(0, len(rows), Config.BATCH_SIZE):
+            batch = rows[start:start + Config.BATCH_SIZE]
+
+            try:
+                milvus_data = [
+                    {
+                        "text": row["text"],
+                        "dense": row["dense"],
+                        "uuid": row["uuid"],
+                        "file": row["file"],
+                        "title": row["title"],
+                        "background": row["background"],
+                    }
+                    for row in batch
+                ]
+
+                client.insert(
+                    collection_name=Config.COLLECTION_NAME,
+                    data=milvus_data
+                )
+                success_count += len(batch)
+
+            except Exception as e:
+                print(f"插入失败: {str(e)[:50]}")
+                fail_count += len(batch)
+                failed_files = {row.get("file") for row in batch if row.get("file")}
+                for failed_file in sorted(failed_files):
+                    self.processed_files.discard(failed_file)
+                    Logger.error(failed_file, f"Milvus插入失败: {e}")
+
+        return success_count, fail_count
+
+    def run(self):
+        """Import every pending Markdown file and print a summary.
+
+        Rows are flushed to Milvus after every 10 processed files and once
+        more at the end. The checkpoint is written only after the matching
+        insert, so it never lists a file whose rows were not sent.
+        """
+        print("=" * 70)
+        print("建筑工程编制依据MD文档实体抽取入库")
+        print("=" * 70)
+        print(f"源目录: {Config.MD_SOURCE_DIR}")
+        print(f"Collection: {Config.COLLECTION_NAME}")
+        print(f"向量维度: {Config.VECTOR_DIM}")
+        print(f"批量大小: {Config.BATCH_SIZE}")
+        print("=" * 70)
+
+        client = SingletonManager.get_milvus_client()
+
+        if not client.has_collection(collection_name=Config.COLLECTION_NAME):
+            print(f"Collection不存在: {Config.COLLECTION_NAME}")
+            return
+
+        try:
+            client.load_collection(collection_name=Config.COLLECTION_NAME)
+            print(f"Collection已加载: {Config.COLLECTION_NAME}")
+        except Exception as e:
+            print(f"Collection加载警告: {e}")
+
+        source_dir = Path(Config.MD_SOURCE_DIR)
+        if not source_dir.exists():
+            print(f"源目录不存在: {source_dir}")
+            return
+
+        # Only .md files directly inside the immediate sub-folders are read.
+        md_files = []
+        for subdir in source_dir.iterdir():
+            if subdir.is_dir():
+                md_files.extend(subdir.glob("*.md"))
+
+        total_files = len(md_files)
+
+        if total_files == 0:
+            print("未找到MD文件")
+            return
+
+        print(f"\n找到的文件示例(前3个):")
+        for i, f in enumerate(md_files[:3], 1):
+            try:
+                rel_path = f.relative_to(source_dir)
+            except ValueError:
+                rel_path = f.name
+            print(f"   {i}. {rel_path}")
+        if total_files > 3:
+            print(f"   ... 还有 {total_files - 3} 个文件")
+
+        pending_files = [f for f in md_files if f.name not in self.processed_files]
+
+        print(f"\n总文件数: {total_files}")
+        print(f"待处理: {len(pending_files)}")
+        # Files skipped in this run, not the size of the stored checkpoint.
+        print(f"已跳过: {total_files - len(pending_files)}")
+        print()
+
+        if not pending_files:
+            print("所有文件已处理完成")
+            return
+
+        all_rows = []
+
+        iterator = pending_files
+        if Config.SHOW_PROGRESS:
+            try:
+                from tqdm import tqdm
+                iterator = tqdm(pending_files, desc="处理文件")
+            except ImportError:
+                pass
+
+        for md_path in iterator:
+            rows = self.process_file(md_path)
+
+            if rows is None:
+                continue
+
+            if rows:
+                all_rows.extend(rows)
+                self.total_entities += len(rows)
+
+            self.processed_count += 1
+
+            if self.processed_count % 10 == 0:
+                # Insert first, then checkpoint: a crash in between makes the
+                # next run redo these files rather than skip them.
+                if all_rows:
+                    success, fail = self.batch_insert_to_milvus(client, all_rows)
+                    print(f"批量入库: 成功{success}条, 失败{fail}条")
+                    all_rows = []
+
+                CheckpointManager.save(self.processed_files)
+
+        if all_rows:
+            success, fail = self.batch_insert_to_milvus(client, all_rows)
+            print(f"\n最终入库: 成功{success}条, 失败{fail}条")
+
+        CheckpointManager.save(self.processed_files)
+
+        try:
+            client.flush(collection_name=Config.COLLECTION_NAME)
+        except Exception as e:
+            # Best-effort: report the failure but still print the summary.
+            print(f"Collection flush 警告: {e}")
+
+        print("\n" + "=" * 70)
+        print("处理完成统计")
+        print("=" * 70)
+        print(f"本次处理文件: {self.processed_count}")
+        print(f"本次抽取实体: {self.total_entities}")
+        print(f"失败文件数: {self.error_count}")
+        print(f"累计处理文件: {len(self.processed_files)}")
+
+        try:
+            stats = client.get_collection_stats(collection_name=Config.COLLECTION_NAME)
+            print(f"Collection总实体数: {stats.get('row_count', 'N/A')}")
+        except Exception as e:
+            print(f"Collection统计获取失败: {e}")
+
+        print("=" * 70)
+
+
+def main():
+    """Script entry point.
+
+    Builds the importer and runs it. A keyboard interrupt or any other
+    exception ends the process with exit status 1; for other exceptions the
+    message and the traceback are printed first.
+    """
+    try:
+        MdEntityImporter().run()
+    except KeyboardInterrupt:
+        print("\n用户中断,已保存断点")
+        sys.exit(1)
+    except Exception as exc:
+        print(f"\n程序异常: {exc}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)
+
+
+# Run the import only when this file is executed as a script, not on import.
+if __name__ == "__main__":
+    main()