|
|
@@ -0,0 +1,409 @@
|
|
|
+"""
|
|
|
+根据Excel和文件夹编号生成施工方案信息并保存为JSON。
|
|
|
+并在同一个JSON里补充 md 切分结果(parent / children),每条只保留:
|
|
|
+- parent_id
|
|
|
+- index
|
|
|
+- hierarchy
|
|
|
+- text
|
|
|
+"""
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+import json
|
|
|
+import re
|
|
|
+import uuid
|
|
|
+import hashlib
|
|
|
+from datetime import datetime
|
|
|
+from pathlib import Path
|
|
|
+from typing import Any, Optional, List, Dict, Tuple
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+
|
|
|
# ==================== Configuration ====================
EXCEL_FILE = r"C:\Users\ZengChao\Desktop\plan_id.xlsx"  # Excel workbook path
ROOT_FOLDER = r"C:\Users\ZengChao\Desktop\施工方案文件夹"  # root folder holding one sub-folder per plan
SHEET_INDEX = 0  # target sheet index (0 = first sheet)

PARENT_MAX_CHARS = 6000  # maximum parent-section length before slicing
CHILD_INDEX_START = 0  # starting value for children.index
EXCEL_ID_COLUMN = "ID"  # primary-key column name in the Excel sheet
# =======================================================
|
|
|
+
|
|
|
+
|
|
|
# ====================
# Markdown splitting logic
# ====================
|
|
|
+
|
|
|
# Splits markdown on runs of blank lines (one or more empty / whitespace-only lines).
BLANK_SPLIT_RE = re.compile(r"\n\s*\n+")
# Matches a level-1 ATX heading line ("# Title") anywhere in the text.
H1_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE)
|
|
|
+
|
|
|
+
|
|
|
def split_md_by_blank_lines(md: str) -> List[str]:
    """Split markdown text on runs of blank lines into non-empty, stripped chunks."""
    # Normalize every newline convention to "\n" before splitting.
    normalized = md.replace("\r\n", "\n").replace("\r", "\n")
    pieces = (piece.strip() for piece in BLANK_SPLIT_RE.split(normalized))
    return [piece for piece in pieces if piece]
|
|
|
+
|
|
|
+
|
|
|
# Matches an ATX heading line: 1-6 '#' characters, whitespace, then the title.
_HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$")


def is_heading_chunk(chunk: str) -> Optional[Tuple[int, str]]:
    """Return ``(level, title)`` when the chunk's first line is an ATX heading.

    Only the first line of the chunk is inspected.  Returns None when that
    line is not a ``#``–``######`` heading (including ``#`` with no space).
    """
    first_line = chunk.split("\n", 1)[0].strip()
    match = _HEADING_RE.match(first_line)  # pattern compiled once at module load
    if match is None:
        return None
    return len(match.group(1)), match.group(2).strip()
|
|
|
+
|
|
|
+
|
|
|
def outline_path_str(path: List[str]) -> str:
    """Render a heading path as a single breadcrumb string."""
    separator = " > "
    return separator.join(path)
|
|
|
+
|
|
|
+
|
|
|
def guess_doc_name_from_filename(file_name: str) -> str:
    """Derive a document name by dropping any directories and the final extension."""
    stem = Path(file_name).stem
    return stem
|
|
|
+
|
|
|
+
|
|
|
def split_md_by_h1_sections(md: str) -> List[Tuple[str, str]]:
    """Split markdown into top-level sections keyed by their H1 titles.

    Returns ``[(h1_title, section_text), ...]`` where:
    - content before the first ``#`` heading becomes a ``"__PREAMBLE__"`` section;
    - each section text includes the ``#`` line itself plus everything up to
      the next H1;
    - a document with no H1 at all yields a single ``("__NO_H1__", full_text)``
      entry, or an empty list when the document is blank.
    """
    md = md.replace("\r\n", "\n").replace("\r", "\n")
    headings = list(H1_RE.finditer(md))

    # No H1 anywhere: the whole document is one anonymous section (if non-empty).
    if not headings:
        body = md.strip()
        return [("__NO_H1__", body)] if body else []

    sections: List[Tuple[str, str]] = []

    # Anything before the first heading is preserved as a preamble section.
    leading = md[: headings[0].start()].strip()
    if leading:
        sections.append(("__PREAMBLE__", leading))

    # Each heading owns the text from its own line up to the next heading.
    boundaries = [m.start() for m in headings] + [len(md)]
    for match, start, end in zip(headings, boundaries, boundaries[1:]):
        section_text = md[start:end].strip()
        if section_text:
            sections.append((match.group(1).strip(), section_text))
    return sections
|
|
|
+
|
|
|
+
|
|
|
def make_parent_id(doc_name: str, h1_title: str, parent_seq: int) -> int:
    """Derive a stable, non-negative 63-bit parent id for an H1 section.

    Every record produced from the same ``#`` section (even when the parent
    text is sliced into several rows) shares this id, because it depends only
    on the document name, the section's position, and its title.
    """
    key = "|".join((doc_name, str(parent_seq), h1_title))
    digest = hashlib.sha1(key.encode("utf-8")).hexdigest()
    # Take 64 bits of the digest and clear the sign bit to stay within int64.
    return int(digest[:16], 16) & ((1 << 63) - 1)
|
|
|
+
|
|
|
+
|
|
|
def split_text_by_max_chars(text: str, max_chars: int) -> List[str]:
    """Slice an over-long parent section into pieces of at most ``max_chars``.

    Slicing prefers blank-line boundaries: paragraphs are re-joined greedily
    until adding one more would exceed the limit.  Only a single paragraph
    that is itself longer than ``max_chars`` gets hard-cut mid-text.
    """
    text = (text or "").strip()
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]

    slices: List[str] = []
    buffer = ""

    for paragraph in split_md_by_blank_lines(text):
        if len(paragraph) > max_chars:
            # Oversized paragraph: emit the accumulated buffer, then hard-cut.
            if buffer.strip():
                slices.append(buffer.strip())
            buffer = ""
            for offset in range(0, len(paragraph), max_chars):
                slices.append(paragraph[offset:offset + max_chars].strip())
        else:
            candidate = buffer + "\n\n" + paragraph if buffer else paragraph
            if len(candidate) <= max_chars:
                buffer = candidate
            else:
                if buffer.strip():
                    slices.append(buffer.strip())
                buffer = paragraph

    if buffer.strip():
        slices.append(buffer.strip())

    return [s for s in slices if s]
|
|
|
+
|
|
|
+
|
|
|
def build_parent_and_children_rows(
    md_text: str,
    file_name: str,
    parent_max_chars: int = PARENT_MAX_CHARS,
    child_index_start: int = CHILD_INDEX_START,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Build the minimal parent/children row structures for one markdown file.

    Returns ``(parents, children)`` where every row carries exactly
    ``index`` / ``parent_id`` / ``hierarchy`` / ``text``.
    """
    doc_name = guess_doc_name_from_filename(file_name)

    # 1) Parent sections: one per H1 heading.
    sections = split_md_by_h1_sections(md_text)

    # Stable parent_id per section, shared by every slice of that section.
    parent_ids = [
        make_parent_id(doc_name=doc_name, h1_title=title, parent_seq=seq)
        for seq, (title, _text) in enumerate(sections)
    ]

    # 2) Children: blank-line chunks inside each section, tracking the
    #    heading outline so every chunk knows its hierarchy path.
    children: List[Dict[str, Any]] = []
    next_child_index = child_index_start

    for parent_id, (_title, section_text) in zip(parent_ids, sections):
        outline: List[str] = []
        for chunk in split_md_by_blank_lines(section_text):
            heading = is_heading_chunk(chunk)
            if heading:
                level, heading_title = heading
                # A heading's own hierarchy is its *parent* path; the heading
                # then becomes the new tip of the outline.
                outline = outline[: level - 1]
                hierarchy = outline_path_str(outline)
                outline = outline + [heading_title]
            else:
                hierarchy = outline_path_str(outline)

            children.append(
                {
                    "index": next_child_index,
                    "parent_id": int(parent_id),
                    "hierarchy": hierarchy,
                    "text": chunk,
                }
            )
            next_child_index += 1

    # 3) Parents: over-long sections are sliced, but every slice keeps the
    #    section's parent_id; ``index`` is the global parent-row order.
    parents: List[Dict[str, Any]] = []
    row_index = 0

    for parent_id, (title, section_text) in zip(parent_ids, sections):
        # Parent-level hierarchy label for the special pseudo-sections.
        if title == "__PREAMBLE__":
            hierarchy = doc_name
        elif title == "__NO_H1__":
            hierarchy = ""
        else:
            hierarchy = title

        for slice_text in split_text_by_max_chars(section_text, parent_max_chars):
            parents.append(
                {
                    "index": row_index,
                    "parent_id": int(parent_id),
                    "hierarchy": hierarchy,
                    "text": slice_text,
                }
            )
            row_index += 1

    return parents, children
|
|
|
+
|
|
|
+
|
|
|
# ====================
# Main workflow: Excel lookup + folder traversal
# ====================
|
|
|
+
|
|
|
class PlanInfoGenerator:
    """Generates construction-plan info records from an Excel sheet.

    The sheet is indexed by ``EXCEL_ID_COLUMN``; sub-folder names under a root
    folder are matched against that index and one JSON file is produced per
    matched plan folder.
    """

    # Excel column name -> output field name.
    COLUMN_MAPPING = {
        "施工方案名称": "plan_name",
        "工程项目名称": "project_name",
        "分部/分项工程": "project_section",
        "编制单位": "compiling_unit",
        "编制日期": "compiling_date",
        "方案简述": "plan_summary",
        "方案类别": "plan_category",
        "一级分类": "level_1_classification",
        "二级分类": "level_2_classification",
        "三级分类": "level_3_classification",
        "四级分类": "level_4_classification",
    }

    # Columns merged into a single free-text "note" field.
    NOTE_COLUMNS = ["专项施工方案名称", "工艺简述"]

    def __init__(self, excel_path: str | Path, sheet_index: int = 0):
        """Load the Excel workbook immediately; raises if the file is missing."""
        self.excel_path = Path(excel_path)
        self.sheet_index = sheet_index
        self._load_excel()

    def _load_excel(self) -> None:
        """Read the target sheet and index it by the ID column (column kept)."""
        if not self.excel_path.exists():
            raise FileNotFoundError(f"Excel文件不存在: {self.excel_path}")

        self.df = pd.read_excel(self.excel_path, sheet_name=self.sheet_index)
        print(f"✅ 已加载Excel文件: {self.excel_path},共{len(self.df)}行数据。")
        # drop=False keeps the ID column available in row dicts as well.
        self.df.set_index(EXCEL_ID_COLUMN, inplace=True, drop=False)

    def _parse_date(self, value: Any) -> Optional[str]:
        """Normalize an Excel cell to an ISO ``YYYY-MM-DD`` string, or None."""
        if pd.isna(value):
            return None

        if isinstance(value, str) and value.strip():
            try:
                return pd.to_datetime(value).strftime("%Y-%m-%d")
            except Exception:
                # Unparseable date text is treated as missing rather than fatal.
                return None

        # datetime / pandas.Timestamp / date objects all expose strftime.
        if hasattr(value, "strftime"):
            return value.strftime("%Y-%m-%d")

        return None

    def get_plan_info_by_code(self, code: str) -> Optional[dict[str, Any]]:
        """Look up one Excel row by folder code and map it to an info dict.

        Matching compares the stripped string form of the IDs, so numeric and
        text IDs behave identically.  Returns None when the code is absent.
        If the ID occurs more than once, the first occurrence is used.
        (Bug fix: the previous ``df.loc`` lookup returned a DataFrame for
        duplicate IDs, and ``DataFrame.to_dict()`` then produced nested dicts
        that corrupted every mapped field.)
        """
        code_str = str(code).strip()
        # Normalize the index to stripped strings without copying the frame.
        normalized_index = self.df.index.astype(str).str.strip()
        matches = self.df[normalized_index == code_str]
        if matches.empty:
            return None
        row_dict = matches.iloc[0].to_dict()

        result: Dict[str, Any] = {
            "id": str(uuid.uuid4()),  # fresh UUID per generated record
        }

        for excel_col, model_field in self.COLUMN_MAPPING.items():
            if excel_col not in row_dict:
                continue
            value = row_dict[excel_col]

            if model_field == "compiling_date":
                value = self._parse_date(value)
            elif pd.isna(value):
                value = None
            elif isinstance(value, str):
                # Empty and whitespace-only strings are treated as missing.
                value = value.strip() or None

            result[model_field] = value

        # Collect the free-text note columns into one "col: value" string.
        note_parts: List[str] = []
        for col in self.NOTE_COLUMNS:
            if col not in row_dict:
                continue
            value = row_dict[col]
            if pd.isna(value):
                continue
            if isinstance(value, str):
                value = value.strip()
            if value:
                note_parts.append(f"{col}: {value}")

        if note_parts:
            result["note"] = "; ".join(note_parts)

        return result

    def process_folder_structure(self, root_folder: str | Path) -> dict[str, list[str]]:
        """Walk every sub-folder of *root_folder* and emit one JSON per plan.

        Each sub-folder name is looked up in the Excel sheet; the first
        ``.md`` file inside is split into parent/children rows and written
        next to it as ``<name>.json``.  Returns a summary dict with
        ``success`` / ``failed`` / ``skipped`` folder-name lists.
        """
        root_folder = Path(root_folder)
        if not root_folder.is_dir():
            raise NotADirectoryError(f"不是有效的文件夹: {root_folder}")

        results = {"success": [], "failed": [], "skipped": []}

        for subfolder in sorted(root_folder.iterdir()):
            if not subfolder.is_dir():
                continue

            folder_name = subfolder.name
            plan_info = self.get_plan_info_by_code(folder_name)

            if plan_info is None:
                print(f"📄 {folder_name} ❌ (id未在Excel中找到)")
                results["skipped"].append(folder_name)
                continue

            try:
                md_files = list(subfolder.glob("*.md"))
                if not md_files:
                    print(f"📄 {folder_name} ❌ (文件夹中无.md文件)")
                    results["skipped"].append(folder_name)
                    continue

                # Only the first .md file per folder is processed.
                md_path = md_files[0]
                output_path = md_path.with_suffix(".json")
                md_text = md_path.read_text(encoding="utf-8")

                parents, children = build_parent_and_children_rows(
                    md_text=md_text,
                    file_name=md_path.name,
                    parent_max_chars=PARENT_MAX_CHARS,
                    child_index_start=CHILD_INDEX_START,
                )

                # datetime values are not JSON-serializable; ISO-format them.
                info_to_save = {
                    k: v.isoformat() if isinstance(v, datetime) else v
                    for k, v in plan_info.items()
                }

                out_json = {
                    "doc": {
                        **info_to_save,
                    },
                    "parent": parents,
                    "children": children,
                }

                with open(output_path, "w", encoding="utf-8") as f:
                    json.dump(out_json, f, ensure_ascii=False, indent=2)

                print(f"📄 {folder_name} ✅ (已生成: {output_path.name})")
                results["success"].append(folder_name)

            except Exception as e:
                # Keep going on per-folder failures; record the reason.
                print(f"📄 {folder_name} ❌ ({str(e)})")
                results["failed"].append(f"{folder_name} ({str(e)})")

        return results
|
|
|
+
|
|
|
+
|
|
|
def main():
    """Script entry point: load the Excel sheet and process every plan folder."""
    try:
        generator = PlanInfoGenerator(EXCEL_FILE, sheet_index=SHEET_INDEX)
        results = generator.process_folder_structure(ROOT_FOLDER)

        separator = "=" * 60
        print("\n" + separator)
        print(
            f"✅ 成功: {len(results['success'])} | ❌ 失败: {len(results['failed'])} | ⊘ 跳过: {len(results['skipped'])}"
        )
        print(separator)
    except Exception as e:
        # Top-level boundary: report and exit quietly instead of tracebacking.
        print(f"❌ 错误: {str(e)}")


if __name__ == "__main__":
    main()
|