Explorar el Código

dev:总结交接文件

ZengChao hace 1 mes
padre
commit
b1be1dc951
Se han modificado 2 ficheros con 129 adiciones y 0 borrados
  1. 1 0
      README.md
  2. 128 0
      src/app/scripts/python_processed_article.py

+ 1 - 0
README.md

@@ -57,6 +57,7 @@ uv run -m src.app.scripts.statu_to_milvus
 - `copy_pdf_md_files.py`:拷贝各子目录中的 PDF/MD 到目标目录,保留子目录结构
 - `ceshi.py`:MinIO 上传测试脚本(批量上传 md)
 - `ceshi_embdding.py`:Embedding 接口联通测试脚本
+- `python_processed_article.py`:Python 脚本,按章节编号修复 Markdown 标题层级
 
 ---
 

+ 128 - 0
src/app/scripts/python_processed_article.py

@@ -0,0 +1,128 @@
+import os
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+
+
+# Root directory to scan; run() treats each first-level subdirectory as one case directory.
+INPUT_ROOT = r"C:\Users\ZengChao\Desktop\新建文件夹"
+
+# Exclusion characters: a digit-led line is NOT turned into a heading when any of
+# these appears within its first 10 characters.
+# Both ASCII and full-width parentheses are listed. The previous literal repeated
+# the ASCII pair, which is a no-op in a set.
+# NOTE(review): presumably the second pair was meant to be full-width - confirm.
+EXCLUDE_CHARS = {"表", "图", "$", "(", ")", "（", "）"}
+
+# =====================================================
+# Rule: line-level processing (the caller splits the text on "\n"); a line whose
+# first 10 characters contain an excluded character is left untouched.
+# =====================================================
+_HEADING_MARKER_RE = re.compile(r"^#+\s*")
+_SECTION_NUMBER_RE = re.compile(r"[\d.]+")
+# Markdown defines heading levels 1-6 only; a longer run of "#" is plain text.
+_MAX_HEADING_LEVEL = 6
+
+
+def restore_heading_level_block(block: str) -> str:
+    """Rebuild the Markdown heading marker of one line from its section number.
+
+    Any existing leading ``#`` marker is discarded. If the remaining text starts
+    with a number, every digit of that number counts as one heading level and the
+    digits are re-joined with dots, e.g. ``"713 Title"`` -> ``"### 7.1.3 Title"``
+    and ``"# 7.1 Title"`` -> ``"## 7.1 Title"``.
+
+    The line is returned unchanged when it is blank, does not start with a digit
+    (after the optional ``#`` marker), has a character from ``EXCLUDE_CHARS``
+    within its first 10 characters, or would need more than six ``#``.
+
+    NOTE(review): digits are split one by one, so a multi-digit component such
+    as ``10.2`` is rewritten to ``1.0.2`` (level 3). Confirm this is acceptable
+    for documents with ten or more sections.
+
+    Args:
+        block: A single line of Markdown text, without its newline.
+
+    Returns:
+        The rewritten heading line, or ``block`` itself when no rule applies.
+    """
+    if not block.strip():
+        return block
+
+    # Drop indentation and any existing "#" marker before inspecting the text.
+    content = _HEADING_MARKER_RE.sub("", block.lstrip())
+
+    # Only text that starts with a digit is a heading candidate.
+    if not content or not content[0].isdigit():
+        return block
+
+    # An excluded character near the start means the line stays body text.
+    if any(char in content[:10] for char in EXCLUDE_CHARS):
+        return block
+
+    # Leading run of digits and dots, e.g. 7, 7.1, 7.1.3, 713.
+    match = _SECTION_NUMBER_RE.match(content)
+    if not match:
+        return block
+
+    num_str = match.group(0)
+    digits_only = num_str.replace(".", "")
+
+    # Number of digits == heading level.
+    level = len(digits_only)
+    if level > _MAX_HEADING_LEVEL:
+        # More than six "#" is not a Markdown heading (long numbers, years with
+        # extra digits, ...), so leave the line as it was.
+        return block
+
+    # The number sits at the very start of content, so splice the dotted form in
+    # place of that prefix.
+    dotted = ".".join(digits_only)
+    return "#" * level + " " + dotted + content[len(num_str):]
+
+
+# =====================================================
+# File processing: read one md file and write the result into its
+# first-level directory (case_dir).
+# case_dir = a direct subdirectory of INPUT_ROOT (e.g. ...\新建文件夹\1)
+# =====================================================
+def process_md_file(md_path: str, case_dir: str):
+    """Rewrite the heading levels of one Markdown file and save it in ``case_dir``.
+
+    The file is read as UTF-8, split on ``"\\n"``, each line is passed through
+    ``restore_heading_level_block`` and the lines are joined back with ``"\\n"``.
+    The output keeps the input's file name; if that name is already taken in
+    ``case_dir``, ``_2``, ``_3``, ... is appended to the stem until a free name
+    is found.
+
+    Progress, success and failure are reported with ``print``. Any exception is
+    caught and reported instead of being raised, so one bad file does not stop
+    the batch.
+
+    Args:
+        md_path: Path of the Markdown file to read.
+        case_dir: Directory the processed file is written to.
+    """
+    md_name = os.path.basename(md_path)
+    print(f"⏳ 正在处理:{md_name}")
+
+    try:
+        with open(md_path, "r", encoding="utf-8") as f:
+            text = f.read()
+
+        new_text = "\n".join(
+            restore_heading_level_block(line) for line in text.split("\n")
+        )
+
+        # Output name: same as the input by default, numeric suffix on conflict.
+        # The file is created with mode "x" (exclusive creation) rather than
+        # checked with os.path.exists() first: this function runs in several
+        # threads, and two same-named inputs could otherwise both see the name
+        # as free and overwrite each other's output.
+        base, ext = os.path.splitext(md_name)
+        out_path = os.path.join(case_dir, md_name)
+        suffix = 2
+        while True:
+            try:
+                out_file = open(out_path, "x", encoding="utf-8")
+                break
+            except FileExistsError:
+                out_path = os.path.join(case_dir, f"{base}_{suffix}{ext}")
+                suffix += 1
+
+        with out_file:
+            out_file.write(new_text)
+
+        print(f"✅ 处理成功:{os.path.basename(out_path)}")
+    except Exception as exc:
+        # Best-effort batch job: report the failure and its cause, then carry on.
+        print(f"❌ 处理失败:{md_name}: {exc!r}")
+
+
+# =====================================================
+# Scanning rules:
+# Input directory: INPUT_ROOT
+# - Only the first-level subdirectories of INPUT_ROOT are taken
+#   (e.g. ...\新建文件夹\1, ...\新建文件夹\2)
+# - Inside each of them, md files in any directory named "auto" are searched
+#   for recursively
+# - Results are written to that first-level subdirectory (e.g. ...\新建文件夹\1)
+# =====================================================
+def iter_case_dirs(input_root: str):
+    """Yield the path of every directory located directly inside ``input_root``.
+
+    Plain files are skipped and the listing is not recursive. Paths are produced
+    in the order ``os.listdir`` reports them.
+    """
+    candidates = (os.path.join(input_root, entry) for entry in os.listdir(input_root))
+    yield from filter(os.path.isdir, candidates)
+
+
+def iter_md_under_auto(case_dir: str):
+    """Yield every ``.md`` file that sits directly in a directory named ``auto``.
+
+    The whole tree below ``case_dir`` is walked. Only files whose immediate
+    parent directory is called exactly ``auto`` are returned. The ``.md``
+    extension is matched case-insensitively.
+    """
+    for current_dir, _subdirs, file_names in os.walk(case_dir):
+        if os.path.basename(current_dir) != "auto":
+            continue
+        yield from (
+            os.path.join(current_dir, file_name)
+            for file_name in file_names
+            if file_name.lower().endswith(".md")
+        )
+
+
+def run(input_root: str, max_workers: int = 16):
+    """Process every Markdown file found under ``input_root`` using a thread pool.
+
+    Each first-level subdirectory of ``input_root`` is a case directory. Every
+    ``.md`` file found in an ``auto`` directory below it is submitted to
+    ``process_md_file`` together with that case directory.
+
+    Args:
+        input_root: Directory whose first-level subdirectories are scanned.
+        max_workers: Size of the thread pool.
+
+    Raises:
+        ValueError: If ``input_root`` is not an existing directory.
+    """
+    if not os.path.isdir(input_root):
+        raise ValueError("INPUT_ROOT 必须是文件夹路径")
+
+    with ThreadPoolExecutor(max_workers=max_workers) as pool:
+        pending = [
+            pool.submit(process_md_file, md_path, case_dir)
+            for case_dir in iter_case_dirs(input_root)
+            for md_path in iter_md_under_auto(case_dir)
+        ]
+
+        # result() is called on each finished future so that an exception raised
+        # inside a worker would surface here.
+        for finished in as_completed(pending):
+            finished.result()
+
+
+if __name__ == "__main__":
+    # Script entry point: process everything under INPUT_ROOT with a pool of
+    # 16 workers. The input root directory is the only value that needs changing.
+    run(INPUT_ROOT, max_workers=16)