#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Detailed split analysis for 330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf.

Bypasses LLM classification and directly exercises TOC extraction,
full-text extraction and text splitting, then checks chapter completeness
and cross-chapter content leakage, saving a JSON summary next to the PDF.
"""
import json
import os
import sys
import traceback
from datetime import datetime
from pathlib import Path

project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

# Import low-level modules directly to avoid triggering LLM initialization.
# Set the environment variable first so certain init paths are skipped.
os.environ['SKIP_AI_INIT'] = '1'

# Only the low-level worker modules are needed; no AI components.
from core.construction_review.component.doc_worker.interfaces import DocumentSource
from core.construction_review.component.doc_worker.config.provider import default_config_provider
from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
from core.construction_review.component.doc_worker.pdf_worker.hybrid_extractor import HybridFullTextExtractor
from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
from core.construction_review.component.doc_worker.utils.title_matcher import TitleMatcher

TARGET_FILE = Path(__file__).parent / "330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf"


def _normalize(title: str) -> str:
    """Strip ASCII and full-width (U+3000) spaces for fuzzy title comparison."""
    return title.replace(" ", "").replace("\u3000", "").strip()


def _titles_match(a: str, b: str) -> bool:
    """True when either normalized title contains the other (fuzzy match)."""
    na, nb = _normalize(a), _normalize(b)
    return na in nb or nb in na


def _label_head(label: str) -> str:
    """Return the top-level part of a hierarchical section label like 'A -> B'."""
    return label.split("->")[0].strip() if "->" in label else label.strip()


def _extract_toc(source):
    """Step 1: extract the TOC; return (toc_info, toc_items, level1_items)."""
    print("\n[Step 1] 提取目录...")
    toc_extractor = PdfTOCExtractor()
    toc_info = toc_extractor.extract_toc(source)
    toc_items = toc_info.get("toc_items") or []
    print("\n" + "=" * 80)
    print("1. 目录提取结果")
    print("=" * 80)
    print(f" toc_count: {toc_info.get('toc_count')}")
    print(f" toc_pages: {toc_info.get('toc_pages')}")
    level1_count = sum(1 for x in toc_items if x.get('level') == 1)
    print(f" 一级章节数: {level1_count}")
    print(f"\n 所有一级目录项:")
    level1_items = [item for item in toc_items if item.get('level') == 1]
    for i, item in enumerate(level1_items, 1):
        print(f" {i}. [L{item.get('level')}] P{item.get('page')} {item.get('title')}")
    return toc_info, toc_items, level1_items


def _extract_fulltext(source):
    """Step 2: extract per-page text; return (pages_content, full_text)."""
    print("\n[Step 2] 提取全文...")
    fulltext_extractor = HybridFullTextExtractor()
    pages_content = fulltext_extractor.extract_full_text(source)
    full_text = "".join(p.get("text", "") for p in pages_content)
    print("\n" + "=" * 80)
    print("2. 全文提取结果")
    print("=" * 80)
    print(f" 总页数: {len(pages_content)}")
    print(f" 总字符数: {len(full_text)}")
    if pages_content:
        print(f" 第一页预览: {pages_content[0].get('text', '')[:100]}...")
        print(f" 最后一页预览: {pages_content[-1].get('text', '')[:100]}...")
    return pages_content, full_text


def _locate_titles(toc_info, level1_items, full_text, pages_content):
    """Step 3: locate level-1 titles in the body text; return the match list."""
    print("\n[Step 3] 分析标题在正文中的定位...")
    print("\n" + "=" * 80)
    print("3. 一级标题在正文中的定位分析")
    print("=" * 80)
    matcher = TitleMatcher()
    toc_pages = toc_info.get("toc_pages", []) or []
    located = matcher.find_title_positions(level1_items, full_text, pages_content, toc_pages)
    for loc in located:
        status = "FOUND" if loc["found"] else "NOT FOUND"
        print(f"\n [{status}] '{loc['title']}'")
        if loc["found"]:
            print(f" toc_page={loc.get('toc_page')}, actual_page={loc.get('actual_page')}, pos={loc['position']}")
            # Show surrounding context so a mis-anchored title is visible at a glance.
            pos = loc["position"]
            ctx = full_text[max(0, pos - 40):min(len(full_text), pos + 80)].replace("\n", " ")
            print(f" 上下文: ...{ctx}...")
    found_count = sum(1 for loc in located if loc["found"])
    print(f"\n 定位统计: {found_count}/{len(located)} 个标题成功定位")
    return located


def _split_text(level1_items, pages_content, toc_info):
    """Step 4: run hierarchical splitting; return (chunks, section_labels)."""
    print("\n[Step 4] 执行文本切分...")
    print("\n" + "=" * 80)
    print("4. 文本切分结果")
    print("=" * 80)
    text_splitter = PdfTextSplitter()
    target_level = int(default_config_provider.get("text_splitting.target_level", 1))
    max_chunk_size = int(default_config_provider.get("text_splitting.max_chunk_size", 3000))
    min_chunk_size = int(default_config_provider.get("text_splitting.min_chunk_size", 50))
    print(f" 切分参数: target_level={target_level}, max_chunk_size={max_chunk_size}")
    # Build classification items straight from the TOC level-1 entries —
    # no LLM classification, every item is deliberately left "未分类".
    classification_items = [
        {
            "title": item["title"],
            "page": item["page"],
            "level": item["level"],
            "category": "未分类",
            "category_code": "other",
        }
        for item in level1_items
    ]
    chunks = text_splitter.split_by_hierarchy(
        classification_items=classification_items,
        pages_content=pages_content,
        toc_info=toc_info,
        target_level=target_level,
        max_chunk_size=max_chunk_size,
        min_chunk_size=min_chunk_size,
    )
    section_labels = [c.get("section_label", "UNKNOWN") for c in chunks]
    print(f"\n 总 chunks: {len(chunks)}")
    print(f"\n 所有 chunks:")
    for i, (label, chunk) in enumerate(zip(section_labels, chunks), 1):
        content = chunk.get("review_chunk_content", "") or chunk.get("content", "")
        content_preview = content[:60].replace("\n", " ")
        print(f" {i}. {label}")
        print(f" chunk_id={chunk.get('chunk_id')}, page={chunk.get('element_tag',{}).get('page')}, len={len(content)}")
        print(f" preview={content_preview}...")
    return chunks, section_labels


def _check_completeness(level1_items, section_labels):
    """Step 5: verify the final chapter produced a chunk; return its title."""
    print("\n" + "=" * 80)
    print("5. 完整性检查")
    print("=" * 80)
    last_level1 = level1_items[-1] if level1_items else None
    last_title = last_level1.get("title", "").strip() if last_level1 else ""
    print(f" 最后一章标题: {last_title}")
    print(f" 最后一章页码: {last_level1.get('page') if last_level1 else 'N/A'}")
    last_found = False
    for label in section_labels:
        if _titles_match(last_title, _label_head(label)):
            last_found = True
            print(f" 最后一章匹配到: {label}")
            break
    if not last_found:
        print(f" [WARNING] 最后一章未找到对应 chunk!")
    return last_title


def _check_leak(level1_items, last_title, chunks):
    """Step 6: check whether last-chapter text leaked into the previous chapter."""
    print("\n" + "=" * 80)
    print("6. 跨章节泄漏检查")
    print("=" * 80)
    if not (len(level1_items) >= 2 and last_title):
        return
    prev_title = level1_items[-2].get("title", "").strip()
    print(f" 倒数第二章: {prev_title}")
    print(f" 最后一章: {last_title}")
    prev_chunks = [
        c for c in chunks
        if _titles_match(prev_title, _label_head(c.get("section_label", "")))
    ]
    print(f" 倒数第二章的 chunks 数: {len(prev_chunks)}")
    if not prev_chunks:
        print(" 未找到倒数第二章的 chunks")
        return
    # Only the *final* chunk of the previous chapter can swallow the next
    # chapter's opening, so that is the one inspected for leaked keywords.
    last_prev = prev_chunks[-1]
    content = last_prev.get("review_chunk_content", "") or last_prev.get("content", "")
    keywords = [k for k in last_title.split() if len(k) >= 2]
    if not keywords:
        keywords = [last_title]
    print(f" 检查关键词: {keywords}")
    leak_found = False
    for kw in keywords:
        if kw in content:
            leak_found = True
            idx = content.find(kw)
            ctx_start = max(0, idx - 100)
            ctx_end = min(len(content), idx + len(kw) + 100)
            print(f"\n [LEAK DETECTED] chunk '{last_prev.get('chunk_id')}' ({last_prev.get('section_label')}) 包含 '{kw}'")
            print(f" 上下文:")
            print(f" ...{content[ctx_start:ctx_end]}...")
    if not leak_found:
        print(" 未发现跨章节泄漏")


def _save_results(toc_info, toc_items, located, chunks):
    """Persist a JSON summary of the run next to the target PDF."""
    json_path = Path(__file__).parent / "single_test_result.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({
            "timestamp": datetime.now().isoformat(),
            "filename": TARGET_FILE.name,
            "toc_count": toc_info.get('toc_count'),
            "toc_pages": toc_info.get('toc_pages'),
            "toc_items": toc_items,
            "title_locations": [
                {
                    "title": loc["title"],
                    "found": loc["found"],
                    "position": loc.get("position"),
                    "toc_page": loc.get("toc_page"),
                    "actual_page": loc.get("actual_page"),
                }
                for loc in located
            ],
            "chunks_meta": [
                {
                    "chunk_id": c.get("chunk_id"),
                    "section_label": c.get("section_label"),
                    "page": c.get("element_tag", {}).get("page"),
                    "content_len": len(c.get("review_chunk_content", "") or c.get("content", "")),
                    "content_preview": (c.get("review_chunk_content", "") or c.get("content", ""))[:200].replace("\n", " ")
                }
                for c in chunks
            ],
        }, f, ensure_ascii=False, indent=2)
    print(f"\n[INFO] 结果已保存: {json_path}")


def analyze():
    """Run the full split-analysis pipeline on TARGET_FILE.

    Returns 0 on success, 1 when the target PDF is missing
    (suitable for use as a process exit code).
    """
    if not TARGET_FILE.exists():
        print(f"[ERROR] 文件不存在: {TARGET_FILE}")
        return 1
    print(f"\n[INFO] 正在处理: {TARGET_FILE.name}")
    with open(TARGET_FILE, "rb") as f:
        file_content = f.read()
    source = DocumentSource(path=None, content=file_content, file_type="pdf")

    toc_info, toc_items, level1_items = _extract_toc(source)
    pages_content, full_text = _extract_fulltext(source)
    located = _locate_titles(toc_info, level1_items, full_text, pages_content)
    chunks, section_labels = _split_text(level1_items, pages_content, toc_info)
    last_title = _check_completeness(level1_items, section_labels)
    _check_leak(level1_items, last_title, chunks)
    _save_results(toc_info, toc_items, located, chunks)
    return 0


if __name__ == "__main__":
    sys.exit(analyze())