Эх сурвалжийг харах

fix: 修改编制依据子表切分逻辑,按照句号切分且注意md表格的保护

ai02 4 долоо хоног өмнө
parent
commit
08acb3f144

+ 390 - 18
src/app/scripts/base_info_json_generation.py

@@ -22,14 +22,14 @@ from app.models import StandardBaseInfo  # noqa: F401  # 仅用于类型/一致
 
 
 # ==================== 配置参数 ====================
-EXCEL_FILE = r"C:\Users\ZengChao\Desktop\编制依据_output3.xlsx"      # ✅ Excel文件路径
-ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据"   # ✅ 根文件夹路径
-SHEET_INDEX = 0                                        # 目标sheet索引(0为第一个sheet)
-
-FAILED_REPORT_PATH = r"F:\第二阶段编制依据及施工方案数据治理-20260206\base_json_failed_report.json"  # ✅ 失败汇总JSON保存路径
+EXCEL_FILE = r"F:\第二阶段编制依据及施工方案数据治理-20260206\编制依据.xlsx"
+ROOT_FOLDER = r"F:\第二阶段编制依据及施工方案数据治理-20260206\最终编制依据\501-1000"
+SHEET_INDEX = 0
 
+FAILED_REPORT_PATH = r"F:\测试\base_json_failed_report.json"
 PARENT_MAX_CHARS = 6000                                # ✅ 父段最大长度(超长切片)
 CHILD_INDEX_START = 0                                  # ✅ children.index 起始
+MILVUS_VARCHAR_MAX_LENGTH = 65535                      # ✅ text 字段最大 UTF-8 字节长度
 # ================================================
 
 
@@ -39,6 +39,112 @@ CHILD_INDEX_START = 0                                  # ✅ children.index 起
 
 BLANK_SPLIT_RE = re.compile(r"\n\s*\n+")
 H1_RE = re.compile(r"^#\s+(.+?)\s*$", re.MULTILINE)
+MD_TABLE_SEPARATOR_RE = re.compile(r"^\s*\|?(?:\s*:?-{3,}:?\s*\|)+\s*:?-{3,}:?\s*\|?\s*$")
+
+# HTML表格转换相关正则
+HTML_TABLE_RE = re.compile(r"<table[^>]*>.*?</table>", re.DOTALL | re.IGNORECASE)
+HTML_TR_RE = re.compile(r"<tr[^>]*>(.*?)</tr>", re.DOTALL | re.IGNORECASE)
+HTML_TD_RE = re.compile(r"<td[^>]*>(.*?)</td>", re.DOTALL | re.IGNORECASE)
+HTML_TH_RE = re.compile(r"<th[^>]*>(.*?)</th>", re.DOTALL | re.IGNORECASE)
+HTML_ROWSPAN_RE = re.compile(r'rowspan=["\']?(\d+)["\']?', re.IGNORECASE)
+HTML_COLSPAN_RE = re.compile(r'colspan=["\']?(\d+)["\']?', re.IGNORECASE)
+
+
+def parse_html_table(table_html: str) -> List[List[str]]:
+    """
+    解析HTML表格,返回二维列表(行 x 列)
+    处理 rowspan 和 colspan,将跨行/跨列单元格展开为重复内容
+    (Markdown表格本身不支持rowspan/colspan,通过重复内容实现)
+    """
+    rows = []
+    rowspan_map = {}  # 记录跨行信息: {(row, col): value}
+    current_row = 0
+
+    for tr_match in HTML_TR_RE.finditer(table_html):
+        tr_content = tr_match.group(1)
+        row = []
+        col_idx = 0
+
+        # 处理 td 和 th
+        cells = list(HTML_TD_RE.finditer(tr_content)) + list(HTML_TH_RE.finditer(tr_content))
+        cells.sort(key=lambda m: m.start())  # 按位置排序
+
+        for cell_match in cells:
+            cell_html = cell_match.group(0)
+            cell_content = cell_match.group(1).strip()
+            # 去除内部HTML标签
+            cell_content = re.sub(r'<[^>]+>', '', cell_content).strip()
+
+            # 解析 rowspan 和 colspan
+            rowspan_match = HTML_ROWSPAN_RE.search(cell_html)
+            colspan_match = HTML_COLSPAN_RE.search(cell_html)
+
+            rowspan = int(rowspan_match.group(1)) if rowspan_match else 1
+            colspan = int(colspan_match.group(1)) if colspan_match else 1
+
+            # 先补入上方 rowspan 单元格延续到本行的内容,再处理当前单元格
+            while (current_row, col_idx) in rowspan_map:
+                row.append(rowspan_map[(current_row, col_idx)])
+                col_idx += 1
+
+            # 添加当前单元格内容(重复 colspan 次)
+            for _ in range(colspan):
+                row.append(cell_content)
+
+            # 记录 rowspan 信息(用于后续行填充;注意仅登记起始列 col_idx,同时带 colspan 的单元格其余列不会向下填充)
+            if rowspan > 1:
+                for r in range(1, rowspan):
+                    key = (current_row + r, col_idx)
+                    rowspan_map[key] = cell_content
+
+            col_idx += colspan
+
+        # 填充该行剩余的被 rowspan 占用的位置
+        while (current_row, col_idx) in rowspan_map:
+            row.append(rowspan_map[(current_row, col_idx)])
+            col_idx += 1
+
+        if row:
+            rows.append(row)
+            current_row += 1
+
+    return rows
+
+
+def convert_html_table_to_md(table_html: str) -> str:
+    """
+    将HTML表格转换为Markdown表格格式
+    """
+    rows = parse_html_table(table_html)
+    if not rows:
+        return table_html
+
+    # 转换为Markdown表格
+    md_lines = []
+
+    for i, row in enumerate(rows):
+        # 转义管道符
+        escaped_row = [cell.replace('|', '\\|') for cell in row]
+        md_line = '| ' + ' | '.join(escaped_row) + ' |'
+        md_lines.append(md_line)
+
+        # 在第一行后添加分隔符
+        if i == 0:
+            separator = '|' + '|'.join(['---'] * len(row)) + '|'
+            md_lines.append(separator)
+
+    return '\n'.join(md_lines)
+
+
+def convert_all_html_tables_to_md(text: str) -> str:
+    """
+    将文本中所有的HTML表格转换为Markdown表格
+    """
+    def replace_table(match):
+        table_html = match.group(0)
+        return convert_html_table_to_md(table_html)
+
+    return HTML_TABLE_RE.sub(replace_table, text)
 
 
 def split_md_by_blank_lines(md: str) -> List[str]:
@@ -47,6 +153,21 @@ def split_md_by_blank_lines(md: str) -> List[str]:
     return [p.strip() for p in parts if p.strip()]
 
 
+def contains_markdown_table(text: str) -> bool:
+    """
+    判断文本块是否包含 Markdown 表格(至少有表头行 + 分隔行)。
+    """
+    lines = [line.strip() for line in text.replace("\r\n", "\n").replace("\r", "\n").split("\n")]
+    for idx in range(len(lines) - 1):
+        current = lines[idx]
+        next_line = lines[idx + 1]
+        if "|" not in current:
+            continue
+        if MD_TABLE_SEPARATOR_RE.match(next_line):
+            return True
+    return False
+
+
 def is_heading_chunk(chunk: str):
     first_line = chunk.split("\n", 1)[0].strip()
     m = re.match(r"^(#{1,6})\s+(.+?)\s*$", first_line)
@@ -123,6 +244,14 @@ def split_text_by_max_chars(text: str, max_chars: int) -> List[str]:
 
     for chunk in chunks:
         if len(chunk) > max_chars:
+            # Markdown 表格块不做硬切,避免破坏表结构
+            if contains_markdown_table(chunk):
+                if current_slice.strip():
+                    result.append(current_slice.strip())
+                    current_slice = ""
+                result.append(chunk.strip())
+                continue
+
             if current_slice.strip():
                 result.append(current_slice.strip())
                 current_slice = ""
@@ -145,6 +274,142 @@ def split_text_by_max_chars(text: str, max_chars: int) -> List[str]:
     return [s for s in result if s]
 
 
+def utf8_len(text: str) -> int:
+    """返回字符串 UTF-8 字节长度。"""
+    return len((text or "").encode("utf-8"))
+
+
+def split_text_by_utf8_bytes(text: str, max_bytes: int) -> List[str]:
+    """
+    按 UTF-8 字节上限切分文本。
+    - 不会直接丢弃超长尾部内容,但每个分片都会 strip 首尾空白(切点处的空白不保留)
+    - 解码时忽略分片末尾不完整的字节;注意 start 按 strip 后的字节数推进,分片首尾有空白时下一片可能与上一片尾部重叠
+    """
+    content = str(text or "").strip()
+    if not content:
+        return []
+    if utf8_len(content) <= max_bytes:
+        return [content]
+
+    chunks: List[str] = []
+    raw = content.encode("utf-8")
+    start = 0
+
+    while start < len(raw):
+        candidate = raw[start:start + max_bytes]
+        piece = candidate.decode("utf-8", errors="ignore").strip()
+        if piece:
+            chunks.append(piece)
+            start += len(piece.encode("utf-8"))
+        else:
+            # 兜底推进,避免极端情况下死循环
+            start += max_bytes
+
+    return chunks
+
+
+def expand_rows_for_varchar_limit(
+    rows: List[Dict[str, Any]],
+    max_bytes: int,
+    start_index: int = 0,
+) -> List[Dict[str, Any]]:
+    """
+    对 rows 中超长 text 做无损扩展:
+    - 超过 max_bytes 的 text 继续拆成下一条,而不是截断丢弃
+    - 重新连续编号 index
+    """
+    if not rows:
+        return []
+
+    expanded: List[Dict[str, Any]] = []
+    next_index = start_index
+
+    for row in rows:
+        pieces = split_text_by_utf8_bytes(row.get("text", ""), max_bytes=max_bytes)
+        if not pieces:
+            continue
+        for piece in pieces:
+            new_row = dict(row)
+            new_row["text"] = piece
+            new_row["index"] = next_index
+            expanded.append(new_row)
+            next_index += 1
+
+    return expanded
+
+
+def split_text_by_chinese_period(text: str) -> List[str]:
+    """
+    严格按照中文句号"。"对文本进行切分
+    - 按"。"切分后,保留句号在句子末尾
+    - 过滤掉空字符串
+    - 如果没有句号,返回整个文本作为一个句子
+    """
+    if not text or not text.strip():
+        return []
+
+    # 先按句号切分,然后为每个部分(除了最后一个)添加句号
+    parts = text.split("。")
+    sentences = []
+
+    for i, part in enumerate(parts):
+        part = part.strip()
+        if not part:
+            continue
+
+        # 如果不是最后一部分,说明后面有句号,需要加回来
+        if i < len(parts) - 1:
+            part += "。"
+
+        sentences.append(part)
+
+    return sentences
+
+
+def split_completed_sentences_by_chinese_period(text: str) -> Tuple[List[str], str]:
+    """
+    按中文句号"。"提取已完成句子,并返回剩余未完成文本。
+    - 已完成句子:以"。"结尾
+    - 剩余文本:最后一个"。"之后的内容(可能为空)
+    """
+    text = (text or "").strip()
+    if not text:
+        return [], ""
+
+    if "。" not in text:
+        return [], text
+
+    parts = text.split("。")
+    completed: List[str] = []
+
+    # 最后一段是剩余未完成文本(原文本以"。"结尾时为空)
+    for part in parts[:-1]:
+        part = part.strip()
+        if part:
+            completed.append(f"{part}。")
+
+    remainder = parts[-1].strip()
+    return completed, remainder
+
+
+def extract_all_headings(sec_text: str) -> List[str]:
+    """
+    从文本中提取所有标题(# 开头的行)
+    返回标题文本列表(不含#号)
+    """
+    headings = []
+    lines = sec_text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
+
+    for line in lines:
+        line = line.strip()
+        m = re.match(r"^(#{1,6})\s+(.+?)\s*$", line)
+        if m:
+            title = m.group(2).strip()
+            headings.append(title)
+
+    return headings
+
+
 def build_parent_and_children_rows(
     md_text: str,
     file_name: str,
@@ -155,6 +420,12 @@ def build_parent_and_children_rows(
     输出你最终要的极简结构:
     parent:   [{parent_id,index,hierarchy,text}, ...]
     children: [{parent_id,index,hierarchy,text}, ...]
+
+    子表切分逻辑:
+    1. 严格按照中文句号"。"切分文本(未遇到句号前持续累计,不切分)
+    2. 将切分后的句子与已识别的标题进行比对
+    3. 将每个句子内容上方存在的标题信息同步保存到子表的text字段中
+    4. 当切分内容与某个标题完全相同时,该标题不重复保存到子表中
     """
     doc_name = guess_doc_name_from_filename(file_name)
 
@@ -170,15 +441,51 @@ def build_parent_and_children_rows(
             parent_seq=parent_seq,
         )
 
-    # 2) children:在各父段内部按空行切,并维护 hierarchy(outline_path)
+    # 2) children:在各父段内部按中文句号切分,并维护 hierarchy(outline_path)
     children: List[Dict[str, Any]] = []
     child_index = child_index_start
 
     for parent_seq, (_h1_title, sec_text) in enumerate(parent_sections):
         parent_id = parent_seq_to_parent_id[parent_seq]
 
+        # 提取该父段中的所有标题,用于后续比对和去重
+        all_headings = extract_all_headings(sec_text)
+        all_headings_set = set(all_headings)
+
+        # 按空行切分块,用于维护 hierarchy 和标题上下文
         chunks = split_md_by_blank_lines(sec_text)
         heading_path: List[str] = []
+        current_context_headings: List[str] = []  # 当前句子上方的标题信息
+
+        pending_text = ""
+        pending_context_headings: List[str] = []
+        pending_hierarchy = ""
+
+        def append_child_row(
+            text: str,
+            hierarchy_value: str,
+            context_headings: List[str],
+        ) -> None:
+            nonlocal child_index
+            text = (text or "").strip()
+            if not text:
+                return
+
+            if context_headings:
+                heading_prefix = " > ".join(context_headings)
+                full_text = f"{heading_prefix}\n{text}"
+            else:
+                full_text = text
+
+            children.append(
+                {
+                    "index": child_index,
+                    "parent_id": int(parent_id),
+                    "hierarchy": hierarchy_value,
+                    "text": full_text,
+                }
+            )
+            child_index += 1
 
         for chunk in chunks:
             heading_info = is_heading_chunk(chunk)
@@ -187,22 +494,72 @@ def build_parent_and_children_rows(
                 parent_path = heading_path[: level - 1]
                 hierarchy = outline_path_str(parent_path)
                 heading_path = parent_path + [title]
+                # 更新当前上下文标题
+                current_context_headings = heading_path.copy()
+                continue  # 标题本身不生成子表记录
+
+            hierarchy = outline_path_str(heading_path)
+            chunk_text = chunk.strip()
+            if not chunk_text:
+                continue
+
+            # Markdown 表格整体保留,避免按句号切分导致表结构破坏
+            if contains_markdown_table(chunk_text):
+                if pending_text.strip():
+                    sentence = pending_text.strip()
+                    sentence_without_period = sentence.rstrip("。")
+                    if sentence_without_period not in all_headings_set:
+                        append_child_row(sentence, pending_hierarchy, pending_context_headings)
+
+                    pending_text = ""
+                    pending_context_headings = []
+                    pending_hierarchy = ""
+
+                append_child_row(chunk_text, hierarchy, current_context_headings)
+                continue
+
+            # 先拼接历史未完成文本,再按"。"提取已完成句子
+            if pending_text:
+                merged_text = f"{pending_text}\n\n{chunk_text}"
+                sentence_context_headings = pending_context_headings or current_context_headings
+                sentence_hierarchy = pending_hierarchy or hierarchy
             else:
-                hierarchy = outline_path_str(heading_path)
+                merged_text = chunk_text
+                sentence_context_headings = current_context_headings
+                sentence_hierarchy = hierarchy
 
-            children.append(
-                {
-                    "index": child_index,
-                    "parent_id": int(parent_id),
-                    "hierarchy": hierarchy,
-                    "text": chunk,
-                }
-            )
-            child_index += 1
+            completed_sentences, pending_text = split_completed_sentences_by_chinese_period(merged_text)
+
+            for sentence in completed_sentences:
+                sentence = sentence.strip()
+                if not sentence:
+                    continue
+
+                # 检查句子是否与某个标题完全相同
+                sentence_without_period = sentence.rstrip("。")
+                if sentence_without_period in all_headings_set:
+                    # 如果句子与标题完全相同,跳过不保存
+                    continue
+
+                append_child_row(sentence, sentence_hierarchy, sentence_context_headings)
+
+            if pending_text:
+                pending_context_headings = sentence_context_headings.copy()
+                pending_hierarchy = sentence_hierarchy
+            else:
+                pending_context_headings = []
+                pending_hierarchy = ""
+
+        # 父段末尾若仍有未完成句子,整体保留为一条(不按空行块切分)
+        if pending_text.strip():
+            sentence = pending_text.strip()
+            sentence_without_period = sentence.rstrip("。")
+            if sentence_without_period not in all_headings_set:
+                append_child_row(sentence, pending_hierarchy, pending_context_headings)
 
     # 3) parents:父段过长按 max_chars 切片,但 parent_id 不变
     parents: List[Dict[str, Any]] = []
-    parent_row_index = 0  # 这里用“父表行顺序index”,保证唯一、可追溯
+    parent_row_index = 0  # 这里用"父表行顺序index",保证唯一、可追溯
 
     for parent_seq, (h1_title, sec_text) in enumerate(parent_sections):
         parent_id = parent_seq_to_parent_id[parent_seq]
@@ -227,6 +584,18 @@ def build_parent_and_children_rows(
             )
             parent_row_index += 1
 
+    # 4) VARCHAR 保护:按 UTF-8 字节上限做无损扩展(不截断丢尾)
+    parents = expand_rows_for_varchar_limit(
+        parents,
+        max_bytes=MILVUS_VARCHAR_MAX_LENGTH,
+        start_index=0,
+    )
+    children = expand_rows_for_varchar_limit(
+        children,
+        max_bytes=MILVUS_VARCHAR_MAX_LENGTH,
+        start_index=child_index_start,
+    )
+
     return parents, children
 
 
@@ -348,6 +717,9 @@ class StandardInfoGenerator:
                 with open(md_path, "r", encoding="utf-8") as f:
                     md_text = f.read()
 
+                # 将HTML表格转换为Markdown表格
+                md_text = convert_all_html_tables_to_md(md_text)
+
                 parents, children = build_parent_and_children_rows(
                     md_text=md_text,
                     file_name=md_path.name,
@@ -355,7 +727,7 @@ class StandardInfoGenerator:
                     child_index_start=CHILD_INDEX_START,
                 )
 
-                # doc 节点:放“全局信息”(你要的“各种信息”
+                # doc 节点:放"全局信息"(你要的"各种信息")
                 info_to_save = {
                     k: v.isoformat() if isinstance(v, datetime) else v
                     for k, v in standard_info.items()