# run_single_test.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 对 330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf 进行详细的切分分析
  5. 绕过 LLM 分类,直接测试 TOC 提取、全文提取、文本切分
  6. """
  7. import json
  8. import os
  9. import sys
  10. import traceback
  11. from datetime import datetime
  12. from pathlib import Path
  13. project_root = Path(__file__).parent.parent.parent
  14. sys.path.insert(0, str(project_root))
  15. # 直接导入,避免触发 LLM 初始化
  16. # 先设置环境变量避免某些初始化
  17. os.environ['SKIP_AI_INIT'] = '1'
  18. # 直接导入需要的低层模块
  19. from core.construction_review.component.doc_worker.interfaces import DocumentSource
  20. from core.construction_review.component.doc_worker.config.provider import default_config_provider
  21. from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
  22. from core.construction_review.component.doc_worker.pdf_worker.hybrid_extractor import HybridFullTextExtractor
  23. from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
  24. from core.construction_review.component.doc_worker.utils.title_matcher import TitleMatcher
  25. TARGET_FILE = Path(__file__).parent / "330测试fa56f7a8-bd36-4140-8cb1-f9f973ce8745.pdf"
  26. def analyze():
  27. if not TARGET_FILE.exists():
  28. print(f"[ERROR] 文件不存在: {TARGET_FILE}")
  29. return 1
  30. print(f"\n[INFO] 正在处理: {TARGET_FILE.name}")
  31. with open(TARGET_FILE, "rb") as f:
  32. file_content = f.read()
  33. source = DocumentSource(path=None, content=file_content, file_type="pdf")
  34. # 1. TOC 提取
  35. print("\n[Step 1] 提取目录...")
  36. toc_extractor = PdfTOCExtractor()
  37. toc_info = toc_extractor.extract_toc(source)
  38. toc_items = toc_info.get("toc_items") or []
  39. print("\n" + "=" * 80)
  40. print("1. 目录提取结果")
  41. print("=" * 80)
  42. print(f" toc_count: {toc_info.get('toc_count')}")
  43. print(f" toc_pages: {toc_info.get('toc_pages')}")
  44. level1_count = sum(1 for x in toc_items if x.get('level') == 1)
  45. print(f" 一级章节数: {level1_count}")
  46. print(f"\n 所有一级目录项:")
  47. level1_items = [item for item in toc_items if item.get('level') == 1]
  48. for i, item in enumerate(level1_items, 1):
  49. print(f" {i}. [L{item.get('level')}] P{item.get('page')} {item.get('title')}")
  50. # 2. 全文提取
  51. print("\n[Step 2] 提取全文...")
  52. fulltext_extractor = HybridFullTextExtractor()
  53. pages_content = fulltext_extractor.extract_full_text(source)
  54. full_text = "".join(p.get("text", "") for p in pages_content)
  55. print("\n" + "=" * 80)
  56. print("2. 全文提取结果")
  57. print("=" * 80)
  58. print(f" 总页数: {len(pages_content)}")
  59. print(f" 总字符数: {len(full_text)}")
  60. if pages_content:
  61. print(f" 第一页预览: {pages_content[0].get('text', '')[:100]}...")
  62. print(f" 最后一页预览: {pages_content[-1].get('text', '')[:100]}...")
  63. # 3. 标题定位分析(关键!)
  64. print("\n[Step 3] 分析标题在正文中的定位...")
  65. print("\n" + "=" * 80)
  66. print("3. 一级标题在正文中的定位分析")
  67. print("=" * 80)
  68. matcher = TitleMatcher()
  69. toc_pages = toc_info.get("toc_pages", []) or []
  70. located = matcher.find_title_positions(level1_items, full_text, pages_content, toc_pages)
  71. for loc in located:
  72. status = "FOUND" if loc["found"] else "NOT FOUND"
  73. print(f"\n [{status}] '{loc['title']}'")
  74. if loc["found"]:
  75. print(f" toc_page={loc.get('toc_page')}, actual_page={loc.get('actual_page')}, pos={loc['position']}")
  76. # 上下文
  77. pos = loc["position"]
  78. ctx = full_text[max(0, pos-40):min(len(full_text), pos+80)].replace("\n", " ")
  79. print(f" 上下文: ...{ctx}...")
  80. # 检查是否所有标题都找到了
  81. found_count = sum(1 for loc in located if loc["found"])
  82. print(f"\n 定位统计: {found_count}/{len(located)} 个标题成功定位")
  83. # 4. 文本切分
  84. print("\n[Step 4] 执行文本切分...")
  85. print("\n" + "=" * 80)
  86. print("4. 文本切分结果")
  87. print("=" * 80)
  88. text_splitter = PdfTextSplitter()
  89. target_level = int(default_config_provider.get("text_splitting.target_level", 1))
  90. max_chunk_size = int(default_config_provider.get("text_splitting.max_chunk_size", 3000))
  91. min_chunk_size = int(default_config_provider.get("text_splitting.min_chunk_size", 50))
  92. print(f" 切分参数: target_level={target_level}, max_chunk_size={max_chunk_size}")
  93. # 构造 classification_items(不用 LLM,直接用 TOC 的 level1)
  94. classification_items = [
  95. {
  96. "title": item["title"],
  97. "page": item["page"],
  98. "level": item["level"],
  99. "category": "未分类",
  100. "category_code": "other",
  101. }
  102. for item in level1_items
  103. ]
  104. chunks = text_splitter.split_by_hierarchy(
  105. classification_items=classification_items,
  106. pages_content=pages_content,
  107. toc_info=toc_info,
  108. target_level=target_level,
  109. max_chunk_size=max_chunk_size,
  110. min_chunk_size=min_chunk_size,
  111. )
  112. section_labels = [c.get("section_label", "UNKNOWN") for c in chunks]
  113. print(f"\n 总 chunks: {len(chunks)}")
  114. print(f"\n 所有 chunks:")
  115. for i, label in enumerate(section_labels, 1):
  116. chunk = chunks[i-1]
  117. content = chunk.get("review_chunk_content", "") or chunk.get("content", "")
  118. content_preview = content[:60].replace("\n", " ")
  119. print(f" {i}. {label}")
  120. print(f" chunk_id={chunk.get('chunk_id')}, page={chunk.get('element_tag',{}).get('page')}, len={len(content)}")
  121. print(f" preview={content_preview}...")
  122. # 5. 完整性检查
  123. print("\n" + "=" * 80)
  124. print("5. 完整性检查")
  125. print("=" * 80)
  126. last_level1 = level1_items[-1] if level1_items else None
  127. last_title = last_level1.get("title", "").strip() if last_level1 else ""
  128. print(f" 最后一章标题: {last_title}")
  129. print(f" 最后一章页码: {last_level1.get('page') if last_level1 else 'N/A'}")
  130. def normalize(t: str) -> str:
  131. return t.replace(" ", "").replace("\u3000", "").strip()
  132. last_found = False
  133. last_chunk = None
  134. for label in section_labels:
  135. first = label.split("->")[0].strip() if "->" in label else label.strip()
  136. if normalize(last_title) in normalize(first) or normalize(first) in normalize(last_title):
  137. last_found = True
  138. print(f" 最后一章匹配到: {label}")
  139. # 找到对应的 chunk
  140. for c in chunks:
  141. if c.get("section_label") == label:
  142. last_chunk = c
  143. break
  144. break
  145. if not last_found:
  146. print(f" [WARNING] 最后一章未找到对应 chunk!")
  147. # 6. 泄漏检查
  148. print("\n" + "=" * 80)
  149. print("6. 跨章节泄漏检查")
  150. print("=" * 80)
  151. if len(level1_items) >= 2 and last_title:
  152. prev_level1 = level1_items[-2]
  153. prev_title = prev_level1.get("title", "").strip()
  154. print(f" 倒数第二章: {prev_title}")
  155. print(f" 最后一章: {last_title}")
  156. prev_chunks = []
  157. for c in chunks:
  158. label = c.get("section_label", "")
  159. first = label.split("->")[0].strip() if "->" in label else label.strip()
  160. if normalize(prev_title) in normalize(first) or normalize(first) in normalize(prev_title):
  161. prev_chunks.append(c)
  162. print(f" 倒数第二章的 chunks 数: {len(prev_chunks)}")
  163. if prev_chunks:
  164. last_prev = prev_chunks[-1]
  165. content = last_prev.get("review_chunk_content", "") or last_prev.get("content", "")
  166. keywords = [k for k in last_title.split() if len(k) >= 2]
  167. if not keywords:
  168. keywords = [last_title]
  169. print(f" 检查关键词: {keywords}")
  170. leak_found = False
  171. for kw in keywords:
  172. if kw in content:
  173. leak_found = True
  174. idx = content.find(kw)
  175. ctx_start = max(0, idx - 100)
  176. ctx_end = min(len(content), idx + len(kw) + 100)
  177. print(f"\n [LEAK DETECTED] chunk '{last_prev.get('chunk_id')}' ({last_prev.get('section_label')}) 包含 '{kw}'")
  178. print(f" 上下文:")
  179. print(f" ...{content[ctx_start:ctx_end]}...")
  180. if not leak_found:
  181. print(" 未发现跨章节泄漏")
  182. else:
  183. print(" 未找到倒数第二章的 chunks")
  184. # 保存结果
  185. out_dir = Path(__file__).parent
  186. json_path = out_dir / "single_test_result.json"
  187. with open(json_path, "w", encoding="utf-8") as f:
  188. json.dump({
  189. "timestamp": datetime.now().isoformat(),
  190. "filename": TARGET_FILE.name,
  191. "toc_count": toc_info.get('toc_count'),
  192. "toc_pages": toc_info.get('toc_pages'),
  193. "toc_items": toc_items,
  194. "title_locations": [
  195. {
  196. "title": loc["title"],
  197. "found": loc["found"],
  198. "position": loc.get("position"),
  199. "toc_page": loc.get("toc_page"),
  200. "actual_page": loc.get("actual_page"),
  201. }
  202. for loc in located
  203. ],
  204. "chunks_meta": [
  205. {
  206. "chunk_id": c.get("chunk_id"),
  207. "section_label": c.get("section_label"),
  208. "page": c.get("element_tag", {}).get("page"),
  209. "content_len": len(c.get("review_chunk_content", "") or c.get("content", "")),
  210. "content_preview": (c.get("review_chunk_content", "") or c.get("content", ""))[:200].replace("\n", " ")
  211. }
  212. for c in chunks
  213. ],
  214. }, f, ensure_ascii=False, indent=2)
  215. print(f"\n[INFO] 结果已保存: {json_path}")
  216. return 0
  217. if __name__ == "__main__":
  218. sys.exit(analyze())