""" OCR 模型效果与稳定性测试脚本 测试 GLM-OCR 模型在施工方案 PDF 上的表现: - 目录提取测试(默认): YOLO检测目录页 → GLM-OCR识别 → 规则解析 → 保存 catalog JSON - 版面检测(--detection): RapidLayout 表格/图片区域检测 - OCR 识别(--detection): GLM-OCR 对表格区域的识别质量 - 全链路测试(--detection --full-pipeline): 检测 → OCR → 文本回填 - 稳定性测试(--detection --stability): 高并发下的错误率和延迟分布 运行方式: # 默认:仅目录提取测试(快速) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf # 目录OCR稳定性测试(10次提取对比一致性) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --catalog-stability # 目录OCR稳定性测试(30次) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --catalog-stability --catalog-iterations 30 # 目录提取 + 版面检测 + OCR识别 python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection # 目录+检测+全链路(检测→OCR→文本回填) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection --full-pipeline # 目录+检测+稳定性测试(20并发,50次调用) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection --stability --concurrency 20 --iterations 50 # 批量测试 python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d --detection 输出目录: utils_test/minimal_pipeline/temp/test_ocr_effectiveness/ ├── catalog/ 目录提取结果(每次带时间戳) ├── detection/ 版面检测全页标注图 ├── table/ 表格区域截图+OCR文本 ├── figure/ 图片区域截图+OCR文本 └── results/ JSON 汇总结果 """ import argparse import configparser import json import os import sys import time import statistics from collections import Counter from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import fitz import numpy as np # 从 config.ini 读取 OCR 配置 _CONFIG_PATH = Path(__file__).resolve().parent.parent.parent / "config" / "config.ini" _OCR_CONFIG: Dict[str, str] = {} if _CONFIG_PATH.exists(): _cp = configparser.ConfigParser() _cp.read(str(_CONFIG_PATH), encoding="utf-8") if _cp.has_section("ocr"): _OCR_CONFIG = { "GLM_OCR_API_URL": _cp.get("ocr", "GLM_OCR_API_URL", fallback="http://183.220.37.46:25429/v1/chat/completions"), "GLM_OCR_API_KEY": _cp.get("ocr", "GLM_OCR_API_KEY", fallback=""), "GLM_OCR_TIMEOUT": _cp.get("ocr", "GLM_OCR_TIMEOUT", fallback="600"), } else: _OCR_CONFIG = { "GLM_OCR_API_URL": "http://183.220.37.46:25429/v1/chat/completions", "GLM_OCR_API_KEY": "", "GLM_OCR_TIMEOUT": "600", } else: _OCR_CONFIG = { "GLM_OCR_API_URL": "http://183.220.37.46:25429/v1/chat/completions", "GLM_OCR_API_KEY": "", "GLM_OCR_TIMEOUT": "600", } # 将被测试的目标模块(本地解耦版,不依赖 core/foundation) TEST_DIR = Path(__file__).resolve().parent sys.path.insert(0, str(TEST_DIR.parent.parent)) # 项目根目录,使 utils_test 可导入 from utils_test.minimal_pipeline._ocr_processor import ( OcrProcessor, RAPID_LAYOUT_AVAILABLE, TableRegion, OcrResult, ) # ============================================================ # 数据结构 # ============================================================ @dataclass class DetectionSample: """单次版面检测样本""" page_num: int label: str score: float bbox: Tuple[float, float, float, float] width: float height: float @dataclass class PageDetectionResult: """单页版面检测结果""" page_num: int samples: List[DetectionSample] table_count: int figure_count: int @dataclass class OcrSampleResult: """单次 OCR 识别样本""" page_num: int label: str score: float bbox: Tuple[float, float, float, float] text: str text_length: int success: bool latency_ms: float retry_count: int = 0 error: Optional[str] = None @dataclass class OcrTestResult: """OCR 测试结果汇总""" file_name: str total_pages: int detection: Dict[str, Any] = field(default_factory=dict) ocr: Dict[str, Any] = field(default_factory=dict) pipeline: Dict[str, Any] = field(default_factory=dict) # ============================================================ # OCR 测试器 # ============================================================ class OcrEffectivenessTester: """OCR 模型效果与稳定性测试器""" def __init__( self, ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions", ocr_api_key: str = "", ocr_timeout: int = 600, dpi: int = 200, clip_top: float = 60, clip_bottom: float = 60, confidence_threshold: float = 0.5, concurrent_workers: int = 5, ): self.dpi = dpi self.clip_top = clip_top self.clip_bottom = clip_bottom self.confidence_threshold = confidence_threshold self.concurrent_workers = concurrent_workers # 初始化 OcrProcessor 用以复用其版面检测和 OCR 逻辑 self.ocr_processor = OcrProcessor( ocr_api_url=ocr_api_url, ocr_api_key=ocr_api_key, ocr_timeout=ocr_timeout, ocr_dpi=dpi, confidence_threshold=confidence_threshold, concurrent_workers=concurrent_workers, ) # 工具: 检查 RapidLayout 是否可用 def check_environment(self) -> Dict[str, bool]: """检查运行环境依赖""" return { "rapid_layout_available": RAPID_LAYOUT_AVAILABLE, "pymupdf_available": True, "numpy_available": True, } # ============================================================ # 效果测试: 版面检测 # ============================================================ def test_detection( self, pdf_path: Path, pages: Optional[List[int]] = None, save_images_dir: Optional[Path] = None, ) -> Dict[str, Any]: """测试 RapidLayout 版面检测效果""" if not RAPID_LAYOUT_AVAILABLE: return {"error": "RapidLayout 未安装,无法测试版面检测"} doc = fitz.open(str(pdf_path)) try: total_pages = len(doc) target_pages = pages if pages is not None else list(range(total_pages)) all_samples: List[DetectionSample] = [] page_results: List[PageDetectionResult] = [] for page_num in target_pages: page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect( 0, self.clip_top, rect.width, rect.height - self.clip_bottom, ) # 使用 OcrProcessor 的版面检测逻辑 regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box) page_samples: List[DetectionSample] = [] for bbox, score, label in regions: x1, y1, x2, y2 = bbox page_samples.append(DetectionSample( page_num=page_num + 1, label=label, score=score, bbox=bbox, width=x2 - x1, height=y2 - y1, )) all_samples.extend(page_samples) page_results.append(PageDetectionResult( page_num=page_num + 1, samples=page_samples, table_count=sum(1 for s in page_samples if s.label == "table"), figure_count=sum(1 for s in page_samples if s.label == "figure"), )) # 保存标注图片 if save_images_dir and page_samples: self._save_detection_image(page, clip_box, page_samples, page_num + 1, save_images_dir) finally: doc.close() # 汇总统计 label_counter = Counter(s.label for s in all_samples) table_count = label_counter.get("table", 0) figure_count = label_counter.get("figure", 0) # 尺寸分布 table_widths = [s.width for s in all_samples if s.label == "table"] table_heights = [s.height for s in all_samples if s.label == "table"] # 置信度分布 table_scores = [s.score for s in all_samples if s.label == "table"] figure_scores = [s.score for s in all_samples if s.label == "figure"] return { "status": "ok", "total_pages": total_pages, "analyzed_pages": len(target_pages), "total_regions": len(all_samples), "label_distribution": dict(label_counter.most_common()), "table_count": table_count, "figure_count": figure_count, "tables_per_page_avg": round(table_count / max(len(target_pages), 1), 2), "figures_per_page_avg": round(figure_count / max(len(target_pages), 1), 2), "table_width_avg": round(statistics.mean(table_widths), 1) if table_widths else None, "table_height_avg": round(statistics.mean(table_heights), 1) if table_heights else None, "table_score_avg": round(statistics.mean(table_scores), 4) if table_scores else None, "figure_score_avg": round(statistics.mean(figure_scores), 4) if figure_scores else None, "table_score_min": round(min(table_scores), 4) if table_scores else None, "table_score_max": round(max(table_scores), 4) if table_scores else None, "page_details": [ { "page": r.page_num, "table_count": r.table_count, "figure_count": r.figure_count, "regions": [ { "label": s.label, "score": round(s.score, 4), "bbox": [round(c, 1) for c in s.bbox], "size": [round(s.width, 1), round(s.height, 1)], } for s in r.samples ], } for r in page_results if r.samples ], } # ============================================================ # 效果测试: OCR 识别 # ============================================================ def test_ocr_recognition( self, pdf_path: Path, pages: Optional[List[int]] = None, max_regions_per_page: int = 5, ) -> Dict[str, Any]: """测试 GLM-OCR 识别质量,先检测表格区域再逐个识别""" doc = fitz.open(str(pdf_path)) try: total_pages = len(doc) target_pages = pages if pages is not None else list(range(total_pages)) # 阶段1: 收集表格区域 all_regions: List[TableRegion] = [] for page_num in target_pages: page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect( 0, self.clip_top, rect.width, rect.height - self.clip_bottom, ) regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box) for bbox, score, label in regions[:max_regions_per_page]: all_regions.append(TableRegion( page_num=page_num + 1, page=page, bbox=bbox, score=score, label=label, )) if not all_regions: return { "status": "no_regions", "message": "未检测到表格区域,无需 OCR 识别", "total_pages": total_pages, } # 阶段2: 串行逐个识别(记录详细统计) ocr_samples: List[OcrSampleResult] = [] total = len(all_regions) print(f"\n [OCR识别测试] 共 {total} 个区域,开始串行识别...") for idx, region in enumerate(all_regions): start_time = time.perf_counter() retry_count = 0 error = None text = "" success = False # 手动调用 _ocr_table_region 并记录重试次数 # (使用指数退避重试,最多3次) for attempt in range(3): try: text = self.ocr_processor._ocr_table_region( region.page, region.bbox, max_retries=1, ) success = True retry_count = attempt break except Exception as e: error = str(e)[:200] if attempt < 2: time.sleep(1) latency = (time.perf_counter() - start_time) * 1000 # 判断是否为 Non-table is_non_table = text.strip() == "" ocr_samples.append(OcrSampleResult( page_num=region.page_num, label=region.label, score=region.score, bbox=region.bbox, text=text, text_length=len(text.strip()), success=success or is_non_table, # Non-table 也算成功 latency_ms=round(latency, 1), retry_count=retry_count, error=error if not success and not is_non_table else None, )) progress = f"[{idx + 1}/{total}]" status = "OK" if success else f"FAIL({error[:40]})" print(f" {progress} 第{region.page_num}页 [{region.label}] " f"score={region.score:.2f} 耗时={latency:.0f}ms 状态={status}") finally: doc.close() # 统计 total_count = len(ocr_samples) success_count = sum(1 for s in ocr_samples if s.success) non_table_count = sum(1 for s in ocr_samples if not s.text.strip()) table_with_content = sum(1 for s in ocr_samples if s.text.strip()) latencies = [s.latency_ms for s in ocr_samples if s.success] text_lengths = [s.text_length for s in ocr_samples if s.text_length > 0] return { "status": "ok", "total_regions": total_count, "success_count": success_count, "non_table_count": non_table_count, "table_with_content": table_with_content, "success_rate": round(success_count / max(total_count, 1) * 100, 1), "content_rate": round(table_with_content / max(total_count, 1) * 100, 1), "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None, "latency_ms_min": round(min(latencies), 0) if latencies else None, "latency_ms_max": round(max(latencies), 0) if latencies else None, "latency_ms_p50": self._percentile(latencies, 50) if latencies else None, "latency_ms_p95": self._percentile(latencies, 95) if latencies else None, "text_length_avg": round(statistics.mean(text_lengths), 0) if text_lengths else None, "text_length_max": max(text_lengths) if text_lengths else None, "retry_distribution": dict(Counter(s.retry_count for s in ocr_samples).most_common()), "label_breakdown": { label: { "count": sum(1 for s in ocr_samples if s.label == label), "success": sum(1 for s in ocr_samples if s.label == label and s.success), "with_content": sum(1 for s in ocr_samples if s.label == label and s.text.strip()), } for label in set(s.label for s in ocr_samples) }, "errors": list(set(s.error for s in ocr_samples if s.error))[:10], "samples": [ { "page": s.page_num, "label": s.label, "score": round(s.score, 4), "text_preview": s.text[:200] if s.text else "(empty/Non-table)", "text_length": s.text_length, "success": s.success, "latency_ms": s.latency_ms, "retry_count": s.retry_count, } for s in ocr_samples[:20] # 只保留前20个样本 ], } # ============================================================ # 稳定性测试: 并发 + 重试 # ============================================================ def test_stability( self, pdf_path: Path, concurrency: int = 5, iterations: int = 10, pages: Optional[List[int]] = None, ) -> Dict[str, Any]: """稳定性测试:高并发 OCR 调用,观测错误率、延迟分布、资源泄漏 Args: concurrency: 并发线程数 iterations: 总 OCR 调用次数(分配到各区域) """ doc = fitz.open(str(pdf_path)) try: total_pages = len(doc) target_pages = pages if pages is not None else list(range(min(total_pages, 10))) # 收集一定数量的表格区域作为测试样本 all_regions: List[TableRegion] = [] for page_num in target_pages: page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect( 0, self.clip_top, rect.width, rect.height - self.clip_bottom, ) regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box) for bbox, score, label in regions: all_regions.append(TableRegion( page_num=page_num + 1, page=page, bbox=bbox, score=score, label=label, )) if not all_regions: return { "status": "no_regions", "message": "未检测到表格区域,跳过稳定性测试", } # 循环分配任务: 每次从 regions 列表循环取一个 total_tasks = min(iterations, len(all_regions) * 3) task_regions = [all_regions[i % len(all_regions)] for i in range(total_tasks)] print(f"\n [稳定性测试] 并发={concurrency}, 任务数={total_tasks}, 区域样本数={len(all_regions)}") # 并发执行 OCR ocr_samples: List[OcrSampleResult] = [] progress_lock = [0] def _ocr_task(region: TableRegion, task_idx: int) -> OcrSampleResult: start_time = time.perf_counter() error = None text = "" success = False retry_count = 0 for attempt in range(3): try: text = self.ocr_processor._ocr_table_region( region.page, region.bbox, max_retries=1, ) success = True retry_count = attempt break except Exception as e: error = str(e)[:200] time.sleep(0.5) latency = (time.perf_counter() - start_time) * 1000 with ThreadPoolExecutor._thread_queues: pass # dummy for lock # 简单进度 progress_lock[0] += 1 done = progress_lock[0] if done % max(1, total_tasks // 10) == 0 or done == total_tasks: pct = done / total_tasks * 100 print(f" [进度] {done}/{total_tasks} ({pct:.0f}%)", flush=True) return OcrSampleResult( page_num=region.page_num, label=region.label, score=region.score, bbox=region.bbox, text=text, text_length=len(text.strip()), success=success, latency_ms=round(latency, 1), retry_count=retry_count, error=error if not success else None, ) # 使用 ThreadPoolExecutor 并发执行 results: List[OcrSampleResult] = [] with ThreadPoolExecutor(max_workers=concurrency) as executor: futures = { executor.submit(_ocr_task, region, idx): (region, idx) for idx, region in enumerate(task_regions) } for future in as_completed(futures): try: results.append(future.result()) except Exception as e: # 不会发生,因为内部已 catch pass ocr_samples = results finally: doc.close() # 统计 total_count = len(ocr_samples) success_count = sum(1 for s in ocr_samples if s.success) non_table_count = sum(1 for s in ocr_samples if not s.text.strip()) table_with_content = sum(1 for s in ocr_samples if s.text.strip()) fail_count = total_count - success_count latencies = sorted(s.latency_ms for s in ocr_samples if s.success) return { "status": "ok", "concurrency": concurrency, "total_requests": total_count, "success_count": success_count, "fail_count": fail_count, "non_table_count": non_table_count, "table_with_content": table_with_content, "success_rate": round(success_count / max(total_count, 1) * 100, 1), "error_rate": round(fail_count / max(total_count, 1) * 100, 1), "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None, "latency_ms_min": min(latencies) if latencies else None, "latency_ms_max": max(latencies) if latencies else None, "latency_ms_p50": self._percentile(latencies, 50) if latencies else None, "latency_ms_p95": self._percentile(latencies, 95) if latencies else None, "latency_ms_p99": self._percentile(latencies, 99) if latencies else None, "latency_ms_std": round(statistics.stdev(latencies), 0) if len(latencies) > 1 else None, "retry_distribution": dict(Counter(s.retry_count for s in ocr_samples).most_common()), "errors": list(set(s.error for s in ocr_samples if s.error))[:10], } # ============================================================ # 全链路测试: 检测 → OCR → 回填 # ============================================================ def test_full_pipeline( self, pdf_path: Path, pages: Optional[List[int]] = None, ) -> Dict[str, Any]: """测试 OCR 全链路: 版面检测 → 并发 OCR → 文本回填""" doc = fitz.open(str(pdf_path)) try: total_pages = len(doc) target_pages = pages if pages is not None else list(range(total_pages)) # 阶段1: 检测表格区域 all_regions: List[TableRegion] = [] for page_num in target_pages: page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect( 0, self.clip_top, rect.width, rect.height - self.clip_bottom, ) regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box) for bbox, score, label in regions: all_regions.append(TableRegion( page_num=page_num + 1, page=page, bbox=bbox, score=score, label=label, )) table_count = sum(1 for r in all_regions if r.label == "table") figure_count = sum(1 for r in all_regions if r.label == "figure") if not all_regions: return { "status": "no_regions", "total_pages": total_pages, "message": "未检测到表格/图片区域", } # 阶段2: 并发 OCR ocr_start = time.perf_counter() ocr_results = extractor._process_ocr_concurrent(all_regions) ocr_elapsed = time.perf_counter() - ocr_start ocr_success = sum(1 for r in ocr_results if r.success and r.text.strip()) ocr_fail = sum(1 for r in ocr_results if not r.success) ocr_empty = sum(1 for r in ocr_results if r.success and not r.text.strip()) # 阶段3: 检查文本回填效果 # 对每页对比 原始文本 vs OCR回填文本 page_comparison = [] for page_num in target_pages: page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect( 0, self.clip_top, rect.width, rect.height - self.clip_bottom, ) original_text = page.get_text("text", clip=clip_box) page_ocr_results = [ { "region_index": i, "bbox": r.bbox, "score": r.score, "ocr_text": r.text, } for i, r in enumerate(ocr_results) if r.page_num == page_num + 1 and r.success ] replaced_text = extractor._replace_table_regions( page, original_text, page_ocr_results, clip_box, ) has_replacement = replaced_text != original_text page_comparison.append({ "page": page_num + 1, "original_length": len(original_text), "replaced_length": len(replaced_text), "has_replacement": has_replacement, "ocr_regions_on_page": len(page_ocr_results), "length_change": len(replaced_text) - len(original_text), }) finally: doc.close() replaced_pages = sum(1 for p in page_comparison if p["has_replacement"]) total_latencies = [r.latency_ms for r in ocr_results if r.success] return { "status": "ok", "total_pages": total_pages, "analyzed_pages": len(target_pages), "total_regions": len(all_regions), "table_count": table_count, "figure_count": figure_count, "ocr_results": { "total": len(ocr_results), "success_with_content": ocr_success, "empty_non_table": ocr_empty, "failed": ocr_fail, "content_rate": round(ocr_success / max(len(ocr_results), 1) * 100, 1), "ocr_total_time_s": round(ocr_elapsed, 2), "ocr_avg_latency_ms": round(statistics.mean(total_latencies), 0) if total_latencies else None, }, "replacement": { "pages_with_replacement": replaced_pages, "replacement_rate": round(replaced_pages / max(len(target_pages), 1) * 100, 1), }, "page_details": page_comparison[:30], } # ============================================================ # 辅助方法 # ============================================================ @staticmethod def _percentile(data: List[float], p: float) -> float: if not data: return 0.0 sorted_data = sorted(data) idx = max(0, min(len(sorted_data) - 1, int(len(sorted_data) * p / 100))) return round(sorted_data[idx], 0) def _save_detection_image( self, page: fitz.Page, clip_box: fitz.Rect, samples: List[DetectionSample], page_num: int, output_dir: Path, ): """保存带检测框的页面图片""" try: from PIL import Image, ImageDraw except ImportError: return pix = page.get_pixmap(dpi=self.dpi, clip=clip_box) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) draw = ImageDraw.Draw(img) # 还原原始图片尺寸(裁剪前) scale_x = pix.width / clip_box.width scale_y = pix.height / clip_box.height colors = { "table": (0, 255, 0), "figure": (255, 80, 80), } for s in samples: color = colors.get(s.label, (200, 200, 200)) x1 = (s.bbox[0] - clip_box.x0) * scale_x y1 = (s.bbox[1] - clip_box.y0) * scale_y x2 = (s.bbox[2] - clip_box.x0) * scale_x y2 = (s.bbox[3] - clip_box.y0) * scale_y draw.rectangle([x1, y1, x2, y2], outline=color, width=2) draw.text((x1 + 2, y1 + 2), f"{s.label} ({s.score:.2f})", fill=color) output_path = output_dir / f"page_{page_num:03d}_ocr_detection.jpg" img.save(str(output_path), quality=85) # ============================================================ # 报告打印 # ============================================================ def print_env_report(env: Dict[str, bool]): print("\n" + "=" * 60) print(" 环境检查") print("=" * 60) for k, v in env.items(): status = "✓ 可用" if v else "✗ 不可用" print(f" {k:30s}: {status}") if not env.get("rapid_layout_available"): print("\n ⚠ RapidLayout 未安装,版面检测功能受限") print(" 安装: pip install rapid-layout") print() def print_detection_report(result: Dict[str, Any]): if "error" in result: print(f" [错误] {result['error']}") return print("\n" + "=" * 70) print(" 版面检测效果报告 (RapidLayout)") print("=" * 70) print(f" 分析页数: {result['analyzed_pages']} / {result['total_pages']}") print(f" 检测区域总数: {result['total_regions']}") print(f" 表格数: {result['table_count']} (均 {result['tables_per_page_avg']}/页)") print(f" 图片数: {result['figure_count']} (均 {result['figures_per_page_avg']}/页)") print() if result["label_distribution"]: print(" 标签分布:") for label, count in result["label_distribution"].items(): pct = count / max(result["total_regions"], 1) * 100 bar = "█" * int(pct / 2) print(f" {label:12s}: {count:4d} ({pct:5.1f}%) {bar}") print() if result.get("table_score_avg"): print(f" 表格置信度: avg={result['table_score_avg']:.3f} " f"min={result['table_score_min']:.3f} max={result['table_score_max']:.3f}") if result.get("figure_score_avg"): print(f" 图片置信度: avg={result['figure_score_avg']:.3f}") if result.get("page_details"): print() print(" 逐页详情:") for p in result["page_details"]: regions_str = ", ".join( f"[{r['label']}]({r['score']:.2f})" for r in p["regions"] ) print(f" 第{p['page']:3d}页: table={p['table_count']} figure={p['figure_count']} {regions_str}") def print_ocr_report(result: Dict[str, Any]): if "error" in result: print(f" [错误] {result['error']}") return if result.get("status") == "no_regions": print(f"\n [提示] {result['message']}") return print("\n" + "=" * 70) print(" OCR 识别效果报告 (GLM-OCR)") print("=" * 70) print(f" 总区域数: {result['total_regions']}") print(f" 识别成功: {result['success_count']} ({result['success_rate']}%)") print(f" 含表格内容: {result['table_with_content']} ({result['content_rate']}%)") print(f" Non-table(跳过): {result['non_table_count']}") if result.get("latency_ms_avg"): print(f"\n 延迟统计 (ms):") print(f" 平均: {result['latency_ms_avg']:.0f}") print(f" 最小: {result['latency_ms_min']:.0f}") print(f" 最大: {result['latency_ms_max']:.0f}") print(f" P50: {result['latency_ms_p50']:.0f}") print(f" P95: {result['latency_ms_p95']:.0f}") if result.get("text_length_avg"): print(f"\n 文本长度: avg={result['text_length_avg']:.0f} max={result['text_length_max']}") if result.get("retry_distribution"): print(f"\n 重试分布: {result['retry_distribution']}") if result.get("label_breakdown"): print(f"\n 按标签统计:") for label, stats in result["label_breakdown"].items(): print(f" {label:8s}: 总数={stats['count']}, 成功={stats['success']}, " f"含内容={stats['with_content']}") if result.get("errors"): print(f"\n 错误 ({len(result['errors'])} 种):") for e in result["errors"]: print(f" - {e}") if result.get("samples"): print(f"\n 样本预览 (前20):") print(f" {'页':>4s} {'标签':>8s} {'置信度':>8s} {'耗时ms':>8s} {'重试':>4s} {'内容':>6s} {'预览'}") print(f" {'-'*60}") for s in result["samples"]: preview = (s["text_preview"][:50] + "..") if len(s.get("text_preview", "")) > 50 else s.get("text_preview", "") ok = "✓" if s["success"] else "✗" print(f" {s['page']:4d} {s['label']:>8s} {s['score']:.2f} {s['latency_ms']:6.0f} {s['retry_count']:3d} " f"{ok:>4s} {preview}") print() def print_stability_report(result: Dict[str, Any]): if result.get("status") == "no_regions": print(f"\n [提示] {result['message']}") return print("\n" + "=" * 70) print(" 稳定性测试报告") print("=" * 70) print(f" 并发数: {result['concurrency']}") print(f" 总请求数: {result['total_requests']}") print(f" 成功: {result['success_count']} ({result['success_rate']}%)") print(f" 失败: {result['fail_count']} ({result['error_rate']}%)") print(f" 含表格内容: {result['table_with_content']}") print(f" Non-table跳过: {result['non_table_count']}") if result.get("latency_ms_avg"): print(f"\n 延迟统计 (ms):") print(f" 平均: {result['latency_ms_avg']:.0f}") print(f" 最小: {result['latency_ms_min']:.0f}") print(f" 最大: {result['latency_ms_max']:.0f}") print(f" P50: {result['latency_ms_p50']:.0f}") print(f" P95: {result['latency_ms_p95']:.0f}") print(f" P99: {result['latency_ms_p99']:.0f}") if result.get("latency_ms_std"): print(f" 标准差: {result['latency_ms_std']:.0f}") if result.get("retry_distribution"): print(f"\n 重试分布: {result['retry_distribution']}") if result.get("errors"): print(f"\n 错误列表:") for e in result["errors"]: print(f" - {e}") print() def print_pipeline_report(result: Dict[str, Any]): if result.get("status") == "no_regions": print(f"\n [提示] {result['message']}") return print("\n" + "=" * 70) print(" 全链路测试报告 (检测 → OCR → 回填)") print("=" * 70) print(f" 总页数: {result['total_pages']}") print(f" 分析页数: {result['analyzed_pages']}") print(f" 检测区域: 表格={result['table_count']}, 图片={result['figure_count']}") ocr = result.get("ocr_results", {}) print(f"\n OCR 识别:") print(f" 总区域: {ocr.get('total', 0)}") print(f" 含内容: {ocr.get('success_with_content', 0)} ({ocr.get('content_rate', 0)}%)") print(f" Non-table跳过: {ocr.get('empty_non_table', 0)}") print(f" 失败: {ocr.get('failed', 0)}") print(f" 总耗时: {ocr.get('ocr_total_time_s', 0)}s") if ocr.get("ocr_avg_latency_ms"): print(f" 平均延迟: {ocr['ocr_avg_latency_ms']:.0f}ms") repl = result.get("replacement", {}) print(f"\n 文本回填:") print(f" 发生替换的页数: {repl.get('pages_with_replacement', 0)}/{result['analyzed_pages']} ({repl.get('replacement_rate', 0)}%)") print() def _save_ocr_region_images( pdf_path: Path, det_result: Dict[str, Any], table_img_dir: Path, figure_img_dir: Path, tester: OcrEffectivenessTester, ) -> Dict[str, int]: """将检测到的表格/图片区域截图和OCR识别内容分别保存到对应目录""" from PIL import Image count = {"table": 0, "figure": 0} page_details = det_result.get("page_details", []) if not page_details: return count doc = fitz.open(str(pdf_path)) try: for page_info in page_details: page_num = page_info["page"] - 1 page = doc.load_page(page_num) rect = page.rect clip_box = fitz.Rect(0, tester.clip_top, rect.width, rect.height - tester.clip_bottom) for region in page_info.get("regions", []): label = region["label"] bbox = region["bbox"] score = region["score"] # 确定保存目录 if label == "table": target_dir = table_img_dir elif label == "figure": target_dir = figure_img_dir else: continue pdf_rect = fitz.Rect(bbox) pix = page.get_pixmap(dpi=tester.dpi, clip=pdf_rect) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) basename = f"{pdf_path.stem}_p{page_info['page']:03d}_{label}_{count[label]:02d}" img_path = target_dir / f"{basename}.jpg" img.save(str(img_path), quality=90) # 尝试 OCR 识别并保存文本内容 try: ocr_text = tester.ocr_processor._ocr_table_region(page, bbox, max_retries=1) txt_path = target_dir / f"{basename}.txt" with open(txt_path, "w", encoding="utf-8") as f: f.write(ocr_text) except Exception as e: txt_path = target_dir / f"{basename}.txt" with open(txt_path, "w", encoding="utf-8") as f: f.write(f"[OCR识别失败] {e}") count[label] += 1 finally: doc.close() return count def _test_catalog_ocr( pdf_path: Path, tester: OcrEffectivenessTester, catalog_dir: Path, detection_img_dir: Path, timestamp: str = "", ) -> Dict[str, Any]: """测试目录页(目录)OCR识别效果,使用项目实际的 _extract_catalog 链路 输出格式与项目 catalog JSON 一致: {"chapters": [...], "total_chapters": N, "raw_ocr_text": "...", "formatted_text": "..."} """ from utils_test.minimal_pipeline._toc_detector import TOCCatalogExtractor file_content = pdf_path.read_bytes() # 文件名前缀(提前定义,供后续截图使用) suffix = f"_{timestamp}" if timestamp else "" stem = f"{pdf_path.stem}{suffix}" # 使用 YOLO + GLM-OCR 目录提取链路(不依赖 core/foundation) extractor = TOCCatalogExtractor( model_path=str(TEST_DIR / "best.pt"), ocr_api_url=tester.ocr_processor.ocr_api_url, ocr_api_key=tester.ocr_processor.ocr_api_key, ocr_timeout=tester.ocr_processor.ocr_timeout, ) catalog = extractor.detect_and_extract(file_content) catalog = catalog or {} # 保存目录页截图(使用 YOLO 检测到的目录页码范围) from PIL import Image as PILImage try: doc = fitz.open(stream=file_content) try: toc_range = (catalog or {}).get("toc_page_range") if toc_range: for page_num in range(toc_range["start"] - 1, toc_range["end"]): page = doc.load_page(page_num) pix = page.get_pixmap(dpi=150) img = PILImage.frombytes("RGB", [pix.width, pix.height], pix.samples) img_path = catalog_dir / f"{stem}_catalog_page_{page_num + 1:03d}.jpg" img.save(str(img_path), quality=85) finally: doc.close() except Exception as e: print(f" [警告] 目录页截图保存失败: {e}") # 构造与项目格式一致的 catalog 输出 # 格式: {"catalog": {"chapters": [...], "total_chapters": N}, "raw_ocr_text": "..."} # 不含 content/page_start/page_end 等后续流程才填充的字段 catalog_output: Dict[str, Any] = { "catalog": { "chapters": [], "total_chapters": 0, }, "raw_ocr_text": "", } raw_ocr_text = "" extract_status = "failed" if catalog: chapters = catalog.get("chapters", []) # 清理掉可能混入的 content 等字段(仅保留 catalog 原始字段) clean_chapters = [] for ch in chapters: clean_ch = { "index": ch.get("index", 0), "title": ch.get("title", ""), "page": str(ch.get("page", "")), "original": ch.get("original", ""), } clean_subs = [] for sub in ch.get("subsections", []): clean_subs.append({ "title": sub.get("title", ""), "page": str(sub.get("page", "")), "level": sub.get("level", 2), "original": sub.get("original", ""), }) clean_ch["subsections"] = clean_subs clean_chapters.append(clean_ch) catalog_output["catalog"]["chapters"] = clean_chapters catalog_output["catalog"]["total_chapters"] = len(clean_chapters) raw_ocr_text = catalog.get("raw_ocr_text", "") or "" catalog_output["raw_ocr_text"] = raw_ocr_text extract_status = "success" # 保存 catalog JSON(含 raw_ocr_text 字段替代单独的 txt) json_path = catalog_dir / f"{stem}_catalog.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(catalog_output, f, ensure_ascii=False, indent=2) # 返回简洁统计信息,json_path 供主流程使用 return { "extract_status": extract_status, "total_chapters": catalog_output["catalog"]["total_chapters"], "raw_ocr_length": len(raw_ocr_text), "json_path": str(json_path), } def _test_catalog_stability( pdf_path: Path, tester: OcrEffectivenessTester, catalog_dir: Path, iterations: int = 10, ) -> Dict[str, Any]: """目录OCR稳定性测试:多次执行目录提取,对比结果一致性""" from utils_test.minimal_pipeline._toc_detector import TOCCatalogExtractor file_content = pdf_path.read_bytes() timestamp = time.strftime("%Y%m%d_%H%M%S") results = [] # 先快速提取一次获取目录页范围用于截图 first_extractor = TOCCatalogExtractor( model_path=str(TEST_DIR / "best.pt"), ocr_api_url=tester.ocr_processor.ocr_api_url, ocr_api_key=tester.ocr_processor.ocr_api_key, ocr_timeout=tester.ocr_processor.ocr_timeout, ) first_catalog = first_extractor.detect_and_extract(file_content) or {} # 保存目录页截图(使用 YOLO 检测到的目录页码范围) from PIL import Image as PILImage try: doc = fitz.open(stream=file_content) try: toc_range = first_catalog.get("toc_page_range") if toc_range: for page_num in range(toc_range["start"] - 1, toc_range["end"]): page = doc.load_page(page_num) pix = page.get_pixmap(dpi=150) img = PILImage.frombytes("RGB", [pix.width, pix.height], pix.samples) img_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_catalog_page_{page_num + 1:03d}.jpg" img.save(str(img_path), quality=85) finally: doc.close() print(f" [保存] 目录页截图 → {catalog_dir}/") except Exception as e: print(f" [警告] 目录页截图保存失败: {e}") print(f"\n [目录稳定性] 开始 {iterations} 次目录提取...") for i in range(iterations): extractor = TOCCatalogExtractor( model_path=str(TEST_DIR / "best.pt"), ocr_api_url=tester.ocr_processor.ocr_api_url, ocr_api_key=tester.ocr_processor.ocr_api_key, ocr_timeout=tester.ocr_processor.ocr_timeout, ) start = time.perf_counter() try: catalog = extractor.detect_and_extract(file_content) catalog = catalog or {} elapsed = (time.perf_counter() - start) * 1000 if catalog: chapters = catalog.get("chapters", []) raw_text = catalog.get("raw_ocr_text", "") or "" chapter_titles = [ch.get("title", "") for ch in chapters] # 保存本次运行的独立 catalog JSON(含 raw_ocr_text) run_output = { "run": i + 1, "catalog": { "chapters": [ {"index": ch.get("index"), "title": ch.get("title", ""), "page": str(ch.get("page", "")), "original": ch.get("original", ""), "subsections": [ {"title": s.get("title", ""), "page": str(s.get("page", "")), "level": s.get("level", 2), "original": s.get("original", "")} for s in (ch.get("subsections") or []) ]} for ch in chapters ], "total_chapters": len(chapters), }, "raw_ocr_text": raw_text, "elapsed_ms": round(elapsed, 0), } run_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_{i + 1:03d}_catalog.json" with open(run_path, "w", encoding="utf-8") as f: json.dump(run_output, f, ensure_ascii=False, indent=2) results.append({ "run": i + 1, "success": True, "elapsed_ms": round(elapsed, 0), "chapter_count": len(chapters), "chapter_titles": chapter_titles, "raw_text_len": len(raw_text), "raw_text_hash": hash(raw_text), "json_path": str(run_path), "raw_text": raw_text, }) else: results.append({ "run": i + 1, "success": False, "elapsed_ms": round(elapsed, 0), "chapter_count": 0, "error": "catalog is None", }) except Exception as e: results.append({ "run": i + 1, "success": False, "elapsed_ms": round((time.perf_counter() - start) * 1000, 0), "error": str(e)[:200], }) r = results[-1] print(f" [{i + 1}/{iterations}] " f"{'OK' if r['success'] else 'FAIL'} " f"{r.get('chapter_count', 0):>2}章 " f"{r.get('elapsed_ms', 0):.0f}ms", flush=True) # ---- 一致性分析 ---- success_runs = [r for r in results if r["success"]] fail_count = len(results) - len(success_runs) title_sequences = [tuple(r["chapter_titles"]) for r in success_runs] unique_sequences = set(title_sequences) text_hashes = {r.get("raw_text_hash") for r in success_runs if "raw_text_hash" in r} chapter_counts = [r["chapter_count"] for r in success_runs] count_distribution = {} for c in chapter_counts: count_distribution[c] = count_distribution.get(c, 0) + 1 patterns = [] for seq in unique_sequences: matching_runs = [r["run"] for r in success_runs if tuple(r["chapter_titles"]) == seq] patterns.append({"titles": list(seq), "count": len(matching_runs), "runs": matching_runs}) patterns.sort(key=lambda x: -x["count"]) latencies = [r["elapsed_ms"] for r in success_runs] stability = { "total_runs": len(results), "success_count": len(success_runs), "fail_count": fail_count, "success_rate": round(len(success_runs) / max(len(results), 1) * 100, 1), "all_titles_identical": len(unique_sequences) <= 1, "all_text_identical": len(text_hashes) <= 1, "unique_title_patterns": len(unique_sequences), "unique_text_hashes": len(text_hashes), "chapter_count_distribution": count_distribution, "most_common_chapter_count": max(set(chapter_counts), key=chapter_counts.count) if chapter_counts else 0, "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None, "latency_ms_min": min(latencies) if latencies else None, "latency_ms_max": max(latencies) if latencies else None, "patterns": patterns, } output = {"stability": stability, "runs": results} json_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_catalog_stability.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) print(f" [保存] 稳定性报告 → {json_path}") return stability # ============================================================ # 主入口 # ============================================================ def main(): parser = argparse.ArgumentParser( description="OCR 模型效果与稳定性测试", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: # 默认:仅目录提取测试 python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf # 版面检测测试(表格/图片检测 + OCR识别) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection # 全链路测试(检测+OCR+回填) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection --full-pipeline # 稳定性测试(20并发,50次调用) python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection --stability --concurrency 20 --iterations 50 # 批量测试目录下所有 PDF python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d ./pdfs/ """, ) parser.add_argument("-p", "--pdf", help="单个 PDF 文件路径") parser.add_argument("-d", "--dir", help="批量: PDF 文件目录") parser.add_argument("positional_pdf", nargs="?", metavar="PDF", help="也支持位置参数直接传 PDF 路径") parser.add_argument("--pages", help="分析指定页码, 逗号分隔 (1-based)") parser.add_argument("--detection", action="store_true", help="版面检测 + OCR 识别测试(默认仅目录OCR)") parser.add_argument("--catalog-stability", action="store_true", help="目录OCR稳定性测试(多次提取对比一致性)") parser.add_argument("--catalog-iterations", type=int, default=10, help="目录稳定性测试迭代次数(默认 10)") parser.add_argument("--full-pipeline", action="store_true", help="全链路测试(需同时开启 --detection)") parser.add_argument("--stability", action="store_true", help="稳定性测试(需同时开启 --detection)") parser.add_argument("--concurrency", type=int, default=5, help="稳定性测试并发数") parser.add_argument("--iterations", type=int, default=10, help="稳定性测试迭代次数") parser.add_argument("--output-dir", help="输出目录(默认 utils_test/minimal_pipeline/temp/test_ocr_effectiveness/)") parser.add_argument("--confidence", type=float, default=0.5, help="检测置信度阈值") parser.add_argument("--json", action="store_true", help="以 JSON 格式输出结果") parser.add_argument("--ocr-url", default=_OCR_CONFIG["GLM_OCR_API_URL"], help="OCR API 地址") parser.add_argument("--ocr-key", default=_OCR_CONFIG["GLM_OCR_API_KEY"], help="OCR API 密钥") parser.add_argument("--ocr-timeout", type=int, default=int(_OCR_CONFIG["GLM_OCR_TIMEOUT"]), help="OCR 超时秒数") args = parser.parse_args() if not args.pdf and not args.dir: if args.positional_pdf: args.pdf = args.positional_pdf else: parser.print_help() return 1 tester = OcrEffectivenessTester( ocr_api_url=args.ocr_url, ocr_api_key=args.ocr_key, ocr_timeout=args.ocr_timeout, confidence_threshold=args.confidence, concurrent_workers=args.concurrency if args.stability else 5, ) pages = None if args.pages: pages = [int(p.strip()) - 1 for p in args.pages.split(",")] # ---- 环境检查 ---- env = tester.check_environment() if not args.json: print_env_report(env) # ---- 单个文件模式 ---- if args.pdf: pdf_path = Path(args.pdf) if not pdf_path.exists(): print(f"[错误] PDF 文件不存在: {pdf_path}") return 1 # 输出目录结构: temp/test_ocr_effectiveness/{catalog, detection, table, figure, results} base_output = Path(args.output_dir) if args.output_dir else TEST_DIR / "temp" / "test_ocr_effectiveness" catalog_dir = base_output / "catalog" detection_img_dir = base_output / "detection" table_img_dir = base_output / "table" figure_img_dir = base_output / "figure" results_dir = base_output / "results" for d in [catalog_dir, detection_img_dir, table_img_dir, figure_img_dir, results_dir]: d.mkdir(parents=True, exist_ok=True) timestamp = time.strftime("%Y%m%d_%H%M%S") print(f"\n[测试] {pdf_path.name}") print(f"[输出] 目录提取 → {catalog_dir}/") print(f"[输出] 版面检测图 → {detection_img_dir}/") print(f"[输出] 表格区域截图 → {table_img_dir}/") print(f"[输出] 图片区域截图 → {figure_img_dir}/") print(f"[输出] JSON 结果 → {results_dir}/") print("=" * 70) # 目录页 OCR 识别测试 # 有 --catalog-stability 时,稳定性编号文件已含完整结果,跳过单次提取 catalog_result = None catalog_stability_result = None if args.catalog_stability: catalog_stability_result = _test_catalog_stability( pdf_path, tester, catalog_dir, iterations=args.catalog_iterations, ) s = catalog_stability_result print(f" [目录稳定性] {s['total_runs']}次, " f"一致={s['all_titles_identical']}, " f"模式数={s['unique_title_patterns']}, " f"延迟avg={s['latency_ms_avg']:.0f}ms") catalog_result = { "extract_status": "success" if s.get("success_count", 0) > 0 else "failed", "total_chapters": s.get("most_common_chapter_count", 0), "raw_ocr_length": 0, "json_path": "", } else: catalog_result = _test_catalog_ocr(pdf_path, tester, catalog_dir, detection_img_dir, timestamp) cat_status = catalog_result.get("extract_status", "failed") cat_chapters = catalog_result.get("total_chapters", 0) cat_raw_len = catalog_result.get("raw_ocr_length", 0) print(f" [目录OCR] status={cat_status}, chapters={cat_chapters}, raw_ocr_len={cat_raw_len}") if cat_chapters > 0: print(f" [保存] catalog JSON → {catalog_result.get('json_path', '')}") # 版面检测 + OCR 识别(仅 --detection 时启用) det_result = {"status": "skipped", "total_pages": 0} ocr_result = {"status": "skipped"} pipeline_result = None stab_result = None saved_table_count = {"table": 0, "figure": 0} if args.detection: det_result = tester.test_detection(pdf_path, pages=pages, save_images_dir=detection_img_dir) if not args.json: print_detection_report(det_result) saved_table_count = _save_ocr_region_images(pdf_path, det_result, table_img_dir, figure_img_dir, tester) print(f" [保存] 表格区域截图: {saved_table_count['table']} 张 → {table_img_dir}/") print(f" [保存] 图片区域截图: {saved_table_count['figure']} 张 → {figure_img_dir}/") ocr_result = tester.test_ocr_recognition(pdf_path, pages=pages) if not args.json: print_ocr_report(ocr_result) if args.full_pipeline: pipeline_result = tester.test_full_pipeline(pdf_path, pages=pages) if not args.json: print_pipeline_report(pipeline_result) if args.stability: stab_result = tester.test_stability( pdf_path, concurrency=args.concurrency, iterations=args.iterations, pages=pages, ) if not args.json: print_stability_report(stab_result) # 保存 JSON 结果到文件 output = { "file": pdf_path.name, "test_time": time.strftime("%Y-%m-%d %H:%M:%S"), "environment": env, "detection": det_result, "ocr": ocr_result, "catalog_ocr": { "extract_status": catalog_result.get("extract_status"), "total_chapters": catalog_result.get("total_chapters"), "raw_ocr_length": catalog_result.get("raw_ocr_length"), "json_path": catalog_result.get("json_path"), }, } if pipeline_result: output["pipeline"] = pipeline_result if stab_result: output["stability"] = stab_result if catalog_stability_result: output["catalog_stability"] = { "total_runs": catalog_stability_result.get("total_runs"), "all_titles_identical": catalog_stability_result.get("all_titles_identical"), "unique_title_patterns": catalog_stability_result.get("unique_title_patterns"), "most_common_chapter_count": catalog_stability_result.get("most_common_chapter_count"), } json_path = results_dir / f"{pdf_path.stem}_ocr_test_result.json" with open(json_path, "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) print(f"\n [保存] JSON 结果 → {json_path}") if args.json: print(json.dumps(output, ensure_ascii=False, indent=2)) return 0 # ---- 批量模式 ---- dir_path = Path(args.dir) if not dir_path.is_dir(): print(f"[错误] 目录不存在: {dir_path}") return 1 pdf_files = sorted(dir_path.glob("*.pdf")) if not pdf_files: print(f"[错误] 目录下无 PDF 文件: {dir_path}") return 1 # 批量输出目录 base_output = Path(args.output_dir) if args.output_dir else TEST_DIR / "temp" / "test_ocr_effectiveness" catalog_dir = base_output / "catalog" detection_img_dir = base_output / "detection" table_img_dir = base_output / "table" figure_img_dir = base_output / "figure" results_dir = base_output / "results" for d in [catalog_dir, detection_img_dir, table_img_dir, figure_img_dir, results_dir]: d.mkdir(parents=True, exist_ok=True) batch_timestamp = time.strftime("%Y%m%d_%H%M%S") print(f"\n[批量测试] 找到 {len(pdf_files)} 个 PDF 文件") print(f"[批量测试] 目录: {dir_path}\n") batch_results: List[Dict] = [] for idx, pdf_path in enumerate(pdf_files, 1): print(f"[{idx}/{len(pdf_files)}] {pdf_path.name} ...", flush=True) try: # 目录提取(默认执行) cat = _test_catalog_ocr(pdf_path, tester, catalog_dir, detection_img_dir, batch_timestamp) cat_chap = cat.get("total_chapters", 0) cat_ocr_len = cat.get("raw_ocr_length", 0) file_result = { "file": pdf_path.name, "catalog_status": cat.get("extract_status"), "catalog_chapters": cat_chap, "raw_ocr_length": cat_ocr_len, } # 版面检测 + OCR(仅 --detection 时) if args.detection: det = tester.test_detection(pdf_path, pages=pages, save_images_dir=detection_img_dir) _save_ocr_region_images(pdf_path, det, table_img_dir, figure_img_dir, tester) ocr = tester.test_ocr_recognition(pdf_path, pages=pages) file_result.update({ "pages": det.get("total_pages", 0), "table_count": det.get("table_count", 0), "figure_count": det.get("figure_count", 0), "ocr_success_rate": ocr.get("success_rate"), "ocr_content_rate": ocr.get("content_rate"), "ocr_avg_latency": ocr.get("latency_ms_avg"), }) batch_results.append(file_result) print(f" → catalog={cat_chap}章, raw_ocr={cat_ocr_len}字符", flush=True) except Exception as e: print(f" → 失败: {e}", flush=True) batch_results.append({"file": pdf_path.name, "error": str(e)}) # 保存批量汇总 JSON batch_json_path = results_dir / f"batch_{batch_timestamp}_summary.json" with open(batch_json_path, "w", encoding="utf-8") as f: json.dump(batch_results, f, ensure_ascii=False, indent=2) print(f"\n [保存] 批量汇总 → {batch_json_path}") # 批量汇总报告 valid = [r for r in batch_results if "error" not in r] errors = [r for r in batch_results if "error" in r] if not args.json: print("\n" + "=" * 90) print(" 批量测试汇总报告") print("=" * 90) print(f" 文件数: {len(batch_results)} (成功={len(valid)}, 失败={len(errors)})") if valid: total_chapters = sum(r.get("catalog_chapters", 0) for r in valid) total_ocr_len = sum(r.get("raw_ocr_length", 0) for r in valid) print(f"\n 目录提取统计:") print(f" 总章数: {total_chapters}") print(f" 总OCR字符数: {total_ocr_len}") print(f"\n 逐文件:") print(f" {'文件':40s} {'章数':>6s} {'OCR字符':>8s}") print(f" {'-'*55}") for r in valid: name = r["file"][:38] + ".." if len(r["file"]) > 38 else r["file"] print(f" {name:40s} {r.get('catalog_chapters', 0):5d} {r.get('raw_ocr_length', 0):7d}") # --detection 时额外输出检测统计 if valid[0].get("table_count") is not None: total_tables = sum(r["table_count"] for r in valid) total_figures = sum(r["figure_count"] for r in valid) ocr_rates = [r["ocr_success_rate"] for r in valid if r["ocr_success_rate"] is not None] ocr_latencies = [r["ocr_avg_latency"] for r in valid if r["ocr_avg_latency"] is not None] print(f"\n 版面检测统计:") print(f" 总表格数: {total_tables}") print(f" 总图片数: {total_figures}") if ocr_rates: print(f" OCR成功率: avg={statistics.mean(ocr_rates):.1f}%") if ocr_latencies: print(f" OCR延迟(ms): avg={statistics.mean(ocr_latencies):.0f}") if errors: print(f"\n 失败文件:") for e in errors: print(f" - {e['file']}: {e.get('error', '')}") print() if args.json: print(json.dumps(batch_results, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": sys.exit(main())