Переглянути джерело

feat: 创建独立OCR/目录稳定性测试套件,与系统解耦

- 新增 utils_test/minimal_pipeline/ 自包含测试套件,不依赖 core/foundation
- 添加测试脚本: test_ocr_effectiveness.py(支持目录提取/稳定性/检测)
- 添加解耦模块: _ocr_processor.py, _toc_detector.py, _simple_logger.py
- 优化 toc_detector: max_tokens 2048→1024, 添加 seed=42 稳定输出
- 清理旧的测试脚本(chunk_assembler/classifier/models 等)
- 附带 YOLO 模型 best.pt

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 5 днів тому
батько
коміт
83c600c4e2

+ 3 - 2
core/construction_review/component/minimal_pipeline/toc_detector.py

@@ -276,8 +276,9 @@ class TOCCatalogExtractor:
                             ]
                         }
                     ],
-                    "max_tokens": 2048,
-                    "temperature": 0.1
+                    "max_tokens": 1024,
+                    "temperature": 0.1,
+                    "seed": 42
                 }
 
                 headers = {"Content-Type": "application/json"}

+ 7 - 12
utils_test/minimal_pipeline/__init__.py

@@ -1,14 +1,9 @@
-"""
-独立最小化文档处理管线
+"""OCR / 目录识别测试模块(解耦版,不依赖 core/foundation)"""
 
-功能:PDF 结构提取 → 目录识别 → 文档切分 → 分类器(一/二/三级)
-特点:
-- 不依赖 core.* / foundation.* 代码
-- 只使用标准库 + PyMuPDF + openai
-- 可直接运行:python run.py -p xxx.pdf
-"""
+from ._ocr_processor import OcrProcessor, TableRegion, OcrResult, RAPID_LAYOUT_AVAILABLE
+from ._toc_detector import TOCCatalogExtractor
 
-from .pipeline import MinimalPipeline
-from .models import PipelineResult, ClassificationItem, ChunkItem
-
-__all__ = ["MinimalPipeline", "PipelineResult", "ClassificationItem", "ChunkItem"]
+__all__ = [
+    "OcrProcessor", "TableRegion", "OcrResult", "RAPID_LAYOUT_AVAILABLE",
+    "TOCCatalogExtractor",
+]

+ 6 - 0
utils_test/minimal_pipeline/_html_to_md.py

@@ -0,0 +1,6 @@
+"""简易 HTML 转 Markdown 存根,仅透传,无外部依赖。"""
+
+
+def convert_html_to_markdown(html_content: str) -> str:
+    """透传,不执行实际转换(测试场景下 HTML 内容极少)。"""
+    return html_content

+ 485 - 0
utils_test/minimal_pipeline/_ocr_processor.py

@@ -0,0 +1,485 @@
+"""
+OCR 处理模块 - 表格检测与识别
+
+提供 PDF 表格区域检测和 OCR 识别功能,支持:
+- RapidLayout 表格区域检测
+- GLM-OCR 并发识别
+- 表格文本替换回填
+"""
+
+import base64
+import io
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass
+from typing import Dict, Any, List, Optional, Tuple, Set
+
+import fitz
+import numpy as np
+import requests
+
+from utils_test.minimal_pipeline._simple_logger import review_logger as logger
+
+# 尝试导入 RapidLayout
+try:
+    from rapid_layout import RapidLayout
+    RAPID_LAYOUT_AVAILABLE = True
+except ImportError:
+    RAPID_LAYOUT_AVAILABLE = False
+    RapidLayout = None
+
+
+@dataclass
+class TableRegion:
+    """表格区域信息"""
+    page_num: int
+    page: fitz.Page
+    bbox: Tuple[float, float, float, float]
+    score: float
+    label: str = "table"  # YOLO 原始标签: table / figure
+
+
+@dataclass
+class OcrResult:
+    """OCR 结果"""
+    page_num: int
+    bbox: Tuple[float, float, float, float]
+    score: float
+    text: str
+    success: bool
+
+
+class OcrProcessor:
+    """OCR 处理器:表格检测与识别"""
+
+    # 默认配置
+    MAX_SHORT_EDGE = 1024
+    JPEG_QUALITY = 90
+    OCR_DPI = 200
+    OCR_CONFIDENCE_THRESHOLD = 0.5
+    OCR_CONCURRENT_WORKERS = 20
+
+    def __init__(
+        self,
+        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
+        ocr_timeout: int = 600,
+        ocr_api_key: str = "",
+        max_short_edge: int = 1024,
+        jpeg_quality: int = 90,
+        ocr_dpi: int = 200,
+        confidence_threshold: float = 0.5,
+        concurrent_workers: int = 20,
+    ):
+        """
+        初始化 OCR 处理器
+
+        Args:
+            ocr_api_url: OCR API 地址
+            ocr_timeout: OCR 请求超时时间(秒)
+            ocr_api_key: OCR API 密钥
+            max_short_edge: 图片压缩后短边最大尺寸
+            jpeg_quality: JPEG 压缩质量
+            ocr_dpi: OCR 渲染 DPI
+            confidence_threshold: 表格检测置信度阈值
+            concurrent_workers: OCR 并发工作线程数
+        """
+        self.ocr_api_url = ocr_api_url
+        self.ocr_timeout = ocr_timeout
+        self.ocr_api_key = ocr_api_key
+        self.max_short_edge = max_short_edge
+        self.jpeg_quality = jpeg_quality
+        self.ocr_dpi = ocr_dpi
+        self.confidence_threshold = confidence_threshold
+        self.concurrent_workers = concurrent_workers
+
+        self._layout_engine: Optional[Any] = None
+
+        if not RAPID_LAYOUT_AVAILABLE:
+            logger.warning("RapidLayout 未安装,表格检测功能不可用")
+
+    def is_available(self) -> bool:
+        """检查 OCR 功能是否可用"""
+        return RAPID_LAYOUT_AVAILABLE
+
+    def _get_layout_engine(self) -> Optional[Any]:
+        """延迟初始化 RapidLayout"""
+        if self._layout_engine is None and RAPID_LAYOUT_AVAILABLE:
+            self._layout_engine = RapidLayout()
+        return self._layout_engine
+
+    def detect_table_regions(
+        self,
+        page: fitz.Page,
+        page_num: int,
+        clip_box: fitz.Rect
+    ) -> List[Tuple[Tuple[float, float, float, float], float]]:
+        """
+        检测页面中的表格区域
+
+        Args:
+            page: PDF 页面对象
+            page_num: 页码(用于日志)
+            clip_box: 裁剪区域
+
+        Returns:
+            列表,元素为 ((x1, y1, x2, y2), score)
+        """
+        table_regions: List[Tuple[Tuple[float, float, float, float], float, str]] = []
+
+        if not RAPID_LAYOUT_AVAILABLE:
+            return table_regions
+
+        layout_engine = self._get_layout_engine()
+        if layout_engine is None:
+            return table_regions
+
+        # 渲染页面(裁剪区域)
+        pix = page.get_pixmap(dpi=self.ocr_dpi, clip=clip_box)
+        img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, 3)
+
+        try:
+            layout_output = layout_engine(img)
+
+            # 解析版面结果
+            if hasattr(layout_output, 'boxes') and hasattr(layout_output, 'class_names'):
+                # 获取缩放比例
+                scale_x = clip_box.width / img.shape[1]
+                scale_y = clip_box.height / img.shape[0]
+
+                table_count = 0
+                figure_count = 0
+                for box, label, score in zip(layout_output.boxes, layout_output.class_names, layout_output.scores):
+                    if label in ("table", "figure") and score > self.confidence_threshold:
+                        # 转换为 PDF 坐标
+                        pdf_x1 = clip_box.x0 + box[0] * scale_x
+                        pdf_y1 = clip_box.y0 + box[1] * scale_y
+                        pdf_x2 = clip_box.x0 + box[2] * scale_x
+                        pdf_y2 = clip_box.y0 + box[3] * scale_y
+
+                        table_regions.append(((pdf_x1, pdf_y1, pdf_x2, pdf_y2), score, label))
+                        if label == "table":
+                            table_count += 1
+                        else:
+                            figure_count += 1
+
+                if table_count or figure_count:
+                    logger.info(f"  [YOLO] 第{page_num}页: table={table_count}, figure={figure_count}")
+
+        except Exception as e:
+            logger.warning(f"  第 {page_num} 页: 版面分析失败 ({e})")
+
+        return table_regions
+
+    def process_ocr_concurrent(
+        self,
+        regions: List[TableRegion],
+        progress_callback=None
+    ) -> List[OcrResult]:
+        """
+        同步并发处理 OCR
+
+        Args:
+            regions: 表格区域列表
+            progress_callback: 进度回调函数,接收 (completed, total) 参数
+
+        Returns:
+            OCR 结果列表
+        """
+        results: List[OcrResult] = []
+        total = len(regions)
+        completed = 0
+
+        # 统计
+        table_total = sum(1 for r in regions if r.label == "table")
+        figure_total = sum(1 for r in regions if r.label == "figure")
+        logger.info(f"[OCR] 开始并发识别: table={table_total}, figure={figure_total}, workers={self.concurrent_workers}")
+
+        with ThreadPoolExecutor(max_workers=self.concurrent_workers) as executor:
+            # 提交所有任务
+            future_to_region = {
+                executor.submit(self._ocr_table_region, r.page, r.bbox): r
+                for r in regions
+            }
+
+            # 处理完成的结果
+            non_table_count = 0
+            table_ok_count = 0
+            for future in as_completed(future_to_region):
+                region = future_to_region[future]
+                completed += 1
+                try:
+                    text = future.result()
+                    if text.strip():
+                        table_ok_count += 1
+                    else:
+                        non_table_count += 1
+                    results.append(OcrResult(
+                        page_num=region.page_num,
+                        bbox=region.bbox,
+                        score=region.score,
+                        text=text,
+                        success=True,
+                    ))
+                except Exception as e:
+                    non_table_count += 1
+                    logger.error(f"  第 {region.page_num} 页 {region.label} OCR 失败: {e}")
+                    results.append(OcrResult(
+                        page_num=region.page_num,
+                        bbox=region.bbox,
+                        score=region.score,
+                        text="",
+                        success=False,
+                    ))
+
+                # 每完成5个或最后一个时推送进度
+                if progress_callback and (completed % 5 == 0 or completed == total):
+                    progress_callback(completed, total)
+
+        logger.info(f"[OCR] 完成: table={table_total}, figure={figure_total}, "
+                     f"有效表格={table_ok_count}, Non-table/失败={non_table_count}")
+        return results
+
+    def _ocr_table_region(
+        self,
+        page: fitz.Page,
+        bbox: Tuple[float, float, float, float],
+        max_retries: int = 3
+    ) -> str:
+        """
+        对指定区域进行 OCR 识别(使用 GLM-OCR),支持指数退避重试
+
+        Args:
+            page: PDF 页面对象
+            bbox: 区域坐标 (x1, y1, x2, y2)
+            max_retries: 最大重试次数
+
+        Returns:
+            识别的文本内容
+        """
+        # 渲染指定区域
+        rect = fitz.Rect(bbox)
+        pix = page.get_pixmap(dpi=self.ocr_dpi, clip=rect)
+        img_bytes = pix.tobytes("jpeg")
+
+        # 压缩图片
+        compressed = self._compress_image(img_bytes)
+        img_base64 = base64.b64encode(compressed).decode('utf-8')
+
+        # 请求 OCR
+        payload = {
+            "model": "GLM-OCR",
+            "messages": [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "text",
+                            "text": "判断图片中是否包含表格。"
+                                    "- 若包含表格:用 Markdown 表格格式提取内容,保持行列对齐。"
+                                    "- 若不包含任何表格:只输出 Non-table。"
+                                    "只输出结果,不要解释。"
+                        },
+                        {
+                            "type": "image_url",
+                            "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
+                        }
+                    ]
+                }
+            ],
+            "max_tokens": 2048,
+            "temperature": 0.1
+        }
+
+        headers = {"Content-Type": "application/json"}
+        if self.ocr_api_key:
+            headers["Authorization"] = f"Bearer {self.ocr_api_key}"
+
+        # 指数退避重试
+        last_error = None
+        for attempt in range(max_retries):
+            try:
+                response = requests.post(
+                    self.ocr_api_url,
+                    headers=headers,
+                    json=payload,
+                    timeout=self.ocr_timeout
+                )
+                response.raise_for_status()
+
+                result = response.json()
+                return self._extract_ocr_content(result)
+
+            except Exception as e:
+                last_error = e
+                if attempt < max_retries - 1:
+                    # 指数退避: 2, 4, 8 秒
+                    wait_time = 2 ** (attempt + 1)
+                    logger.warning(f"  第 {page.number + 1} 页表格 OCR 第 {attempt + 1} 次失败: {e}, {wait_time}秒后重试...")
+                    time.sleep(wait_time)
+                else:
+                    logger.error(f"  第 {page.number + 1} 页表格 OCR 最终失败(已重试{max_retries}次): {e}")
+
+        # 所有重试都失败,抛出最后一个错误
+        raise last_error
+
+    def _compress_image(self, img_bytes: bytes) -> bytes:
+        """
+        压缩图片
+
+        Args:
+            img_bytes: 原始图片字节
+
+        Returns:
+            压缩后的图片字节
+        """
+        try:
+            from PIL import Image
+            img = Image.open(io.BytesIO(img_bytes))
+
+            if img.mode in ('RGBA', 'LA', 'P'):
+                background = Image.new('RGB', img.size, (255, 255, 255))
+                if img.mode == 'P':
+                    img = img.convert('RGBA')
+                if img.mode in ('RGBA', 'LA'):
+                    background.paste(img, mask=img.split()[-1])
+                img = background
+            elif img.mode != 'RGB':
+                img = img.convert('RGB')
+
+            min_edge = min(img.size)
+            if min_edge > self.max_short_edge:
+                ratio = self.max_short_edge / min_edge
+                new_size = (int(img.width * ratio), int(img.height * ratio))
+                img = img.resize(new_size, Image.Resampling.LANCZOS)
+
+            buffer = io.BytesIO()
+            img.save(buffer, format='JPEG', quality=self.jpeg_quality, optimize=True)
+            return buffer.getvalue()
+
+        except Exception as e:
+            logger.warning(f"图片压缩失败,使用原图: {e}")
+            return img_bytes
+
+    def _extract_ocr_content(self, result: Dict) -> str:
+        """
+        从 OCR 响应提取内容,并将 HTML 表格转换为 Markdown
+
+        Args:
+            result: OCR API 响应
+
+        Returns:
+            提取的文本内容
+        """
+        content = ""
+        if "choices" in result and isinstance(result["choices"], list):
+            if len(result["choices"]) > 0:
+                message = result["choices"][0].get("message", {})
+                content = message.get("content", "")
+
+        # GLM 判定为非表格区域,返回空字符串,下游自然跳过
+        if content and content.strip().startswith("Non-table"):
+            return ""
+
+        # 如果内容包含 HTML 标签,转换为 Markdown
+        if content and "<" in content and ">" in content:
+            try:
+                from utils_test.minimal_pipeline._html_to_md import convert_html_to_markdown
+                content = convert_html_to_markdown(content)
+            except Exception as e:
+                logger.debug(f"HTML 转 Markdown 失败,保留原始内容: {e}")
+
+        return content
+
+    def replace_table_regions(
+        self,
+        page: fitz.Page,
+        original_text: str,
+        ocr_results: List[Dict],
+        clip_box: fitz.Rect
+    ) -> str:
+        """
+        用 OCR 结果替换原始文本中的表格区域
+
+        Args:
+            page: PDF 页面对象
+            original_text: 原始文本
+            ocr_results: OCR 结果列表,每个元素包含 region_index, bbox, score, ocr_text
+            clip_box: 裁剪区域
+
+        Returns:
+            替换后的文本
+        """
+        if not ocr_results:
+            return original_text
+
+        # 获取页面上的文本块及其坐标
+        text_blocks = []
+        for block in page.get_text("blocks"):
+            x0, y0, x1, y1, text, _, _ = block
+            # 只考虑裁剪区域内的文本
+            if y0 >= clip_box.y0 and y1 <= clip_box.y1:
+                text_blocks.append({
+                    "bbox": (x0, y0, x1, y1),
+                    "text": text.strip(),
+                })
+
+        # 按 Y 坐标排序
+        text_blocks.sort(key=lambda b: (b["bbox"][1], b["bbox"][0]))
+
+        # 找出属于表格区域的文本块
+        replaced_indices: Set[int] = set()
+        for ocr_result in ocr_results:
+            bbox = ocr_result["bbox"]
+            rx0, ry0, rx1, ry1 = bbox
+
+            for idx, block in enumerate(text_blocks):
+                if idx in replaced_indices:
+                    continue
+                bx0, by0, bx1, by1 = block["bbox"]
+
+                # 检查重叠
+                overlap_x = max(0, min(bx1, rx1) - max(bx0, rx0))
+                overlap_y = max(0, min(by1, ry1) - max(by0, ry0))
+                overlap_area = overlap_x * overlap_y
+                block_area = (bx1 - bx0) * (by1 - by0)
+
+                if block_area > 0 and overlap_area / block_area > 0.5:
+                    replaced_indices.add(idx)
+
+        # 构建新文本
+        result_parts: List[str] = []
+        last_idx = 0
+
+        for ocr_result in sorted(ocr_results, key=lambda r: r["bbox"][1]):
+            bbox = ocr_result["bbox"]
+            rx0, ry0, rx1, ry1 = bbox
+
+            # 找到该表格区域之前的文本
+            region_start_idx = None
+            for idx, block in enumerate(text_blocks):
+                if idx in replaced_indices:
+                    bx0, by0, bx1, by1 = block["bbox"]
+                    if (bx0 >= rx0 - 5 and bx1 <= rx1 + 5 and
+                        by0 >= ry0 - 5 and by1 <= ry1 + 5):
+                        if region_start_idx is None:
+                            region_start_idx = idx
+                        last_idx = idx + 1
+
+            if region_start_idx is not None:
+                # 添加表格前的非表格文本
+                for idx in range(last_idx - (last_idx - region_start_idx), region_start_idx):
+                    if idx not in replaced_indices and idx < len(text_blocks):
+                        result_parts.append(text_blocks[idx]["text"])
+                        result_parts.append("\n")
+
+                # 添加 OCR 结果
+                result_parts.append(ocr_result["ocr_text"])
+                result_parts.append("\n")
+
+        # 添加剩余文本
+        for idx in range(last_idx, len(text_blocks)):
+            if idx not in replaced_indices:
+                result_parts.append(text_blocks[idx]["text"])
+                result_parts.append("\n")
+
+        return "".join(result_parts).strip() or original_text

+ 18 - 0
utils_test/minimal_pipeline/_simple_logger.py

@@ -0,0 +1,18 @@
+"""简易日志模块,替代 foundation.observability.logger.loggering,无外部依赖。"""
+
+import logging
+import sys
+
+# 创建一个简易的 review_logger,输出到 stderr,可被测试脚本静默
+_review_logger = logging.getLogger("review_simple")
+_review_logger.setLevel(logging.WARNING)  # 默认只输出 WARNING 及以上
+
+if not _review_logger.handlers:
+    _handler = logging.StreamHandler(sys.stderr)
+    _handler.setFormatter(logging.Formatter(
+        "%(asctime)s | %(levelname)-8s | %(message)s",
+        datefmt="%H:%M:%S",
+    ))
+    _review_logger.addHandler(_handler)
+
+review_logger = _review_logger

+ 620 - 0
utils_test/minimal_pipeline/_toc_detector.py

@@ -0,0 +1,620 @@
+"""
+YOLO 目录页检测与 OCR 提取模块
+
+用于在文档处理流程早期检测目录页并提取目录内容,
+输出结构与 outline 保持一致,便于后续进行目录完整性检查。
+"""
+
+import io
+import os
+import re
+from dataclasses import dataclass
+from typing import Dict, Any, List, Optional, Tuple
+from pathlib import Path
+
+import fitz
+import numpy as np
+
+from utils_test.minimal_pipeline._simple_logger import review_logger as logger
+
+from ultralytics import YOLO
+from PIL import Image
+
+
+@dataclass
+class CatalogItem:
+    """目录项结构"""
+    index: int           # 章节序号(1-based)
+    title: str           # 章节标题
+    page: str            # 页码(字符串)
+    original: str        # 原始文本
+    level: int = 1       # 层级(1=章,2=节)
+    parent_title: str = ""  # 父章节标题(用于二级)
+
+
+@dataclass
+class CatalogSection:
+    """目录节结构(对应二级目录)"""
+    title: str
+    page: str
+    level: int
+    original: str
+
+
+@dataclass
+class CatalogChapter:
+    """目录章结构(对应一级目录)"""
+    index: int
+    title: str
+    page: str
+    original: str
+    subsections: List[CatalogSection]
+
+
+class TOCCatalogExtractor:
+    """
+    目录页检测与内容提取器
+
+    使用 YOLO 模型检测目录页,使用 GLM-OCR 提取目录文本,
+    解析为结构化数据,输出格式与 outline 保持一致。
+    """
+
+    # YOLO 配置
+    DEFAULT_MODEL_PATH = "best.pt"  # 本地副本
+    CONF_THRESHOLD = 0.25
+    MAX_CHECK_PAGES = 50
+    DPI = 150
+
+    # OCR 配置(目录页使用更低DPI避免请求过大)
+    OCR_DPI = 150
+    MAX_SHORT_EDGE = 800
+    JPEG_QUALITY = 85
+    MAX_IMAGE_SIZE_MB = 5
+
+    def __init__(
+        self,
+        model_path: str = None,
+        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
+        ocr_api_key: str = "",
+        ocr_timeout: int = 600,
+    ):
+        self.model_path = model_path or self.DEFAULT_MODEL_PATH
+        self.ocr_api_url = ocr_api_url
+        self.ocr_api_key = ocr_api_key
+        self.ocr_timeout = ocr_timeout
+
+        self._model = None
+
+    def _load_model(self) -> bool:
+        """加载 YOLO 模型,缺少依赖或模型文件直接报错"""
+        if not os.path.exists(self.model_path):
+            raise FileNotFoundError(f"[TOC检测] YOLO模型文件不存在: {self.model_path}")
+
+        if self._model is None:
+            logger.info(f"[TOC检测] 正在加载YOLO模型: {self.model_path}")
+            self._model = YOLO(self.model_path)
+        return True
+
+    def detect_and_extract(
+        self,
+        file_content: bytes,
+        progress_callback=None
+    ) -> Optional[Dict[str, Any]]:
+        """
+        检测目录页并提取目录内容
+
+        Args:
+            file_content: PDF文件字节流
+            progress_callback: 进度回调函数
+
+        Returns:
+            目录结构字典,格式与 outline 保持一致:
+            {
+                "chapters": [...],
+                "total_chapters": N
+            }
+        """
+        if not self._load_model():
+            return None
+
+        doc = fitz.open(stream=file_content)
+        try:
+            # 1. 检测目录页范围
+            toc_pages = self._detect_toc_pages(doc, progress_callback)
+            if not toc_pages:
+                logger.info("[TOC检测] 未检测到目录页")
+                return None
+
+            logger.info(f"[TOC检测] 检测到目录页: 第{toc_pages[0]+1}页 - 第{toc_pages[-1]+1}页")
+
+            # 2. OCR 提取目录页内容
+            if progress_callback:
+                progress_callback("目录识别", 10, f"检测到{len(toc_pages)}页目录,开始OCR识别...")
+
+            toc_text = self._ocr_toc_pages(doc, toc_pages, progress_callback)
+
+            if not toc_text:
+                return None
+
+            # 3. 解析目录文本为结构化数据
+            if progress_callback:
+                progress_callback("目录识别", 80, "解析目录结构...")
+
+            catalog = self._parse_toc_text(toc_text)
+
+            # 添加目录页页码范围(1-based)
+            if toc_pages:
+                catalog["toc_page_range"] = {
+                    "start": toc_pages[0] + 1,  # 转换为1-based页码
+                    "end": toc_pages[-1] + 1
+                }
+
+            if progress_callback:
+                progress_callback("目录识别", 100, f"目录提取完成,共{catalog['total_chapters']}章")
+
+            return catalog
+
+        finally:
+            doc.close()
+
+    def _detect_toc_pages(
+        self,
+        doc: fitz.Document,
+        progress_callback=None
+    ) -> List[int]:
+        """
+        使用 YOLO 检测目录页范围
+
+        Returns:
+            目录页索引列表(0-based)
+        """
+        toc_pages = []
+        total_pages = len(doc)
+        pages_to_check = min(total_pages, self.MAX_CHECK_PAGES)
+
+        for page_idx in range(pages_to_check):
+            page = doc.load_page(page_idx)
+
+            # 渲染页面
+            zoom = self.DPI / 72
+            mat = fitz.Matrix(zoom, zoom)
+            pix = page.get_pixmap(matrix=mat)
+
+            # 转换为 numpy 数组
+            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
+            img_array = np.array(img)
+
+            # YOLO 检测
+            results = self._model(img_array, conf=self.CONF_THRESHOLD, verbose=False)
+
+            # 检查是否检测到 catalogs 类别
+            has_catalogs = False
+            for result in results:
+                if result.boxes is not None:
+                    for box in result.boxes:
+                        cls_id = int(box.cls.item())
+                        class_name = self._model.names.get(cls_id, f"class_{cls_id}")
+                        if class_name == 'catalogs':
+                            has_catalogs = True
+                            break
+                if has_catalogs:
+                    break
+
+            if has_catalogs:
+                toc_pages.append(page_idx)
+                logger.debug(f"  第{page_idx + 1:3d}页: 检测到目录")
+            else:
+                logger.debug(f"  第{page_idx + 1:3d}页: 未检测到目录")
+                # 如果已经检测到目录,且现在没有检测到,认为目录结束
+                if toc_pages:
+                    break
+
+            if progress_callback and (page_idx + 1) % 5 == 0:
+                progress = int((page_idx + 1) / pages_to_check * 10)
+                progress_callback("目录识别", progress, f"扫描页面 {page_idx + 1}/{pages_to_check}")
+
+        return toc_pages
+
+    def _ocr_toc_pages(
+        self,
+        doc: fitz.Document,
+        toc_pages: List[int],
+        progress_callback=None
+    ) -> str:
+        """
+        对目录页进行 OCR 识别
+
+        Returns:
+            合并后的目录文本
+        """
+        import base64
+        import io
+        import requests
+        import time
+
+        all_texts = []
+        total = len(toc_pages)
+
+        for idx, page_idx in enumerate(toc_pages):
+            page = doc.load_page(page_idx)
+
+            try:
+                # 渲染页面(使用较低DPI避免图片过大)
+                pix = page.get_pixmap(dpi=self.OCR_DPI)
+                img_bytes = pix.tobytes("jpeg")
+
+                # 压缩图片
+                compressed = self._compress_image(img_bytes)
+                img_size_mb = len(compressed) / (1024 * 1024)
+                logger.debug(f"  第{page_idx + 1}页图片大小: {img_size_mb:.2f}MB")
+
+                # 检查图片大小
+                if img_size_mb > self.MAX_IMAGE_SIZE_MB:
+                    logger.warning(f"  第{page_idx + 1}页图片过大({img_size_mb:.2f}MB),尝试进一步压缩")
+                    # 再次压缩
+                    compressed = self._compress_image(compressed, force_smaller=True)
+                    img_size_mb = len(compressed) / (1024 * 1024)
+                    logger.debug(f"  压缩后大小: {img_size_mb:.2f}MB")
+
+                img_base64 = base64.b64encode(compressed).decode('utf-8')
+
+                # 请求 OCR
+                payload = {
+                    "model": "GLM-OCR",
+                    "messages": [
+                        {
+                            "role": "user",
+                            "content": [
+                                {
+                                    "type": "text",
+                                    "text": "识别目录内容,按原文格式输出。保留章节层级和页码。"
+                                },
+                                {
+                                    "type": "image_url",
+                                    "image_url": {"url": f"data:image/jpeg;base64,{img_base64}"}
+                                }
+                            ]
+                        }
+                    ],
+                    "max_tokens": 1024,       # 2048 -> 1024,目录页486 tokens够用
+                    "temperature": 0.1,
+                    "seed": 42                # 固定采样随机性
+                }
+
+                headers = {"Content-Type": "application/json"}
+                if self.ocr_api_key:
+                    headers["Authorization"] = f"Bearer {self.ocr_api_key}"
+
+                # 指数退避重试
+                max_retries = 3
+                for attempt in range(max_retries):
+                    try:
+                        response = requests.post(
+                            self.ocr_api_url,
+                            headers=headers,
+                            json=payload,
+                            timeout=self.ocr_timeout
+                        )
+
+                        # 记录响应状态
+                        if response.status_code != 200:
+                            logger.error(f"  第{page_idx + 1}页OCR请求失败: HTTP {response.status_code}, 响应: {response.text[:200]}")
+                            response.raise_for_status()
+
+                        result = response.json()
+
+                        content = ""
+                        if "choices" in result and result["choices"]:
+                            content = result["choices"][0].get("message", {}).get("content", "")
+
+                        if content:
+                            all_texts.append(content)
+                            logger.info(f"  第{page_idx + 1}页目录OCR成功")
+                        break
+
+                    except requests.exceptions.HTTPError as e:
+                        if response.status_code == 400:
+                            logger.error(f"  第{page_idx + 1}页OCR请求格式错误(400),可能是图片过大")
+                            break  # 400错误不需要重试
+                        if attempt < max_retries - 1:
+                            wait_time = 2 ** (attempt + 1)
+                            logger.warning(f"  第{page_idx + 1}页目录OCR失败,{wait_time}秒后重试...")
+                            time.sleep(wait_time)
+                        else:
+                            logger.error(f"  第{page_idx + 1}页目录OCR最终失败: {e}")
+                    except Exception as e:
+                        if attempt < max_retries - 1:
+                            wait_time = 2 ** (attempt + 1)
+                            logger.warning(f"  第{page_idx + 1}页目录OCR失败,{wait_time}秒后重试...")
+                            time.sleep(wait_time)
+                        else:
+                            logger.error(f"  第{page_idx + 1}页目录OCR最终失败: {e}")
+
+                if progress_callback:
+                    progress = 10 + int((idx + 1) / total * 60)
+                    progress_callback("目录识别", progress, f"OCR识别中 {idx + 1}/{total}")
+
+            except Exception as e:
+                logger.error(f"  第{page_idx + 1}页OCR处理出错: {e}")
+
+        return "\n".join(all_texts)
+
+    def _compress_image(self, img_bytes: bytes, force_smaller: bool = False) -> bytes:
+        """
+        压缩图片
+
+        Args:
+            img_bytes: 图片字节
+            force_smaller: 是否强制更小的尺寸(用于处理过大的图片)
+        """
+        try:
+            img = Image.open(io.BytesIO(img_bytes))
+
+            if img.mode in ('RGBA', 'LA', 'P'):
+                background = Image.new('RGB', img.size, (255, 255, 255))
+                if img.mode == 'P':
+                    img = img.convert('RGBA')
+                if img.mode in ('RGBA', 'LA'):
+                    background.paste(img, mask=img.split()[-1])
+                img = background
+            elif img.mode != 'RGB':
+                img = img.convert('RGB')
+
+            # 计算目标尺寸
+            max_edge = self.MAX_SHORT_EDGE
+            if force_smaller:
+                max_edge = 640  # 强制小尺寸
+
+            min_edge = min(img.size)
+            if min_edge > max_edge:
+                ratio = max_edge / min_edge
+                new_size = (int(img.width * ratio), int(img.height * ratio))
+                img = img.resize(new_size, Image.Resampling.LANCZOS)
+
+            buffer = io.BytesIO()
+            quality = self.JPEG_QUALITY if not force_smaller else 75
+            img.save(buffer, format='JPEG', quality=quality, optimize=True)
+            return buffer.getvalue()
+
+        except Exception as e:
+            logger.warning(f"[TOC检测] 图片压缩失败,使用原图: {e}")
+            return img_bytes
+
+    def _parse_toc_text(self, text: str) -> Dict[str, Any]:
+        """
+        解析目录文本为结构化数据,输出标准格式
+
+        标准格式:
+        第X章 XXX
+        一、XXX
+        二、XXX
+
+        Returns:
+            {
+                "chapters": [...],
+                "total_chapters": N,
+                "raw_ocr_text": "原始OCR文本",
+                "formatted_text": "标准格式文本"
+            }
+        """
+        lines = text.strip().split('\n')
+        chapters = []
+        current_chapter = None
+
+        # 正则表达式模式
+        chapter_pattern = re.compile(
+            r'第\s*([一二三四五六七八九十百0-9]+)\s*章\s*[\s\.]*(.+?)\s*[\.\s]*(\d+)\s*$',
+            re.IGNORECASE
+        )
+        section_pattern = re.compile(
+            r'([一二三四五六七八九十]+)\s*[、\.\s]+\s*(.+?)\s*[\.\s]*(\d+)\s*$'
+        )
+        generic_pattern = re.compile(
+            r'([0-9]+)[\.\s]+(.+?)\s*[\.\s]+(\d+)\s*$'
+        )
+
+        # 中文数字映射
+        chinese_nums = {
+            '一': 1, '二': 2, '三': 3, '四': 4, '五': 5,
+            '六': 6, '七': 7, '八': 8, '九': 9, '十': 10,
+            '十一': 11, '十二': 12, '十三': 13, '十四': 14, '十五': 15
+        }
+
+        for line in lines:
+            line = line.strip()
+            if not line or len(line) < 3:
+                continue
+
+            # 移除 Markdown 表格符号
+            line = re.sub(r'^[\|\s]+|[\|\s]+$', '', line)
+            line = line.replace('|', ' ')
+
+            # 尝试匹配章
+            chapter_match = chapter_pattern.search(line)
+            if chapter_match:
+                chapter_num = chapter_match.group(1)
+                title = chapter_match.group(2).strip()
+                page = chapter_match.group(3).strip()
+
+                # 保存上一个章
+                if current_chapter:
+                    chapters.append(current_chapter)
+
+                # 标准化为阿拉伯数字
+                if chapter_num.isdigit():
+                    idx = int(chapter_num)
+                else:
+                    idx = chinese_nums.get(chapter_num, len(chapters) + 1)
+
+                # 从原始行提取完整标题(保留原文格式)
+                # 移除行尾页码,保留章节号+标题的原文形式
+                original_title = re.sub(r'[\.\s]*(\d+)\s*$', '', line).strip()
+
+                current_chapter = {
+                    "index": idx,
+                    "title": original_title,
+                    "page": page,
+                    "original": line,
+                    "subsections": []
+                }
+                continue
+
+            # 尝试匹配节(二级)- 标准化为一、二、三格式
+            section_match = section_pattern.search(line)
+            if section_match and current_chapter:
+                section_num = section_match.group(1)
+                title = section_match.group(2).strip()
+                page = section_match.group(3).strip()
+
+                # 标准化节编号
+                if section_num.isdigit():
+                    section_idx = int(section_num)
+                    section_cn = self._number_to_chinese(section_idx)
+                else:
+                    section_cn = section_num
+
+                current_chapter["subsections"].append({
+                    "title": title,
+                    "page": page,
+                    "level": 2,
+                    "original": line
+                })
+                continue
+
+            # 尝试通用匹配(数字开头)
+            generic_match = generic_pattern.search(line)
+            if generic_match and current_chapter:
+                title = generic_match.group(2).strip()
+                page = generic_match.group(3).strip()
+
+                # 判断是章还是节(根据内容特征)
+                if any(kw in title for kw in ['编制依据', '工程概况', '施工计划', '施工工艺',
+                                               '安全保证', '质量保证', '环境保证', '人员配备',
+                                               '验收要求']):
+                    chapters.append(current_chapter)
+                    idx = len(chapters) + 1
+                    # 保留原标题,只移除页码
+                    original_title = re.sub(r'[\.\s]*(\d+)\s*$', '', line).strip()
+                    current_chapter = {
+                        "index": idx,
+                        "title": original_title,
+                        "page": page,
+                        "original": line,
+                        "subsections": []
+                    }
+                else:
+                    # 作为节,保留原标题
+                    current_chapter["subsections"].append({
+                        "title": title,
+                        "page": page,
+                        "level": 2,
+                        "original": line
+                    })
+
+        # 添加最后一个章
+        if current_chapter:
+            chapters.append(current_chapter)
+
+        # 如果没有匹配到章,尝试按空行或缩进分割
+        if not chapters and lines:
+            chapters = self._fallback_parse(lines)
+
+        # 构建标准格式文本
+        formatted_lines = []
+        for ch in chapters:
+            formatted_lines.append(ch["title"])
+            for sub in ch.get("subsections", []):
+                formatted_lines.append(f"  {sub['title']}")
+
+        formatted_text = "\n".join(formatted_lines)
+
+        # 日志输出完整的目录解析结果
+        logger.info(f"[TOC解析] 共 {len(chapters)} 章,标准格式文本:\n{formatted_text}")
+
+        return {
+            "chapters": chapters,
+            "total_chapters": len(chapters),
+            "raw_ocr_text": text,
+            "formatted_text": formatted_text
+        }
+
+    def _fallback_parse(self, lines: List[str]) -> List[Dict[str, Any]]:
+        """
+        降级解析策略:当正则无法匹配时使用启发式方法
+        输出标准格式:第X章 XXX / 一、XXX
+        """
+        chapters = []
+        idx = 0
+        section_idx = 0
+
+        for line in lines:
+            line = line.strip()
+            if not line:
+                continue
+
+            # 检查是否包含页码(行尾数字)
+            page_match = re.search(r'(\d+)\s*$', line)
+            if not page_match:
+                continue
+
+            page = page_match.group(1)
+            title = re.sub(r'[\.\s]+\d+\s*$', '', line).strip()
+
+            # 根据内容特征判断层级
+            is_chapter = any(kw in title for kw in ['编制依据', '工程概况', '施工计划',
+                                                       '施工工艺', '安全保证', '质量保证',
+                                                       '环境保证', '人员配备', '验收',
+                                                       '其他资料'])
+
+            if is_chapter or len(chapters) == 0:
+                idx += 1
+                section_idx = 0  # 重置节计数
+                chapters.append({
+                    "index": idx,
+                    "title": title,
+                    "page": page,
+                    "original": line,
+                    "subsections": []
+                })
+            else:
+                # 作为上一章的节,保留原标题
+                if chapters:
+                    section_idx += 1
+                    chapters[-1]["subsections"].append({
+                        "title": title,
+                        "page": page,
+                        "level": 2,
+                        "original": line
+                    })
+
+        return chapters
+
+    def _number_to_chinese(self, num: int) -> str:
+        """阿拉伯数字转中文数字"""
+        chinese_nums = {
+            1: '一', 2: '二', 3: '三', 4: '四', 5: '五',
+            6: '六', 7: '七', 8: '八', 9: '九', 10: '十',
+            11: '十一', 12: '十二', 13: '十三', 14: '十四', 15: '十五'
+        }
+        return chinese_nums.get(num, str(num))
+
+
+def extract_catalog_from_pdf(
+    file_content: bytes,
+    model_path: str = None,
+    ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
+    ocr_api_key: str = "",
+    progress_callback=None
+) -> Optional[Dict[str, Any]]:
+    """
+    便捷函数:从 PDF 提取目录结构
+
+    Returns:
+        {"chapters": [...], "total_chapters": N} 或 None
+    """
+    extractor = TOCCatalogExtractor(
+        model_path=model_path,
+        ocr_api_url=ocr_api_url,
+        ocr_api_key=ocr_api_key
+    )
+    return extractor.detect_and_extract(file_content, progress_callback)

BIN
utils_test/minimal_pipeline/best.pt


+ 1571 - 0
utils_test/minimal_pipeline/test_ocr_effectiveness.py

@@ -0,0 +1,1571 @@
+"""
+OCR 模型效果与稳定性测试脚本
+
+测试 GLM-OCR 模型在施工方案 PDF 上的表现:
+- 目录提取测试(默认): YOLO检测目录页 → GLM-OCR识别 → 规则解析 → 保存 catalog JSON
+- 版面检测(--detection): RapidLayout 表格/图片区域检测
+- OCR 识别(--detection): GLM-OCR 对表格区域的识别质量
+- 全链路测试(--detection --full-pipeline): 检测 → OCR → 文本回填
+- 稳定性测试(--detection --stability): 高并发下的错误率和延迟分布
+
+运行方式:
+    # 默认:仅目录提取测试(快速)
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf
+
+    # 目录OCR稳定性测试(10次提取对比一致性)
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --catalog-stability
+
+    # 目录OCR稳定性测试(30次)
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --catalog-stability --catalog-iterations 30
+
+    # 目录提取 + 版面检测 + OCR识别
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection
+
+    # 目录+检测+全链路(检测→OCR→文本回填)
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection --full-pipeline
+
+    # 目录+检测+稳定性测试(20并发,50次调用)
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p utils_test/minimal_pipeline/完整性1.pdf --detection --stability --concurrency 20 --iterations 50
+
+    # 批量测试
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d <pdf_dir>
+    python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d <pdf_dir> --detection
+
+输出目录: utils_test/minimal_pipeline/temp/test_ocr_effectiveness/
+  ├── catalog/      目录提取结果(每次带时间戳)
+  ├── detection/    版面检测全页标注图
+  ├── table/        表格区域截图+OCR文本
+  ├── figure/       图片区域截图+OCR文本
+  └── results/      JSON 汇总结果
+"""
+
+import argparse
+import configparser
+import json
+import os
+import sys
+import time
+import statistics
+from collections import Counter
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+import fitz
+import numpy as np
+
+# 从 config.ini 读取 OCR 配置
+_CONFIG_PATH = Path(__file__).resolve().parent.parent.parent / "config" / "config.ini"
+_OCR_CONFIG: Dict[str, str] = {}
+if _CONFIG_PATH.exists():
+    _cp = configparser.ConfigParser()
+    _cp.read(str(_CONFIG_PATH), encoding="utf-8")
+    if _cp.has_section("ocr"):
+        _OCR_CONFIG = {
+            "GLM_OCR_API_URL": _cp.get("ocr", "GLM_OCR_API_URL", fallback="http://183.220.37.46:25429/v1/chat/completions"),
+            "GLM_OCR_API_KEY": _cp.get("ocr", "GLM_OCR_API_KEY", fallback=""),
+            "GLM_OCR_TIMEOUT": _cp.get("ocr", "GLM_OCR_TIMEOUT", fallback="600"),
+        }
+    else:
+        _OCR_CONFIG = {
+            "GLM_OCR_API_URL": "http://183.220.37.46:25429/v1/chat/completions",
+            "GLM_OCR_API_KEY": "",
+            "GLM_OCR_TIMEOUT": "600",
+        }
+else:
+    _OCR_CONFIG = {
+        "GLM_OCR_API_URL": "http://183.220.37.46:25429/v1/chat/completions",
+        "GLM_OCR_API_KEY": "",
+        "GLM_OCR_TIMEOUT": "600",
+    }
+
+# 将被测试的目标模块(本地解耦版,不依赖 core/foundation)
+TEST_DIR = Path(__file__).resolve().parent
+sys.path.insert(0, str(TEST_DIR.parent.parent))  # 项目根目录,使 utils_test 可导入
+
+from utils_test.minimal_pipeline._ocr_processor import (
+    OcrProcessor,
+    RAPID_LAYOUT_AVAILABLE,
+    TableRegion,
+    OcrResult,
+)
+
+
+# ============================================================
+# 数据结构
+# ============================================================
+
+@dataclass
+class DetectionSample:
+    """单次版面检测样本"""
+    page_num: int
+    label: str
+    score: float
+    bbox: Tuple[float, float, float, float]
+    width: float
+    height: float
+
+
+@dataclass
+class PageDetectionResult:
+    """单页版面检测结果"""
+    page_num: int
+    samples: List[DetectionSample]
+    table_count: int
+    figure_count: int
+
+
+@dataclass
+class OcrSampleResult:
+    """单次 OCR 识别样本"""
+    page_num: int
+    label: str
+    score: float
+    bbox: Tuple[float, float, float, float]
+    text: str
+    text_length: int
+    success: bool
+    latency_ms: float
+    retry_count: int = 0
+    error: Optional[str] = None
+
+
+@dataclass
+class OcrTestResult:
+    """OCR 测试结果汇总"""
+    file_name: str
+    total_pages: int
+    detection: Dict[str, Any] = field(default_factory=dict)
+    ocr: Dict[str, Any] = field(default_factory=dict)
+    pipeline: Dict[str, Any] = field(default_factory=dict)
+
+
+# ============================================================
+# OCR 测试器
+# ============================================================
+
+class OcrEffectivenessTester:
+    """OCR 模型效果与稳定性测试器"""
+
+    def __init__(
+        self,
+        ocr_api_url: str = "http://183.220.37.46:25429/v1/chat/completions",
+        ocr_api_key: str = "",
+        ocr_timeout: int = 600,
+        dpi: int = 200,
+        clip_top: float = 60,
+        clip_bottom: float = 60,
+        confidence_threshold: float = 0.5,
+        concurrent_workers: int = 5,
+    ):
+        self.dpi = dpi
+        self.clip_top = clip_top
+        self.clip_bottom = clip_bottom
+        self.confidence_threshold = confidence_threshold
+        self.concurrent_workers = concurrent_workers
+
+        # 初始化 OcrProcessor 用以复用其版面检测和 OCR 逻辑
+        self.ocr_processor = OcrProcessor(
+            ocr_api_url=ocr_api_url,
+            ocr_api_key=ocr_api_key,
+            ocr_timeout=ocr_timeout,
+            ocr_dpi=dpi,
+            confidence_threshold=confidence_threshold,
+            concurrent_workers=concurrent_workers,
+        )
+
+    # 工具: 检查 RapidLayout 是否可用
+    def check_environment(self) -> Dict[str, bool]:
+        """检查运行环境依赖"""
+        return {
+            "rapid_layout_available": RAPID_LAYOUT_AVAILABLE,
+            "pymupdf_available": True,
+            "numpy_available": True,
+        }
+
+    # ============================================================
+    # 效果测试: 版面检测
+    # ============================================================
+
+    def test_detection(
+        self,
+        pdf_path: Path,
+        pages: Optional[List[int]] = None,
+        save_images_dir: Optional[Path] = None,
+    ) -> Dict[str, Any]:
+        """测试 RapidLayout 版面检测效果"""
+        if not RAPID_LAYOUT_AVAILABLE:
+            return {"error": "RapidLayout 未安装,无法测试版面检测"}
+
+        doc = fitz.open(str(pdf_path))
+        try:
+            total_pages = len(doc)
+            target_pages = pages if pages is not None else list(range(total_pages))
+
+            all_samples: List[DetectionSample] = []
+            page_results: List[PageDetectionResult] = []
+
+            for page_num in target_pages:
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(
+                    0, self.clip_top,
+                    rect.width, rect.height - self.clip_bottom,
+                )
+
+                # 使用 OcrProcessor 的版面检测逻辑
+                regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box)
+
+                page_samples: List[DetectionSample] = []
+                for bbox, score, label in regions:
+                    x1, y1, x2, y2 = bbox
+                    page_samples.append(DetectionSample(
+                        page_num=page_num + 1,
+                        label=label,
+                        score=score,
+                        bbox=bbox,
+                        width=x2 - x1,
+                        height=y2 - y1,
+                    ))
+
+                all_samples.extend(page_samples)
+                page_results.append(PageDetectionResult(
+                    page_num=page_num + 1,
+                    samples=page_samples,
+                    table_count=sum(1 for s in page_samples if s.label == "table"),
+                    figure_count=sum(1 for s in page_samples if s.label == "figure"),
+                ))
+
+                # 保存标注图片
+                if save_images_dir and page_samples:
+                    self._save_detection_image(page, clip_box, page_samples, page_num + 1, save_images_dir)
+
+        finally:
+            doc.close()
+
+        # 汇总统计
+        label_counter = Counter(s.label for s in all_samples)
+        table_count = label_counter.get("table", 0)
+        figure_count = label_counter.get("figure", 0)
+
+        # 尺寸分布
+        table_widths = [s.width for s in all_samples if s.label == "table"]
+        table_heights = [s.height for s in all_samples if s.label == "table"]
+
+        # 置信度分布
+        table_scores = [s.score for s in all_samples if s.label == "table"]
+        figure_scores = [s.score for s in all_samples if s.label == "figure"]
+
+        return {
+            "status": "ok",
+            "total_pages": total_pages,
+            "analyzed_pages": len(target_pages),
+            "total_regions": len(all_samples),
+            "label_distribution": dict(label_counter.most_common()),
+            "table_count": table_count,
+            "figure_count": figure_count,
+            "tables_per_page_avg": round(table_count / max(len(target_pages), 1), 2),
+            "figures_per_page_avg": round(figure_count / max(len(target_pages), 1), 2),
+            "table_width_avg": round(statistics.mean(table_widths), 1) if table_widths else None,
+            "table_height_avg": round(statistics.mean(table_heights), 1) if table_heights else None,
+            "table_score_avg": round(statistics.mean(table_scores), 4) if table_scores else None,
+            "figure_score_avg": round(statistics.mean(figure_scores), 4) if figure_scores else None,
+            "table_score_min": round(min(table_scores), 4) if table_scores else None,
+            "table_score_max": round(max(table_scores), 4) if table_scores else None,
+            "page_details": [
+                {
+                    "page": r.page_num,
+                    "table_count": r.table_count,
+                    "figure_count": r.figure_count,
+                    "regions": [
+                        {
+                            "label": s.label,
+                            "score": round(s.score, 4),
+                            "bbox": [round(c, 1) for c in s.bbox],
+                            "size": [round(s.width, 1), round(s.height, 1)],
+                        }
+                        for s in r.samples
+                    ],
+                }
+                for r in page_results if r.samples
+            ],
+        }
+
+    # ============================================================
+    # 效果测试: OCR 识别
+    # ============================================================
+
+    def test_ocr_recognition(
+        self,
+        pdf_path: Path,
+        pages: Optional[List[int]] = None,
+        max_regions_per_page: int = 5,
+    ) -> Dict[str, Any]:
+        """测试 GLM-OCR 识别质量,先检测表格区域再逐个识别"""
+        doc = fitz.open(str(pdf_path))
+        try:
+            total_pages = len(doc)
+            target_pages = pages if pages is not None else list(range(total_pages))
+
+            # 阶段1: 收集表格区域
+            all_regions: List[TableRegion] = []
+            for page_num in target_pages:
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(
+                    0, self.clip_top,
+                    rect.width, rect.height - self.clip_bottom,
+                )
+                regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score, label in regions[:max_regions_per_page]:
+                    all_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score,
+                        label=label,
+                    ))
+
+            if not all_regions:
+                return {
+                    "status": "no_regions",
+                    "message": "未检测到表格区域,无需 OCR 识别",
+                    "total_pages": total_pages,
+                }
+
+            # 阶段2: 串行逐个识别(记录详细统计)
+            ocr_samples: List[OcrSampleResult] = []
+            total = len(all_regions)
+
+            print(f"\n  [OCR识别测试] 共 {total} 个区域,开始串行识别...")
+
+            for idx, region in enumerate(all_regions):
+                start_time = time.perf_counter()
+                retry_count = 0
+                error = None
+                text = ""
+                success = False
+
+                # 手动调用 _ocr_table_region 并记录重试次数
+                # (使用指数退避重试,最多3次)
+                for attempt in range(3):
+                    try:
+                        text = self.ocr_processor._ocr_table_region(
+                            region.page, region.bbox, max_retries=1,
+                        )
+                        success = True
+                        retry_count = attempt
+                        break
+                    except Exception as e:
+                        error = str(e)[:200]
+                        if attempt < 2:
+                            time.sleep(1)
+
+                latency = (time.perf_counter() - start_time) * 1000
+
+                # 判断是否为 Non-table
+                is_non_table = text.strip() == ""
+                ocr_samples.append(OcrSampleResult(
+                    page_num=region.page_num,
+                    label=region.label,
+                    score=region.score,
+                    bbox=region.bbox,
+                    text=text,
+                    text_length=len(text.strip()),
+                    success=success or is_non_table,  # Non-table 也算成功
+                    latency_ms=round(latency, 1),
+                    retry_count=retry_count,
+                    error=error if not success and not is_non_table else None,
+                ))
+
+                progress = f"[{idx + 1}/{total}]"
+                status = "OK" if success else f"FAIL({error[:40]})"
+                print(f"    {progress} 第{region.page_num}页 [{region.label}] "
+                      f"score={region.score:.2f} 耗时={latency:.0f}ms 状态={status}")
+
+        finally:
+            doc.close()
+
+        # 统计
+        total_count = len(ocr_samples)
+        success_count = sum(1 for s in ocr_samples if s.success)
+        non_table_count = sum(1 for s in ocr_samples if not s.text.strip())
+        table_with_content = sum(1 for s in ocr_samples if s.text.strip())
+        latencies = [s.latency_ms for s in ocr_samples if s.success]
+        text_lengths = [s.text_length for s in ocr_samples if s.text_length > 0]
+
+        return {
+            "status": "ok",
+            "total_regions": total_count,
+            "success_count": success_count,
+            "non_table_count": non_table_count,
+            "table_with_content": table_with_content,
+            "success_rate": round(success_count / max(total_count, 1) * 100, 1),
+            "content_rate": round(table_with_content / max(total_count, 1) * 100, 1),
+            "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None,
+            "latency_ms_min": round(min(latencies), 0) if latencies else None,
+            "latency_ms_max": round(max(latencies), 0) if latencies else None,
+            "latency_ms_p50": self._percentile(latencies, 50) if latencies else None,
+            "latency_ms_p95": self._percentile(latencies, 95) if latencies else None,
+            "text_length_avg": round(statistics.mean(text_lengths), 0) if text_lengths else None,
+            "text_length_max": max(text_lengths) if text_lengths else None,
+            "retry_distribution": dict(Counter(s.retry_count for s in ocr_samples).most_common()),
+            "label_breakdown": {
+                label: {
+                    "count": sum(1 for s in ocr_samples if s.label == label),
+                    "success": sum(1 for s in ocr_samples if s.label == label and s.success),
+                    "with_content": sum(1 for s in ocr_samples if s.label == label and s.text.strip()),
+                }
+                for label in set(s.label for s in ocr_samples)
+            },
+            "errors": list(set(s.error for s in ocr_samples if s.error))[:10],
+            "samples": [
+                {
+                    "page": s.page_num,
+                    "label": s.label,
+                    "score": round(s.score, 4),
+                    "text_preview": s.text[:200] if s.text else "(empty/Non-table)",
+                    "text_length": s.text_length,
+                    "success": s.success,
+                    "latency_ms": s.latency_ms,
+                    "retry_count": s.retry_count,
+                }
+                for s in ocr_samples[:20]  # 只保留前20个样本
+            ],
+        }
+
+    # ============================================================
+    # 稳定性测试: 并发 + 重试
+    # ============================================================
+
+    def test_stability(
+        self,
+        pdf_path: Path,
+        concurrency: int = 5,
+        iterations: int = 10,
+        pages: Optional[List[int]] = None,
+    ) -> Dict[str, Any]:
+        """稳定性测试:高并发 OCR 调用,观测错误率、延迟分布、资源泄漏
+
+        Args:
+            concurrency: 并发线程数
+            iterations: 总 OCR 调用次数(分配到各区域)
+        """
+        doc = fitz.open(str(pdf_path))
+        try:
+            total_pages = len(doc)
+            target_pages = pages if pages is not None else list(range(min(total_pages, 10)))
+
+            # 收集一定数量的表格区域作为测试样本
+            all_regions: List[TableRegion] = []
+            for page_num in target_pages:
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(
+                    0, self.clip_top,
+                    rect.width, rect.height - self.clip_bottom,
+                )
+                regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score, label in regions:
+                    all_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score,
+                        label=label,
+                    ))
+
+            if not all_regions:
+                return {
+                    "status": "no_regions",
+                    "message": "未检测到表格区域,跳过稳定性测试",
+                }
+
+            # 循环分配任务: 每次从 regions 列表循环取一个
+            total_tasks = min(iterations, len(all_regions) * 3)
+            task_regions = [all_regions[i % len(all_regions)] for i in range(total_tasks)]
+
+            print(f"\n  [稳定性测试] 并发={concurrency}, 任务数={total_tasks}, 区域样本数={len(all_regions)}")
+
+            # 并发执行 OCR
+            ocr_samples: List[OcrSampleResult] = []
+            progress_lock = [0]
+
+            def _ocr_task(region: TableRegion, task_idx: int) -> OcrSampleResult:
+                start_time = time.perf_counter()
+                error = None
+                text = ""
+                success = False
+                retry_count = 0
+
+                for attempt in range(3):
+                    try:
+                        text = self.ocr_processor._ocr_table_region(
+                            region.page, region.bbox, max_retries=1,
+                        )
+                        success = True
+                        retry_count = attempt
+                        break
+                    except Exception as e:
+                        error = str(e)[:200]
+                        time.sleep(0.5)
+
+                latency = (time.perf_counter() - start_time) * 1000
+
+                with ThreadPoolExecutor._thread_queues:
+                    pass  # dummy for lock
+
+                # 简单进度
+                progress_lock[0] += 1
+                done = progress_lock[0]
+                if done % max(1, total_tasks // 10) == 0 or done == total_tasks:
+                    pct = done / total_tasks * 100
+                    print(f"    [进度] {done}/{total_tasks} ({pct:.0f}%)", flush=True)
+
+                return OcrSampleResult(
+                    page_num=region.page_num,
+                    label=region.label,
+                    score=region.score,
+                    bbox=region.bbox,
+                    text=text,
+                    text_length=len(text.strip()),
+                    success=success,
+                    latency_ms=round(latency, 1),
+                    retry_count=retry_count,
+                    error=error if not success else None,
+                )
+
+            # 使用 ThreadPoolExecutor 并发执行
+            results: List[OcrSampleResult] = []
+            with ThreadPoolExecutor(max_workers=concurrency) as executor:
+                futures = {
+                    executor.submit(_ocr_task, region, idx): (region, idx)
+                    for idx, region in enumerate(task_regions)
+                }
+                for future in as_completed(futures):
+                    try:
+                        results.append(future.result())
+                    except Exception as e:
+                        # 不会发生,因为内部已 catch
+                        pass
+
+            ocr_samples = results
+
+        finally:
+            doc.close()
+
+        # 统计
+        total_count = len(ocr_samples)
+        success_count = sum(1 for s in ocr_samples if s.success)
+        non_table_count = sum(1 for s in ocr_samples if not s.text.strip())
+        table_with_content = sum(1 for s in ocr_samples if s.text.strip())
+        fail_count = total_count - success_count
+        latencies = sorted(s.latency_ms for s in ocr_samples if s.success)
+
+        return {
+            "status": "ok",
+            "concurrency": concurrency,
+            "total_requests": total_count,
+            "success_count": success_count,
+            "fail_count": fail_count,
+            "non_table_count": non_table_count,
+            "table_with_content": table_with_content,
+            "success_rate": round(success_count / max(total_count, 1) * 100, 1),
+            "error_rate": round(fail_count / max(total_count, 1) * 100, 1),
+            "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None,
+            "latency_ms_min": min(latencies) if latencies else None,
+            "latency_ms_max": max(latencies) if latencies else None,
+            "latency_ms_p50": self._percentile(latencies, 50) if latencies else None,
+            "latency_ms_p95": self._percentile(latencies, 95) if latencies else None,
+            "latency_ms_p99": self._percentile(latencies, 99) if latencies else None,
+            "latency_ms_std": round(statistics.stdev(latencies), 0) if len(latencies) > 1 else None,
+            "retry_distribution": dict(Counter(s.retry_count for s in ocr_samples).most_common()),
+            "errors": list(set(s.error for s in ocr_samples if s.error))[:10],
+        }
+
+    # ============================================================
+    # 全链路测试: 检测 → OCR → 回填
+    # ============================================================
+
+    def test_full_pipeline(
+        self,
+        pdf_path: Path,
+        pages: Optional[List[int]] = None,
+    ) -> Dict[str, Any]:
+        """测试 OCR 全链路: 版面检测 → 并发 OCR → 文本回填"""
+        doc = fitz.open(str(pdf_path))
+        try:
+            total_pages = len(doc)
+            target_pages = pages if pages is not None else list(range(total_pages))
+
+            # 阶段1: 检测表格区域
+            all_regions: List[TableRegion] = []
+            for page_num in target_pages:
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(
+                    0, self.clip_top,
+                    rect.width, rect.height - self.clip_bottom,
+                )
+                regions = self.ocr_processor.detect_table_regions(page, page_num + 1, clip_box)
+                for bbox, score, label in regions:
+                    all_regions.append(TableRegion(
+                        page_num=page_num + 1,
+                        page=page,
+                        bbox=bbox,
+                        score=score,
+                        label=label,
+                    ))
+
+            table_count = sum(1 for r in all_regions if r.label == "table")
+            figure_count = sum(1 for r in all_regions if r.label == "figure")
+
+            if not all_regions:
+                return {
+                    "status": "no_regions",
+                    "total_pages": total_pages,
+                    "message": "未检测到表格/图片区域",
+                }
+
+            # 阶段2: 并发 OCR
+            ocr_start = time.perf_counter()
+            ocr_results = extractor._process_ocr_concurrent(all_regions)
+            ocr_elapsed = time.perf_counter() - ocr_start
+
+            ocr_success = sum(1 for r in ocr_results if r.success and r.text.strip())
+            ocr_fail = sum(1 for r in ocr_results if not r.success)
+            ocr_empty = sum(1 for r in ocr_results if r.success and not r.text.strip())
+
+            # 阶段3: 检查文本回填效果
+            # 对每页对比 原始文本 vs OCR回填文本
+            page_comparison = []
+            for page_num in target_pages:
+                page = doc.load_page(page_num)
+                rect = page.rect
+                clip_box = fitz.Rect(
+                    0, self.clip_top,
+                    rect.width, rect.height - self.clip_bottom,
+                )
+                original_text = page.get_text("text", clip=clip_box)
+
+                page_ocr_results = [
+                    {
+                        "region_index": i,
+                        "bbox": r.bbox,
+                        "score": r.score,
+                        "ocr_text": r.text,
+                    }
+                    for i, r in enumerate(ocr_results)
+                    if r.page_num == page_num + 1 and r.success
+                ]
+
+                replaced_text = extractor._replace_table_regions(
+                    page, original_text, page_ocr_results, clip_box,
+                )
+
+                has_replacement = replaced_text != original_text
+
+                page_comparison.append({
+                    "page": page_num + 1,
+                    "original_length": len(original_text),
+                    "replaced_length": len(replaced_text),
+                    "has_replacement": has_replacement,
+                    "ocr_regions_on_page": len(page_ocr_results),
+                    "length_change": len(replaced_text) - len(original_text),
+                })
+
+        finally:
+            doc.close()
+
+        replaced_pages = sum(1 for p in page_comparison if p["has_replacement"])
+        total_latencies = [r.latency_ms for r in ocr_results if r.success]
+
+        return {
+            "status": "ok",
+            "total_pages": total_pages,
+            "analyzed_pages": len(target_pages),
+            "total_regions": len(all_regions),
+            "table_count": table_count,
+            "figure_count": figure_count,
+            "ocr_results": {
+                "total": len(ocr_results),
+                "success_with_content": ocr_success,
+                "empty_non_table": ocr_empty,
+                "failed": ocr_fail,
+                "content_rate": round(ocr_success / max(len(ocr_results), 1) * 100, 1),
+                "ocr_total_time_s": round(ocr_elapsed, 2),
+                "ocr_avg_latency_ms": round(statistics.mean(total_latencies), 0) if total_latencies else None,
+            },
+            "replacement": {
+                "pages_with_replacement": replaced_pages,
+                "replacement_rate": round(replaced_pages / max(len(target_pages), 1) * 100, 1),
+            },
+            "page_details": page_comparison[:30],
+        }
+
+    # ============================================================
+    # 辅助方法
+    # ============================================================
+
+    @staticmethod
+    def _percentile(data: List[float], p: float) -> float:
+        if not data:
+            return 0.0
+        sorted_data = sorted(data)
+        idx = max(0, min(len(sorted_data) - 1, int(len(sorted_data) * p / 100)))
+        return round(sorted_data[idx], 0)
+
+    def _save_detection_image(
+        self,
+        page: fitz.Page,
+        clip_box: fitz.Rect,
+        samples: List[DetectionSample],
+        page_num: int,
+        output_dir: Path,
+    ):
+        """保存带检测框的页面图片"""
+        try:
+            from PIL import Image, ImageDraw
+        except ImportError:
+            return
+
+        pix = page.get_pixmap(dpi=self.dpi, clip=clip_box)
+        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
+        draw = ImageDraw.Draw(img)
+
+        # 还原原始图片尺寸(裁剪前)
+        scale_x = pix.width / clip_box.width
+        scale_y = pix.height / clip_box.height
+
+        colors = {
+            "table": (0, 255, 0),
+            "figure": (255, 80, 80),
+        }
+
+        for s in samples:
+            color = colors.get(s.label, (200, 200, 200))
+            x1 = (s.bbox[0] - clip_box.x0) * scale_x
+            y1 = (s.bbox[1] - clip_box.y0) * scale_y
+            x2 = (s.bbox[2] - clip_box.x0) * scale_x
+            y2 = (s.bbox[3] - clip_box.y0) * scale_y
+            draw.rectangle([x1, y1, x2, y2], outline=color, width=2)
+            draw.text((x1 + 2, y1 + 2), f"{s.label} ({s.score:.2f})", fill=color)
+
+        output_path = output_dir / f"page_{page_num:03d}_ocr_detection.jpg"
+        img.save(str(output_path), quality=85)
+
+
+# ============================================================
+# 报告打印
+# ============================================================
+
+def print_env_report(env: Dict[str, bool]):
+    print("\n" + "=" * 60)
+    print("  环境检查")
+    print("=" * 60)
+    for k, v in env.items():
+        status = "✓ 可用" if v else "✗ 不可用"
+        print(f"  {k:30s}: {status}")
+    if not env.get("rapid_layout_available"):
+        print("\n  ⚠ RapidLayout 未安装,版面检测功能受限")
+        print("    安装: pip install rapid-layout")
+    print()
+
+
+def print_detection_report(result: Dict[str, Any]):
+    if "error" in result:
+        print(f"  [错误] {result['error']}")
+        return
+
+    print("\n" + "=" * 70)
+    print("  版面检测效果报告 (RapidLayout)")
+    print("=" * 70)
+    print(f"  分析页数:     {result['analyzed_pages']} / {result['total_pages']}")
+    print(f"  检测区域总数: {result['total_regions']}")
+    print(f"  表格数:       {result['table_count']} (均 {result['tables_per_page_avg']}/页)")
+    print(f"  图片数:       {result['figure_count']} (均 {result['figures_per_page_avg']}/页)")
+    print()
+
+    if result["label_distribution"]:
+        print("  标签分布:")
+        for label, count in result["label_distribution"].items():
+            pct = count / max(result["total_regions"], 1) * 100
+            bar = "█" * int(pct / 2)
+            print(f"    {label:12s}: {count:4d} ({pct:5.1f}%) {bar}")
+
+    print()
+    if result.get("table_score_avg"):
+        print(f"  表格置信度: avg={result['table_score_avg']:.3f} "
+              f"min={result['table_score_min']:.3f} max={result['table_score_max']:.3f}")
+    if result.get("figure_score_avg"):
+        print(f"  图片置信度: avg={result['figure_score_avg']:.3f}")
+
+    if result.get("page_details"):
+        print()
+        print("  逐页详情:")
+        for p in result["page_details"]:
+            regions_str = ", ".join(
+                f"[{r['label']}]({r['score']:.2f})"
+                for r in p["regions"]
+            )
+            print(f"    第{p['page']:3d}页:  table={p['table_count']}  figure={p['figure_count']}  {regions_str}")
+
+
+def print_ocr_report(result: Dict[str, Any]):
+    if "error" in result:
+        print(f"  [错误] {result['error']}")
+        return
+    if result.get("status") == "no_regions":
+        print(f"\n  [提示] {result['message']}")
+        return
+
+    print("\n" + "=" * 70)
+    print("  OCR 识别效果报告 (GLM-OCR)")
+    print("=" * 70)
+    print(f"  总区域数:       {result['total_regions']}")
+    print(f"  识别成功:       {result['success_count']} ({result['success_rate']}%)")
+    print(f"  含表格内容:     {result['table_with_content']} ({result['content_rate']}%)")
+    print(f"  Non-table(跳过): {result['non_table_count']}")
+
+    if result.get("latency_ms_avg"):
+        print(f"\n  延迟统计 (ms):")
+        print(f"    平均:  {result['latency_ms_avg']:.0f}")
+        print(f"    最小:  {result['latency_ms_min']:.0f}")
+        print(f"    最大:  {result['latency_ms_max']:.0f}")
+        print(f"    P50:   {result['latency_ms_p50']:.0f}")
+        print(f"    P95:   {result['latency_ms_p95']:.0f}")
+
+    if result.get("text_length_avg"):
+        print(f"\n  文本长度:  avg={result['text_length_avg']:.0f}  max={result['text_length_max']}")
+
+    if result.get("retry_distribution"):
+        print(f"\n  重试分布: {result['retry_distribution']}")
+
+    if result.get("label_breakdown"):
+        print(f"\n  按标签统计:")
+        for label, stats in result["label_breakdown"].items():
+            print(f"    {label:8s}: 总数={stats['count']}, 成功={stats['success']}, "
+                  f"含内容={stats['with_content']}")
+
+    if result.get("errors"):
+        print(f"\n  错误 ({len(result['errors'])} 种):")
+        for e in result["errors"]:
+            print(f"    - {e}")
+
+    if result.get("samples"):
+        print(f"\n  样本预览 (前20):")
+        print(f"    {'页':>4s} {'标签':>8s} {'置信度':>8s} {'耗时ms':>8s} {'重试':>4s} {'内容':>6s}  {'预览'}")
+        print(f"    {'-'*60}")
+        for s in result["samples"]:
+            preview = (s["text_preview"][:50] + "..") if len(s.get("text_preview", "")) > 50 else s.get("text_preview", "")
+            ok = "✓" if s["success"] else "✗"
+            print(f"    {s['page']:4d} {s['label']:>8s} {s['score']:.2f}     {s['latency_ms']:6.0f} {s['retry_count']:3d}  "
+                  f"{ok:>4s}  {preview}")
+    print()
+
+
+def print_stability_report(result: Dict[str, Any]):
+    if result.get("status") == "no_regions":
+        print(f"\n  [提示] {result['message']}")
+        return
+
+    print("\n" + "=" * 70)
+    print("  稳定性测试报告")
+    print("=" * 70)
+    print(f"  并发数:        {result['concurrency']}")
+    print(f"  总请求数:      {result['total_requests']}")
+    print(f"  成功:          {result['success_count']} ({result['success_rate']}%)")
+    print(f"  失败:          {result['fail_count']} ({result['error_rate']}%)")
+    print(f"  含表格内容:    {result['table_with_content']}")
+    print(f"  Non-table跳过: {result['non_table_count']}")
+
+    if result.get("latency_ms_avg"):
+        print(f"\n  延迟统计 (ms):")
+        print(f"    平均:  {result['latency_ms_avg']:.0f}")
+        print(f"    最小:  {result['latency_ms_min']:.0f}")
+        print(f"    最大:  {result['latency_ms_max']:.0f}")
+        print(f"    P50:   {result['latency_ms_p50']:.0f}")
+        print(f"    P95:   {result['latency_ms_p95']:.0f}")
+        print(f"    P99:   {result['latency_ms_p99']:.0f}")
+        if result.get("latency_ms_std"):
+            print(f"    标准差: {result['latency_ms_std']:.0f}")
+
+    if result.get("retry_distribution"):
+        print(f"\n  重试分布: {result['retry_distribution']}")
+
+    if result.get("errors"):
+        print(f"\n  错误列表:")
+        for e in result["errors"]:
+            print(f"    - {e}")
+    print()
+
+
+def print_pipeline_report(result: Dict[str, Any]):
+    if result.get("status") == "no_regions":
+        print(f"\n  [提示] {result['message']}")
+        return
+
+    print("\n" + "=" * 70)
+    print("  全链路测试报告 (检测 → OCR → 回填)")
+    print("=" * 70)
+    print(f"  总页数:          {result['total_pages']}")
+    print(f"  分析页数:        {result['analyzed_pages']}")
+    print(f"  检测区域:        表格={result['table_count']}, 图片={result['figure_count']}")
+
+    ocr = result.get("ocr_results", {})
+    print(f"\n  OCR 识别:")
+    print(f"    总区域:       {ocr.get('total', 0)}")
+    print(f"    含内容:       {ocr.get('success_with_content', 0)} ({ocr.get('content_rate', 0)}%)")
+    print(f"    Non-table跳过: {ocr.get('empty_non_table', 0)}")
+    print(f"    失败:         {ocr.get('failed', 0)}")
+    print(f"    总耗时:       {ocr.get('ocr_total_time_s', 0)}s")
+    if ocr.get("ocr_avg_latency_ms"):
+        print(f"    平均延迟:     {ocr['ocr_avg_latency_ms']:.0f}ms")
+
+    repl = result.get("replacement", {})
+    print(f"\n  文本回填:")
+    print(f"    发生替换的页数: {repl.get('pages_with_replacement', 0)}/{result['analyzed_pages']} ({repl.get('replacement_rate', 0)}%)")
+    print()
+
+
+def _save_ocr_region_images(
+    pdf_path: Path,
+    det_result: Dict[str, Any],
+    table_img_dir: Path,
+    figure_img_dir: Path,
+    tester: OcrEffectivenessTester,
+) -> Dict[str, int]:
+    """将检测到的表格/图片区域截图和OCR识别内容分别保存到对应目录"""
+    from PIL import Image
+
+    count = {"table": 0, "figure": 0}
+    page_details = det_result.get("page_details", [])
+    if not page_details:
+        return count
+
+    doc = fitz.open(str(pdf_path))
+    try:
+        for page_info in page_details:
+            page_num = page_info["page"] - 1
+            page = doc.load_page(page_num)
+            rect = page.rect
+            clip_box = fitz.Rect(0, tester.clip_top, rect.width, rect.height - tester.clip_bottom)
+
+            for region in page_info.get("regions", []):
+                label = region["label"]
+                bbox = region["bbox"]
+                score = region["score"]
+
+                # 确定保存目录
+                if label == "table":
+                    target_dir = table_img_dir
+                elif label == "figure":
+                    target_dir = figure_img_dir
+                else:
+                    continue
+
+                pdf_rect = fitz.Rect(bbox)
+                pix = page.get_pixmap(dpi=tester.dpi, clip=pdf_rect)
+                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
+
+                basename = f"{pdf_path.stem}_p{page_info['page']:03d}_{label}_{count[label]:02d}"
+                img_path = target_dir / f"{basename}.jpg"
+                img.save(str(img_path), quality=90)
+
+                # 尝试 OCR 识别并保存文本内容
+                try:
+                    ocr_text = tester.ocr_processor._ocr_table_region(page, bbox, max_retries=1)
+                    txt_path = target_dir / f"{basename}.txt"
+                    with open(txt_path, "w", encoding="utf-8") as f:
+                        f.write(ocr_text)
+                except Exception as e:
+                    txt_path = target_dir / f"{basename}.txt"
+                    with open(txt_path, "w", encoding="utf-8") as f:
+                        f.write(f"[OCR识别失败] {e}")
+
+                count[label] += 1
+
+    finally:
+        doc.close()
+
+    return count
+
+
+def _test_catalog_ocr(
+    pdf_path: Path,
+    tester: OcrEffectivenessTester,
+    catalog_dir: Path,
+    detection_img_dir: Path,
+    timestamp: str = "",
+) -> Dict[str, Any]:
+    """测试目录页(目录)OCR识别效果,使用项目实际的 _extract_catalog 链路
+
+    输出格式与项目 catalog JSON 一致:
+    {"chapters": [...], "total_chapters": N, "raw_ocr_text": "...", "formatted_text": "..."}
+    """
+    from utils_test.minimal_pipeline._toc_detector import TOCCatalogExtractor
+
+    file_content = pdf_path.read_bytes()
+
+    # 文件名前缀(提前定义,供后续截图使用)
+    suffix = f"_{timestamp}" if timestamp else ""
+    stem = f"{pdf_path.stem}{suffix}"
+
+    # 使用 YOLO + GLM-OCR 目录提取链路(不依赖 core/foundation)
+    extractor = TOCCatalogExtractor(
+        model_path=str(TEST_DIR / "best.pt"),
+        ocr_api_url=tester.ocr_processor.ocr_api_url,
+        ocr_api_key=tester.ocr_processor.ocr_api_key,
+        ocr_timeout=tester.ocr_processor.ocr_timeout,
+    )
+
+    catalog = extractor.detect_and_extract(file_content)
+    catalog = catalog or {}
+
+    # 保存目录页截图(使用 YOLO 检测到的目录页码范围)
+    from PIL import Image as PILImage
+    try:
+        doc = fitz.open(stream=file_content)
+        try:
+            toc_range = (catalog or {}).get("toc_page_range")
+            if toc_range:
+                for page_num in range(toc_range["start"] - 1, toc_range["end"]):
+                    page = doc.load_page(page_num)
+                    pix = page.get_pixmap(dpi=150)
+                    img = PILImage.frombytes("RGB", [pix.width, pix.height], pix.samples)
+                    img_path = catalog_dir / f"{stem}_catalog_page_{page_num + 1:03d}.jpg"
+                    img.save(str(img_path), quality=85)
+        finally:
+            doc.close()
+    except Exception as e:
+        print(f"  [警告] 目录页截图保存失败: {e}")
+
+    # 构造与项目格式一致的 catalog 输出
+    # 格式: {"catalog": {"chapters": [...], "total_chapters": N}, "raw_ocr_text": "..."}
+    # 不含 content/page_start/page_end 等后续流程才填充的字段
+    catalog_output: Dict[str, Any] = {
+        "catalog": {
+            "chapters": [],
+            "total_chapters": 0,
+        },
+        "raw_ocr_text": "",
+    }
+    raw_ocr_text = ""
+    extract_status = "failed"
+
+    if catalog:
+        chapters = catalog.get("chapters", [])
+        # 清理掉可能混入的 content 等字段(仅保留 catalog 原始字段)
+        clean_chapters = []
+        for ch in chapters:
+            clean_ch = {
+                "index": ch.get("index", 0),
+                "title": ch.get("title", ""),
+                "page": str(ch.get("page", "")),
+                "original": ch.get("original", ""),
+            }
+            clean_subs = []
+            for sub in ch.get("subsections", []):
+                clean_subs.append({
+                    "title": sub.get("title", ""),
+                    "page": str(sub.get("page", "")),
+                    "level": sub.get("level", 2),
+                    "original": sub.get("original", ""),
+                })
+            clean_ch["subsections"] = clean_subs
+            clean_chapters.append(clean_ch)
+
+        catalog_output["catalog"]["chapters"] = clean_chapters
+        catalog_output["catalog"]["total_chapters"] = len(clean_chapters)
+        raw_ocr_text = catalog.get("raw_ocr_text", "") or ""
+        catalog_output["raw_ocr_text"] = raw_ocr_text
+        extract_status = "success"
+
+    # 保存 catalog JSON(含 raw_ocr_text 字段替代单独的 txt)
+    json_path = catalog_dir / f"{stem}_catalog.json"
+    with open(json_path, "w", encoding="utf-8") as f:
+        json.dump(catalog_output, f, ensure_ascii=False, indent=2)
+
+    # 返回简洁统计信息,json_path 供主流程使用
+    return {
+        "extract_status": extract_status,
+        "total_chapters": catalog_output["catalog"]["total_chapters"],
+        "raw_ocr_length": len(raw_ocr_text),
+        "json_path": str(json_path),
+    }
+
+
+def _test_catalog_stability(
+    pdf_path: Path,
+    tester: OcrEffectivenessTester,
+    catalog_dir: Path,
+    iterations: int = 10,
+) -> Dict[str, Any]:
+    """目录OCR稳定性测试:多次执行目录提取,对比结果一致性"""
+    from utils_test.minimal_pipeline._toc_detector import TOCCatalogExtractor
+
+    file_content = pdf_path.read_bytes()
+    timestamp = time.strftime("%Y%m%d_%H%M%S")
+    results = []
+
+    # 先快速提取一次获取目录页范围用于截图
+    first_extractor = TOCCatalogExtractor(
+        model_path=str(TEST_DIR / "best.pt"),
+        ocr_api_url=tester.ocr_processor.ocr_api_url,
+        ocr_api_key=tester.ocr_processor.ocr_api_key,
+        ocr_timeout=tester.ocr_processor.ocr_timeout,
+    )
+    first_catalog = first_extractor.detect_and_extract(file_content) or {}
+
+    # 保存目录页截图(使用 YOLO 检测到的目录页码范围)
+    from PIL import Image as PILImage
+    try:
+        doc = fitz.open(stream=file_content)
+        try:
+            toc_range = first_catalog.get("toc_page_range")
+            if toc_range:
+                for page_num in range(toc_range["start"] - 1, toc_range["end"]):
+                    page = doc.load_page(page_num)
+                    pix = page.get_pixmap(dpi=150)
+                    img = PILImage.frombytes("RGB", [pix.width, pix.height], pix.samples)
+                    img_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_catalog_page_{page_num + 1:03d}.jpg"
+                    img.save(str(img_path), quality=85)
+        finally:
+            doc.close()
+        print(f"  [保存] 目录页截图 → {catalog_dir}/")
+    except Exception as e:
+        print(f"  [警告] 目录页截图保存失败: {e}")
+
+    print(f"\n  [目录稳定性] 开始 {iterations} 次目录提取...")
+
+    for i in range(iterations):
+        extractor = TOCCatalogExtractor(
+            model_path=str(TEST_DIR / "best.pt"),
+            ocr_api_url=tester.ocr_processor.ocr_api_url,
+            ocr_api_key=tester.ocr_processor.ocr_api_key,
+            ocr_timeout=tester.ocr_processor.ocr_timeout,
+        )
+
+        start = time.perf_counter()
+        try:
+            catalog = extractor.detect_and_extract(file_content)
+            catalog = catalog or {}
+            elapsed = (time.perf_counter() - start) * 1000
+
+            if catalog:
+                chapters = catalog.get("chapters", [])
+                raw_text = catalog.get("raw_ocr_text", "") or ""
+                chapter_titles = [ch.get("title", "") for ch in chapters]
+
+                # 保存本次运行的独立 catalog JSON(含 raw_ocr_text)
+                run_output = {
+                    "run": i + 1,
+                    "catalog": {
+                        "chapters": [
+                            {"index": ch.get("index"), "title": ch.get("title", ""),
+                             "page": str(ch.get("page", "")), "original": ch.get("original", ""),
+                             "subsections": [
+                                 {"title": s.get("title", ""), "page": str(s.get("page", "")),
+                                  "level": s.get("level", 2), "original": s.get("original", "")}
+                                 for s in (ch.get("subsections") or [])
+                             ]}
+                            for ch in chapters
+                        ],
+                        "total_chapters": len(chapters),
+                    },
+                    "raw_ocr_text": raw_text,
+                    "elapsed_ms": round(elapsed, 0),
+                }
+                run_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_{i + 1:03d}_catalog.json"
+                with open(run_path, "w", encoding="utf-8") as f:
+                    json.dump(run_output, f, ensure_ascii=False, indent=2)
+
+                results.append({
+                    "run": i + 1,
+                    "success": True,
+                    "elapsed_ms": round(elapsed, 0),
+                    "chapter_count": len(chapters),
+                    "chapter_titles": chapter_titles,
+                    "raw_text_len": len(raw_text),
+                    "raw_text_hash": hash(raw_text),
+                    "json_path": str(run_path),
+                    "raw_text": raw_text,
+                })
+            else:
+                results.append({
+                    "run": i + 1, "success": False, "elapsed_ms": round(elapsed, 0),
+                    "chapter_count": 0, "error": "catalog is None",
+                })
+        except Exception as e:
+            results.append({
+                "run": i + 1, "success": False,
+                "elapsed_ms": round((time.perf_counter() - start) * 1000, 0),
+                "error": str(e)[:200],
+            })
+
+        r = results[-1]
+        print(f"    [{i + 1}/{iterations}] "
+              f"{'OK' if r['success'] else 'FAIL'} "
+              f"{r.get('chapter_count', 0):>2}章 "
+              f"{r.get('elapsed_ms', 0):.0f}ms", flush=True)
+
+    # ---- 一致性分析 ----
+    success_runs = [r for r in results if r["success"]]
+    fail_count = len(results) - len(success_runs)
+    title_sequences = [tuple(r["chapter_titles"]) for r in success_runs]
+    unique_sequences = set(title_sequences)
+    text_hashes = {r.get("raw_text_hash") for r in success_runs if "raw_text_hash" in r}
+    chapter_counts = [r["chapter_count"] for r in success_runs]
+
+    count_distribution = {}
+    for c in chapter_counts:
+        count_distribution[c] = count_distribution.get(c, 0) + 1
+
+    patterns = []
+    for seq in unique_sequences:
+        matching_runs = [r["run"] for r in success_runs if tuple(r["chapter_titles"]) == seq]
+        patterns.append({"titles": list(seq), "count": len(matching_runs), "runs": matching_runs})
+    patterns.sort(key=lambda x: -x["count"])
+
+    latencies = [r["elapsed_ms"] for r in success_runs]
+    stability = {
+        "total_runs": len(results),
+        "success_count": len(success_runs),
+        "fail_count": fail_count,
+        "success_rate": round(len(success_runs) / max(len(results), 1) * 100, 1),
+        "all_titles_identical": len(unique_sequences) <= 1,
+        "all_text_identical": len(text_hashes) <= 1,
+        "unique_title_patterns": len(unique_sequences),
+        "unique_text_hashes": len(text_hashes),
+        "chapter_count_distribution": count_distribution,
+        "most_common_chapter_count": max(set(chapter_counts), key=chapter_counts.count) if chapter_counts else 0,
+        "latency_ms_avg": round(statistics.mean(latencies), 0) if latencies else None,
+        "latency_ms_min": min(latencies) if latencies else None,
+        "latency_ms_max": max(latencies) if latencies else None,
+        "patterns": patterns,
+    }
+
+    output = {"stability": stability, "runs": results}
+    json_path = catalog_dir / f"{pdf_path.stem}_{timestamp}_catalog_stability.json"
+    with open(json_path, "w", encoding="utf-8") as f:
+        json.dump(output, f, ensure_ascii=False, indent=2)
+
+    print(f"  [保存] 稳定性报告 → {json_path}")
+    return stability
+
+
+# ============================================================
+# 主入口
+# ============================================================
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="OCR 模型效果与稳定性测试",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例:
+  # 默认:仅目录提取测试
+  python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf
+
+  # 版面检测测试(表格/图片检测 + OCR识别)
+  python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection
+
+  # 全链路测试(检测+OCR+回填)
+  python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection --full-pipeline
+
+  # 稳定性测试(20并发,50次调用)
+  python utils_test/minimal_pipeline/test_ocr_effectiveness.py -p test.pdf --detection --stability --concurrency 20 --iterations 50
+
+  # 批量测试目录下所有 PDF
+  python utils_test/minimal_pipeline/test_ocr_effectiveness.py -d ./pdfs/
+        """,
+    )
+    parser.add_argument("-p", "--pdf", help="单个 PDF 文件路径")
+    parser.add_argument("-d", "--dir", help="批量: PDF 文件目录")
+    parser.add_argument("positional_pdf", nargs="?", metavar="PDF", help="也支持位置参数直接传 PDF 路径")
+    parser.add_argument("--pages", help="分析指定页码, 逗号分隔 (1-based)")
+    parser.add_argument("--detection", action="store_true", help="版面检测 + OCR 识别测试(默认仅目录OCR)")
+    parser.add_argument("--catalog-stability", action="store_true", help="目录OCR稳定性测试(多次提取对比一致性)")
+    parser.add_argument("--catalog-iterations", type=int, default=10, help="目录稳定性测试迭代次数(默认 10)")
+    parser.add_argument("--full-pipeline", action="store_true", help="全链路测试(需同时开启 --detection)")
+    parser.add_argument("--stability", action="store_true", help="稳定性测试(需同时开启 --detection)")
+    parser.add_argument("--concurrency", type=int, default=5, help="稳定性测试并发数")
+    parser.add_argument("--iterations", type=int, default=10, help="稳定性测试迭代次数")
+    parser.add_argument("--output-dir", help="输出目录(默认 utils_test/minimal_pipeline/temp/test_ocr_effectiveness/)")
+    parser.add_argument("--confidence", type=float, default=0.5, help="检测置信度阈值")
+    parser.add_argument("--json", action="store_true", help="以 JSON 格式输出结果")
+    parser.add_argument("--ocr-url", default=_OCR_CONFIG["GLM_OCR_API_URL"], help="OCR API 地址")
+    parser.add_argument("--ocr-key", default=_OCR_CONFIG["GLM_OCR_API_KEY"], help="OCR API 密钥")
+    parser.add_argument("--ocr-timeout", type=int, default=int(_OCR_CONFIG["GLM_OCR_TIMEOUT"]), help="OCR 超时秒数")
+    args = parser.parse_args()
+
+    if not args.pdf and not args.dir:
+        if args.positional_pdf:
+            args.pdf = args.positional_pdf
+        else:
+            parser.print_help()
+            return 1
+
+    tester = OcrEffectivenessTester(
+        ocr_api_url=args.ocr_url,
+        ocr_api_key=args.ocr_key,
+        ocr_timeout=args.ocr_timeout,
+        confidence_threshold=args.confidence,
+        concurrent_workers=args.concurrency if args.stability else 5,
+    )
+
+    pages = None
+    if args.pages:
+        pages = [int(p.strip()) - 1 for p in args.pages.split(",")]
+
+    # ---- 环境检查 ----
+    env = tester.check_environment()
+    if not args.json:
+        print_env_report(env)
+
+    # ---- 单个文件模式 ----
+    if args.pdf:
+        pdf_path = Path(args.pdf)
+        if not pdf_path.exists():
+            print(f"[错误] PDF 文件不存在: {pdf_path}")
+            return 1
+
+        # 输出目录结构: temp/test_ocr_effectiveness/{catalog, detection, table, figure, results}
+        base_output = Path(args.output_dir) if args.output_dir else TEST_DIR / "temp" / "test_ocr_effectiveness"
+        catalog_dir = base_output / "catalog"
+        detection_img_dir = base_output / "detection"
+        table_img_dir = base_output / "table"
+        figure_img_dir = base_output / "figure"
+        results_dir = base_output / "results"
+        for d in [catalog_dir, detection_img_dir, table_img_dir, figure_img_dir, results_dir]:
+            d.mkdir(parents=True, exist_ok=True)
+
+        timestamp = time.strftime("%Y%m%d_%H%M%S")
+
+        print(f"\n[测试] {pdf_path.name}")
+        print(f"[输出] 目录提取 → {catalog_dir}/")
+        print(f"[输出] 版面检测图 → {detection_img_dir}/")
+        print(f"[输出] 表格区域截图 → {table_img_dir}/")
+        print(f"[输出] 图片区域截图 → {figure_img_dir}/")
+        print(f"[输出] JSON 结果 → {results_dir}/")
+        print("=" * 70)
+
+        # 目录页 OCR 识别测试
+        # 有 --catalog-stability 时,稳定性编号文件已含完整结果,跳过单次提取
+        catalog_result = None
+        catalog_stability_result = None
+
+        if args.catalog_stability:
+            catalog_stability_result = _test_catalog_stability(
+                pdf_path, tester, catalog_dir, iterations=args.catalog_iterations,
+            )
+            s = catalog_stability_result
+            print(f"  [目录稳定性] {s['total_runs']}次, "
+                  f"一致={s['all_titles_identical']}, "
+                  f"模式数={s['unique_title_patterns']}, "
+                  f"延迟avg={s['latency_ms_avg']:.0f}ms")
+            catalog_result = {
+                "extract_status": "success" if s.get("success_count", 0) > 0 else "failed",
+                "total_chapters": s.get("most_common_chapter_count", 0),
+                "raw_ocr_length": 0,
+                "json_path": "",
+            }
+        else:
+            catalog_result = _test_catalog_ocr(pdf_path, tester, catalog_dir, detection_img_dir, timestamp)
+            cat_status = catalog_result.get("extract_status", "failed")
+            cat_chapters = catalog_result.get("total_chapters", 0)
+            cat_raw_len = catalog_result.get("raw_ocr_length", 0)
+            print(f"  [目录OCR] status={cat_status}, chapters={cat_chapters}, raw_ocr_len={cat_raw_len}")
+            if cat_chapters > 0:
+                print(f"  [保存] catalog JSON → {catalog_result.get('json_path', '')}")
+
+        # 版面检测 + OCR 识别(仅 --detection 时启用)
+        det_result = {"status": "skipped", "total_pages": 0}
+        ocr_result = {"status": "skipped"}
+        pipeline_result = None
+        stab_result = None
+        saved_table_count = {"table": 0, "figure": 0}
+
+        if args.detection:
+            det_result = tester.test_detection(pdf_path, pages=pages, save_images_dir=detection_img_dir)
+            if not args.json:
+                print_detection_report(det_result)
+
+            saved_table_count = _save_ocr_region_images(pdf_path, det_result, table_img_dir, figure_img_dir, tester)
+            print(f"  [保存] 表格区域截图: {saved_table_count['table']} 张 → {table_img_dir}/")
+            print(f"  [保存] 图片区域截图: {saved_table_count['figure']} 张 → {figure_img_dir}/")
+
+            ocr_result = tester.test_ocr_recognition(pdf_path, pages=pages)
+            if not args.json:
+                print_ocr_report(ocr_result)
+
+            if args.full_pipeline:
+                pipeline_result = tester.test_full_pipeline(pdf_path, pages=pages)
+                if not args.json:
+                    print_pipeline_report(pipeline_result)
+
+            if args.stability:
+                stab_result = tester.test_stability(
+                    pdf_path, concurrency=args.concurrency, iterations=args.iterations, pages=pages,
+                )
+                if not args.json:
+                    print_stability_report(stab_result)
+
+        # 保存 JSON 结果到文件
+        output = {
+            "file": pdf_path.name,
+            "test_time": time.strftime("%Y-%m-%d %H:%M:%S"),
+            "environment": env,
+            "detection": det_result,
+            "ocr": ocr_result,
+            "catalog_ocr": {
+                "extract_status": catalog_result.get("extract_status"),
+                "total_chapters": catalog_result.get("total_chapters"),
+                "raw_ocr_length": catalog_result.get("raw_ocr_length"),
+                "json_path": catalog_result.get("json_path"),
+            },
+        }
+        if pipeline_result:
+            output["pipeline"] = pipeline_result
+        if stab_result:
+            output["stability"] = stab_result
+        if catalog_stability_result:
+            output["catalog_stability"] = {
+                "total_runs": catalog_stability_result.get("total_runs"),
+                "all_titles_identical": catalog_stability_result.get("all_titles_identical"),
+                "unique_title_patterns": catalog_stability_result.get("unique_title_patterns"),
+                "most_common_chapter_count": catalog_stability_result.get("most_common_chapter_count"),
+            }
+
+        json_path = results_dir / f"{pdf_path.stem}_ocr_test_result.json"
+        with open(json_path, "w", encoding="utf-8") as f:
+            json.dump(output, f, ensure_ascii=False, indent=2)
+        print(f"\n  [保存] JSON 结果 → {json_path}")
+
+        if args.json:
+            print(json.dumps(output, ensure_ascii=False, indent=2))
+
+        return 0
+
+    # ---- 批量模式 ----
+    dir_path = Path(args.dir)
+    if not dir_path.is_dir():
+        print(f"[错误] 目录不存在: {dir_path}")
+        return 1
+
+    pdf_files = sorted(dir_path.glob("*.pdf"))
+    if not pdf_files:
+        print(f"[错误] 目录下无 PDF 文件: {dir_path}")
+        return 1
+
+    # 批量输出目录
+    base_output = Path(args.output_dir) if args.output_dir else TEST_DIR / "temp" / "test_ocr_effectiveness"
+    catalog_dir = base_output / "catalog"
+    detection_img_dir = base_output / "detection"
+    table_img_dir = base_output / "table"
+    figure_img_dir = base_output / "figure"
+    results_dir = base_output / "results"
+    for d in [catalog_dir, detection_img_dir, table_img_dir, figure_img_dir, results_dir]:
+        d.mkdir(parents=True, exist_ok=True)
+
+    batch_timestamp = time.strftime("%Y%m%d_%H%M%S")
+
+    print(f"\n[批量测试] 找到 {len(pdf_files)} 个 PDF 文件")
+    print(f"[批量测试] 目录: {dir_path}\n")
+
+    batch_results: List[Dict] = []
+    for idx, pdf_path in enumerate(pdf_files, 1):
+        print(f"[{idx}/{len(pdf_files)}] {pdf_path.name} ...", flush=True)
+        try:
+            # 目录提取(默认执行)
+            cat = _test_catalog_ocr(pdf_path, tester, catalog_dir, detection_img_dir, batch_timestamp)
+            cat_chap = cat.get("total_chapters", 0)
+            cat_ocr_len = cat.get("raw_ocr_length", 0)
+
+            file_result = {
+                "file": pdf_path.name,
+                "catalog_status": cat.get("extract_status"),
+                "catalog_chapters": cat_chap,
+                "raw_ocr_length": cat_ocr_len,
+            }
+
+            # 版面检测 + OCR(仅 --detection 时)
+            if args.detection:
+                det = tester.test_detection(pdf_path, pages=pages, save_images_dir=detection_img_dir)
+                _save_ocr_region_images(pdf_path, det, table_img_dir, figure_img_dir, tester)
+                ocr = tester.test_ocr_recognition(pdf_path, pages=pages)
+                file_result.update({
+                    "pages": det.get("total_pages", 0),
+                    "table_count": det.get("table_count", 0),
+                    "figure_count": det.get("figure_count", 0),
+                    "ocr_success_rate": ocr.get("success_rate"),
+                    "ocr_content_rate": ocr.get("content_rate"),
+                    "ocr_avg_latency": ocr.get("latency_ms_avg"),
+                })
+
+            batch_results.append(file_result)
+            print(f"  → catalog={cat_chap}章, raw_ocr={cat_ocr_len}字符", flush=True)
+
+        except Exception as e:
+            print(f"  → 失败: {e}", flush=True)
+            batch_results.append({"file": pdf_path.name, "error": str(e)})
+
+    # 保存批量汇总 JSON
+    batch_json_path = results_dir / f"batch_{batch_timestamp}_summary.json"
+    with open(batch_json_path, "w", encoding="utf-8") as f:
+        json.dump(batch_results, f, ensure_ascii=False, indent=2)
+    print(f"\n  [保存] 批量汇总 → {batch_json_path}")
+
+    # 批量汇总报告
+    valid = [r for r in batch_results if "error" not in r]
+    errors = [r for r in batch_results if "error" in r]
+
+    if not args.json:
+        print("\n" + "=" * 90)
+        print("  批量测试汇总报告")
+        print("=" * 90)
+        print(f"  文件数: {len(batch_results)} (成功={len(valid)}, 失败={len(errors)})")
+        if valid:
+            total_chapters = sum(r.get("catalog_chapters", 0) for r in valid)
+            total_ocr_len = sum(r.get("raw_ocr_length", 0) for r in valid)
+            print(f"\n  目录提取统计:")
+            print(f"    总章数:       {total_chapters}")
+            print(f"    总OCR字符数:  {total_ocr_len}")
+
+            print(f"\n  逐文件:")
+            print(f"    {'文件':40s} {'章数':>6s} {'OCR字符':>8s}")
+            print(f"    {'-'*55}")
+            for r in valid:
+                name = r["file"][:38] + ".." if len(r["file"]) > 38 else r["file"]
+                print(f"    {name:40s} {r.get('catalog_chapters', 0):5d} {r.get('raw_ocr_length', 0):7d}")
+
+            # --detection 时额外输出检测统计
+            if valid[0].get("table_count") is not None:
+                total_tables = sum(r["table_count"] for r in valid)
+                total_figures = sum(r["figure_count"] for r in valid)
+                ocr_rates = [r["ocr_success_rate"] for r in valid if r["ocr_success_rate"] is not None]
+                ocr_latencies = [r["ocr_avg_latency"] for r in valid if r["ocr_avg_latency"] is not None]
+
+                print(f"\n  版面检测统计:")
+                print(f"    总表格数:     {total_tables}")
+                print(f"    总图片数:     {total_figures}")
+                if ocr_rates:
+                    print(f"    OCR成功率:     avg={statistics.mean(ocr_rates):.1f}%")
+                if ocr_latencies:
+                    print(f"    OCR延迟(ms):   avg={statistics.mean(ocr_latencies):.0f}")
+
+        if errors:
+            print(f"\n  失败文件:")
+            for e in errors:
+                print(f"    - {e['file']}: {e.get('error', '')}")
+        print()
+
+    if args.json:
+        print(json.dumps(batch_results, ensure_ascii=False, indent=2))
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())