Ver código fonte

feat: 施工方案专家意见和公司意见文件抽取

ai02 4 semanas atrás
pai
commit
b74a2fb656

+ 956 - 0
src/app/scripts/ceshi/03-施工方案筛选.py

@@ -0,0 +1,956 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+评审意见PDF文件筛选脚本 - 二级筛选+断点续传版
+
+功能说明:
+    从raw/670目录下的数字编号子目录中筛选评审意见PDF文件。
+
+    一级筛选(默认):
+    - 检查前15页是否包含"专家审查"类关键词(如"专家审查意见"),或"公司/集团"的评审、审核、审查意见类关键词(详见 KEYWORDS 与 KEYWORD_PATTERNS)
+
+    二级筛选(可选):
+    - 对一级筛选未找到的文件,检查前30页
+
+    每个目录可以选出两种评审意见,判断规则:
+    1. 先取文件大小排名前5的文件(top5)
+    2. 检查关键词(专家评审、公司/集团评审意见说明)
+    3. 如果top5中没找到,则将范围扩大到其余文件
+    4. 如果都没找到,记录为"无评审意见"
+    5. 如果多份文件都找到关键词,以创建时间最新的为准
+    6. 专家评审输出到output/expert_review目录
+    7. 公司/集团评审输出到output/company_review目录
+    8. 支持断点续传,在temp目录缓存处理进度
+
+输入:
+    - 源目录: raw/670/ (包含数字编号子目录,如1567、1569等)
+    - 子目录中的PDF文件名是UUID格式
+
+输出:
+    - 专家评审目录: output/expert_review/ (专家评审PDF)
+    - 公司评审目录: output/company_review/ (公司/集团评审PDF)
+    - 结果记录: output/评审筛选结果记录.xlsx
+    - 统计JSON: output/评审筛选统计.json
+    - 缓存文件: <脚本所在目录>/temp/评审筛选进度缓存.json (目录由 TEMP_DIR 配置)
+
+作者: Claude
+日期: 2026-04-21
+"""
+
+import pandas as pd
+import json
+import sys
+import os
+import shutil
+import re
+import warnings
+from pathlib import Path
+from datetime import datetime
+from typing import List, Dict, Tuple, Optional, Set
+import PyPDF2
+from docx import Document
+
+# Suppress PyPDF2 warnings.
+# NOTE(review): the second filter ignores every ``Warning`` subclass process-wide,
+# which already covers the more specific PyPDF2 filter above it.
+warnings.filterwarnings('ignore', category=UserWarning, module='PyPDF2')
+warnings.filterwarnings('ignore', category=Warning)
+
+# ==================== Path configuration (edit directly here at the top of the file) ====================
+# Rules:
+# 1) An absolute path (e.g. E:/data/raw/670) is used as-is (on Windows prefer / or \\)
+# 2) A relative path (e.g. ../../raw/670) is resolved against this script's directory
+SOURCE_DIR = r"F:\提供的原始文件\原始文件\100份"
+EXPERT_OUTPUT_DIR = r"F:\提供的原始文件\原始文件\PDF分类结果_服务器MinerU版\专家评审意见_记录"
+COMPANY_OUTPUT_DIR = r"F:\提供的原始文件\原始文件\PDF分类结果_服务器MinerU版\公司集团评审意见说明"
+TEMP_DIR = "temp"
+
+
+# Batch configuration (used for statistics display only; no sub-directories are created)
+BATCH_SIZE = 50  # number of directories per batch (progress display only)
+
+# Keyword configuration
+KEYWORDS = {
+    # Expert opinion: must hit an "expert examination" (专家审查) phrase; plain
+    # "review/assessment" (评审/评估) wording does not count as expert examination.
+    "expert": [
+        "专家审查意见", "专家审查记录", "专家审查结论",
+        "专家审查说明", "专家审查建议", "专家审查纪要", "专家审查报告",
+        "专家审查审核表", "专家审查审查表",
+    ],
+    # Company/group: must contain the subject word "公司" (company) or "集团" (group)
+    "company": ["公司评审意见", "集团评审意见", "公司审核意见", "集团审核意见", "公司审查意见", "集团审查意见"]  # company/group keywords
+}
+
+# Looser but still bounded regex rules, used to raise the hit rate under OCR/layout noise
+KEYWORD_PATTERNS = {
+    "expert": [
+        # Only the "审查" (examination) wording may match, so that
+        # "专家评审/评估" is not misclassified as an expert examination.
+        r"专家.{0,12}审查.{0,12}(意见|记录|结论|说明|建议|纪要|报告|审核表|审查表)",
+        r"审查.{0,10}专家.{0,12}(意见|记录|结论|说明|建议|纪要|报告|审核表|审查表)",
+    ],
+    "company": [
+        r"(公司|集团).{0,10}(评审|审核|审查).{0,10}(意见|说明|记录)",
+        r"(公司|集团).{0,10}施工方案.{0,10}(审核|审查|评审).{0,10}(意见|说明|记录)",
+    ],
+}
+
+# Screening phase configuration
+PHASE_1_PAGES = 15  # phase 1: first 15 pages
+PHASE_2_PAGES = 30  # phase 2: first 30 pages
+
+
+def get_file_size(file_path: Path) -> int:
+    """获取文件大小(字节)"""
+    try:
+        return file_path.stat().st_size
+    except Exception:
+        return 0
+
+
+def get_file_creation_time(file_path: Path) -> float:
+    """获取文件创建时间(时间戳)"""
+    try:
+        return file_path.stat().st_ctime
+    except Exception:
+        return 0
+
+
+def _extract_pdf_text_worker(pdf_path_str: str, max_pages: int, result_queue):
+    """子进程工作函数:提取PDF文本并通过队列返回结果"""
+    import io
+    from contextlib import redirect_stderr, redirect_stdout
+
+    try:
+        stderr_capture = io.StringIO()
+        stdout_capture = io.StringIO()
+        extracted = ""
+
+        with redirect_stderr(stderr_capture), redirect_stdout(stdout_capture):
+            with open(pdf_path_str, 'rb') as f:
+                pdf_reader = PyPDF2.PdfReader(f)
+                num_pages = min(len(pdf_reader.pages), max_pages)
+                for i in range(num_pages):
+                    try:
+                        page = pdf_reader.pages[i]
+                        page_text = page.extract_text()
+                        if page_text:
+                            extracted += page_text + "\n"
+                    except Exception:
+                        continue
+
+        result_queue.put({"ok": True, "text": extracted})
+    except Exception as e:
+        result_queue.put({"ok": False, "error": str(e)})
+
+
+def extract_text_with_pages(pdf_path: Path, max_pages: int, timeout_seconds: int = 30) -> str:
+    """从PDF文件中提取文本内容(指定页数),带硬超时机制(子进程)"""
+    import multiprocessing as mp
+
+    text = ""
+    file_size_mb = pdf_path.stat().st_size / (1024 * 1024)
+
+    if file_size_mb > 50:
+        print(f"\n      [大文件 {file_size_mb:.1f}MB,读取中...]", end="", flush=True)
+
+    try:
+        # 使用spawn上下文,兼容Windows;子进程可被强制终止,避免线程卡死。
+        ctx = mp.get_context("spawn")
+        result_queue = ctx.Queue()
+        process = ctx.Process(
+            target=_extract_pdf_text_worker,
+            args=(str(pdf_path), max_pages, result_queue)
+        )
+        process.start()
+        process.join(timeout_seconds)
+
+        if process.is_alive():
+            process.terminate()
+            process.join(timeout=2)
+            print(f" [超时跳过]", end="", flush=True)
+            return ""
+
+        result = result_queue.get_nowait() if not result_queue.empty() else {"ok": False, "error": "子进程无返回结果"}
+
+        if not result.get("ok"):
+            error_msg = result.get("error", "")
+            if "PyCryptodome" in error_msg or "AES" in error_msg:
+                print(f" [加密PDF需PyCryptodome]", end="", flush=True)
+            elif "Password" in error_msg or "password" in error_msg:
+                print(f" [PDF加密需要密码]", end="", flush=True)
+            else:
+                print(f" [读取失败]", end="", flush=True)
+            return ""
+
+        text = result.get("text", "")
+    except Exception:
+        print(f" [读取错误]", end="", flush=True)
+
+    return text
+
+
+def check_pdf_contains_keywords_with_pages(pdf_path: Path, keywords: List[str], max_pages: int) -> Tuple[bool, str]:
+    """检查PDF文件指定页数是否包含任一关键词
+    
+    Returns:
+        (是否匹配, 匹配到的关键词)
+    """
+    text = extract_text_with_pages(pdf_path, max_pages=max_pages)
+    cleaned_text = re.sub(r'\s+', '', text)
+    
+    for keyword in keywords:
+        cleaned_keyword = re.sub(r'\s+', '', keyword)
+        if cleaned_keyword in cleaned_text:
+            return True, keyword
+    return False, ""
+
+
+def extract_docx_text(docx_path: Path, max_pages: int) -> str:
+    """提取DOCX文本(按段落近似页数限制)"""
+    try:
+        # Word没有固定分页信息,这里用“每页约40段”进行近似截断,避免读取过慢。
+        approx_max_paragraphs = max(1, max_pages * 40)
+        doc = Document(str(docx_path))
+        texts = []
+        for i, p in enumerate(doc.paragraphs):
+            if i >= approx_max_paragraphs:
+                break
+            if p.text:
+                texts.append(p.text)
+        return "\n".join(texts)
+    except Exception:
+        return ""
+
+
+def check_file_contains_keywords_with_pages(file_path: Path, keywords: List[str], max_pages: int) -> Tuple[bool, str]:
+    """检查文件(PDF/DOCX)指定范围内是否包含任一关键词"""
+    suffix = file_path.suffix.lower()
+    if suffix == ".pdf":
+        text = extract_text_with_pages(file_path, max_pages=max_pages)
+    elif suffix == ".docx":
+        text = extract_docx_text(file_path, max_pages=max_pages)
+    elif suffix == ".doc":
+        print(" [DOC暂不支持,跳过]", end="", flush=True)
+        return False, ""
+    else:
+        return False, ""
+
+    cleaned_text = re.sub(r'\s+', '', text)
+    # 全角半角符号、常见分隔符归一,减少OCR导致的漏检
+    cleaned_text = cleaned_text.replace("(", "(").replace(")", ")").replace(":", ":")
+    cleaned_text = cleaned_text.replace(",", ",").replace("。", ".").replace("、", "")
+
+    # 1) 先做精确关键词命中
+    for keyword in keywords:
+        cleaned_keyword = re.sub(r'\s+', '', keyword)
+        if cleaned_keyword in cleaned_text:
+            return True, keyword
+
+    # 2) 再做正则模式命中(容错字间噪声/词序变化)
+    # 按调用方review_type关键词集合推断模式
+    matched_review_type = None
+    if keywords == KEYWORDS.get("expert", []):
+        matched_review_type = "expert"
+    elif keywords == KEYWORDS.get("company", []):
+        matched_review_type = "company"
+
+    if matched_review_type:
+        for pattern in KEYWORD_PATTERNS.get(matched_review_type, []):
+            if re.search(pattern, cleaned_text, re.IGNORECASE):
+                return True, f"模式命中:{pattern}"
+
+    return False, ""
+
+
+def find_candidate_files_in_directory(directory: Path) -> List[Path]:
+    """查找目录中的候选文件(PDF/DOCX/DOC)"""
+    if not directory.exists() or not directory.is_dir():
+        return []
+    files = []
+    for pattern in ("*.pdf", "*.docx", "*.doc"):
+        files.extend(directory.glob(pattern))
+    return files
+
+
+def get_top5_by_size(files: List[Path]) -> List[Path]:
+    """获取文件大小排名前5的文件"""
+    if not files:
+        return []
+    sorted_files = sorted(files, key=lambda f: get_file_size(f), reverse=True)
+    return sorted_files[:5]
+
+
+def get_newest_file(files: List[Path]) -> Optional[Path]:
+    """获取创建时间最新的文件"""
+    if not files:
+        return None
+    return max(files, key=lambda f: get_file_creation_time(f))
+
+
+def process_single_directory_phase(dir_path: Path, phase: int, max_pages: int, review_type: str) -> Tuple[bool, str, Optional[Path], List[Path], str]:
+    """
+    处理单个目录的文件筛选(指定阶段和评审类型)
+
+    Args:
+        dir_path: 目录路径
+        phase: 阶段(1或2)
+        max_pages: 检查的最大页数
+        review_type: 评审类型 ('expert' 或 'company')
+
+    Returns:
+        (是否成功, 状态信息, 选中的文件路径, 所有包含关键词的文件列表, 匹配到的关键词)
+    """
+    candidate_files = find_candidate_files_in_directory(dir_path)
+
+    if not candidate_files:
+        return False, "目录中未找到PDF/Word文件", None, [], ""
+
+    keywords = KEYWORDS.get(review_type, [])
+    if not keywords:
+        return False, f"未知的评审类型: {review_type}", None, [], ""
+
+    # 获取Top5文件
+    top5_files = get_top5_by_size(candidate_files)
+
+    # 在Top5中搜索
+    matched_in_top5 = []
+    matched_keyword_top5 = ""
+    phase_str = f"【第{phase}阶段-{review_type}】"
+    print(f"\n  {phase_str} 目录: {dir_path.name} - 共{len(candidate_files)}个文件,检查前{max_pages}页,先检查Top5...")
+
+    for file_path in top5_files:
+        size_mb = get_file_size(file_path) / (1024 * 1024)
+        print(f"    检查Top5: {file_path.name[:30]}... (大小: {size_mb:.2f}MB)", end="", flush=True)
+
+        is_match, matched_kw = check_file_contains_keywords_with_pages(file_path, keywords, max_pages)
+        if is_match:
+            print(f" -> ✓ 包含关键词[{matched_kw}]")
+            matched_in_top5.append(file_path)
+            if not matched_keyword_top5:
+                matched_keyword_top5 = matched_kw
+        else:
+            print(" -> ✗ 无关键词")
+
+    # Top5中找到匹配
+    if matched_in_top5:
+        selected = get_newest_file(matched_in_top5)
+        if len(matched_in_top5) > 1:
+            return True, f"Top5中找到{len(matched_in_top5)}个匹配,选择最新", selected, matched_in_top5, matched_keyword_top5
+        return True, "Top5中找到匹配文件", selected, matched_in_top5, matched_keyword_top5
+
+    print(f"    Top5未找到,扩展到其余{len(candidate_files) - len(top5_files)}个文件...")
+
+    # 检查其余文件
+    other_files = [f for f in candidate_files if f not in top5_files]
+    matched_in_others = []
+    matched_keyword_others = ""
+
+    for file_path in other_files:
+        size_mb = get_file_size(file_path) / (1024 * 1024)
+        print(f"    检查其他: {file_path.name[:30]}... (大小: {size_mb:.2f}MB)", end="", flush=True)
+
+        is_match, matched_kw = check_file_contains_keywords_with_pages(file_path, keywords, max_pages)
+        if is_match:
+            print(f" -> ✓ 包含关键词[{matched_kw}]")
+            matched_in_others.append(file_path)
+            if not matched_keyword_others:
+                matched_keyword_others = matched_kw
+        else:
+            print(" -> ✗ 无关键词")
+
+    if matched_in_others:
+        selected = get_newest_file(matched_in_others)
+        if len(matched_in_others) > 1:
+            return True, f"其余文件中找到{len(matched_in_others)}个匹配,选择最新", selected, matched_in_others, matched_keyword_others
+        return True, "其余文件中找到匹配文件", selected, matched_in_others, matched_keyword_others
+
+    return False, "未找到包含关键词的文件", None, [], ""
+
+
+def copy_file_to_output(file_path: Path, output_dir: Path, new_name: str) -> Path:
+    """复制文件到输出目录"""
+    output_dir.mkdir(parents=True, exist_ok=True)
+    dest_path = output_dir / new_name
+    shutil.copy2(file_path, dest_path)
+    return dest_path
+
+
+def process_review_type(dir_path: Path, phase: int, max_pages: int, review_type: str, 
+                        output_dir: Path, dir_id: str, stats: dict, results: list) -> Tuple[bool, Optional[Path]]:
+    """处理单个评审类型的筛选和输出
+    
+    Returns:
+        (是否成功, 选中的文件路径)
+    """
+    success, message, selected_file, all_matched, matched_kw = process_single_directory_phase(
+        dir_path, phase=phase, max_pages=max_pages, review_type=review_type
+    )
+
+    if success and selected_file:
+        top5_files = get_top5_by_size(find_candidate_files_in_directory(dir_path))
+        from_top5 = selected_file in top5_files
+
+        if from_top5:
+            match_source = "Top5"
+        else:
+            match_source = "其他文件"
+
+        new_filename = f"{dir_id}_{selected_file.name}"
+        try:
+            dest_path = copy_file_to_output(selected_file, output_dir, new_filename)
+            print(f"    ✅ [{review_type}] 已输出: {new_filename}")
+
+            results.append({
+                '目录ID': dir_id,
+                '评审类型': review_type,
+                '阶段': f'第{phase}阶段',
+                '原文件名': selected_file.name,
+                '新文件名': new_filename,
+                '状态': '成功',
+                '匹配来源': match_source,
+                '匹配关键词': matched_kw,
+                '匹配文件数': len(all_matched),
+                '文件大小_MB': round(get_file_size(selected_file) / (1024*1024), 2),
+                '备注': message,
+                '原路径': str(selected_file),
+                '目标路径': str(dest_path),
+                '处理时间': datetime.now().isoformat()
+            })
+            return True, selected_file
+        except Exception as e:
+            print(f"    ❌ [{review_type}] 复制失败: {e}")
+            results.append({
+                '目录ID': dir_id,
+                '评审类型': review_type,
+                '阶段': f'第{phase}阶段',
+                '原文件名': selected_file.name,
+                '新文件名': '',
+                '状态': '复制失败',
+                '匹配来源': match_source,
+                '匹配关键词': matched_kw,
+                '匹配文件数': len(all_matched),
+                '文件大小_MB': round(get_file_size(selected_file) / (1024*1024), 2),
+                '备注': str(e),
+                '原路径': str(selected_file),
+                '目标路径': '',
+                '处理时间': datetime.now().isoformat()
+            })
+            return False, None
+    else:
+        print(f"    ❌ [{review_type}] {message}")
+        return False, None
+
+
+def get_numeric_directories(base_dir: Path) -> List[Path]:
+    """获取目录下所有纯数字命名的子目录(按数字排序)"""
+    if not base_dir.exists():
+        return []
+
+    numeric_dirs = []
+    for item in base_dir.iterdir():
+        if item.is_dir() and item.name.isdigit():
+            numeric_dirs.append(item)
+
+    numeric_dirs.sort(key=lambda d: int(d.name))
+    return numeric_dirs
+
+
+def print_source_dir_diagnostics(source_dir: Path):
+    """当源目录不符合预期时,输出排查信息"""
+    try:
+        if not source_dir.exists():
+            print(f"\n  诊断: 源目录不存在: {source_dir}")
+            return
+        if not source_dir.is_dir():
+            print(f"\n  诊断: 源路径不是目录: {source_dir}")
+            return
+
+        subdirs = [d for d in source_dir.iterdir() if d.is_dir()]
+        preview = [d.name for d in subdirs[:10]]
+        print(f"\n  诊断: 当前目录下子目录数量: {len(subdirs)}")
+        if preview:
+            print(f"  诊断: 子目录示例: {preview}")
+
+        candidate_paths = []
+        for d in subdirs:
+            numeric_count = sum(1 for c in d.iterdir() if c.is_dir() and c.name.isdigit())
+            if numeric_count > 0:
+                candidate_paths.append((d, numeric_count))
+
+        if candidate_paths:
+            candidate_paths.sort(key=lambda x: x[1], reverse=True)
+            print("  诊断: 可能应使用以下源目录(包含数字子目录):")
+            for p, cnt in candidate_paths[:5]:
+                print(f"    - {p}  (数字子目录约 {cnt} 个)")
+        else:
+            print("  诊断: 当前层及下一层未发现明显的数字命名子目录")
+    except Exception as e:
+        print(f"  诊断信息生成失败: {e}")
+
+
+# ==================== 断点续传相关函数 ====================
+
+def load_progress_cache(temp_dir: Path) -> Dict:
+    """加载进度缓存文件"""
+    cache_file = temp_dir / "评审筛选进度缓存.json"
+    if cache_file.exists():
+        try:
+            with open(cache_file, 'r', encoding='utf-8') as f:
+                return json.load(f)
+        except Exception as e:
+            print(f"  警告: 加载缓存失败: {e},将重新开始")
+    return {
+        "processed_dirs": [],
+        "phase1_no_match_expert": [],  # 一级筛选未找到专家评审的目录
+        "phase1_no_match_company": [],  # 一级筛选未找到公司评审的目录
+        "results": [],
+        "stats": {},
+        "phase": 1,  # 当前阶段
+        "last_update": None
+    }
+
+
+def save_progress_cache(temp_dir: Path, cache_data: Dict):
+    """保存进度缓存文件"""
+    temp_dir.mkdir(parents=True, exist_ok=True)
+    cache_file = temp_dir / "评审筛选进度缓存.json"
+    cache_data["last_update"] = datetime.now().isoformat()
+    with open(cache_file, 'w', encoding='utf-8') as f:
+        json.dump(cache_data, f, ensure_ascii=False, indent=2)
+
+
+def print_progress_bar(current: int, total: int, width: int = 40):
+    """打印进度条"""
+    progress = current / total
+    filled = int(width * progress)
+    bar = "█" * filled + "░" * (width - filled)
+    percent = progress * 100
+    print(f"\r  [{bar}] {percent:5.1f}% | {current}/{total}", end="", flush=True)
+
+
+def resolve_config_path(path_value: str, script_dir: Path) -> Path:
+    """按配置值解析路径:绝对路径直接使用,相对路径按脚本目录解析"""
+    path = Path(path_value)
+    if path.is_absolute():
+        return path
+    return (script_dir / path).resolve()
+
+
+def main():
+    """Run the interactive two-phase screening with resumable progress.
+
+    Flow, as implemented below:
+      1. Resolve the configured paths and scan the source directory for
+         digit-named sub-directories (exits with status 1 if none are found).
+      2. Load the progress cache and continue from the cached phase.
+      3. Phase 1: for every unprocessed directory, screen for an expert
+         review and a company review within the first PHASE_1_PAGES pages;
+         progress is cached every 10 directories and after the last one.
+      4. Ask on stdin whether phase 2 should run; if yes, re-screen the
+         directories that missed a review type within PHASE_2_PAGES pages.
+      5. Write the Excel record, the two "missing directory id" text files
+         and the JSON statistics next to the expert output directory, then
+         archive the cache file.
+    """
+    print("=" * 70)
+    print("评审意见PDF筛选脚本 - 二级筛选+断点续传版")
+    print("=" * 70)
+
+    # Build the paths from the configuration at the top of the file
+    # (they are no longer joined onto a project root directory)
+    script_dir = Path(__file__).parent
+    source_base_dir = resolve_config_path(SOURCE_DIR, script_dir)
+    expert_output_dir = resolve_config_path(EXPERT_OUTPUT_DIR, script_dir)
+    company_output_dir = resolve_config_path(COMPANY_OUTPUT_DIR, script_dir)
+    temp_dir = resolve_config_path(TEMP_DIR, script_dir)
+
+    print(f"\n【配置信息】")
+    print(f"  源目录: {source_base_dir}")
+    print(f"  专家评审输出目录: {expert_output_dir}")
+    print(f"  公司评审输出目录: {company_output_dir}")
+    print(f"  缓存目录: {temp_dir}")
+    print(f"  专家评审关键词: {KEYWORDS['expert']}")
+    print(f"  公司评审关键词: {KEYWORDS['company']}")
+    print(f"  一级筛选: 前{PHASE_1_PAGES}页")
+    print(f"  二级筛选: 前{PHASE_2_PAGES}页")
+
+    # Check for the PyCryptodome library (the import is only an availability probe)
+    try:
+        from Crypto.Cipher import AES
+        print(f"  PyCryptodome: 已安装")
+    except ImportError:
+        print(f"\n  ⚠️  警告: 未安装 PyCryptodome 库,某些加密的PDF可能无法读取")
+        print(f"     建议运行: pip install pycryptodome")
+
+    # Collect all digit-named sub-directories
+    print(f"\n【步骤 1/6】扫描源目录...")
+    numeric_dirs = get_numeric_directories(source_base_dir)
+
+    if not numeric_dirs:
+        print(f"\n  错误: 在 {source_base_dir} 中未找到数字编号的子目录")
+        print_source_dir_diagnostics(source_base_dir)
+        sys.exit(1)
+
+    print(f"  找到 {len(numeric_dirs)} 个数字编号子目录")
+
+    # Load the cache (resume support)
+    print(f"\n【步骤 2/6】加载进度缓存...")
+    cache = load_progress_cache(temp_dir)
+    processed_dirs = set(cache.get("processed_dirs", []))
+    phase1_no_match_expert = set(cache.get("phase1_no_match_expert", []))
+    phase1_no_match_company = set(cache.get("phase1_no_match_company", []))
+    current_phase = cache.get("phase", 1)
+
+    if processed_dirs:
+        print(f"  发现缓存:")
+        print(f"    - 已处理: {len(processed_dirs)} 个目录")
+        print(f"    - 专家评审一级未找到: {len(phase1_no_match_expert)} 个目录")
+        print(f"    - 公司评审一级未找到: {len(phase1_no_match_company)} 个目录")
+        print(f"    - 当前阶段: 第{current_phase}阶段")
+    else:
+        print(f"  无缓存,将从头开始处理")
+
+    # ==================== Phase 1 screening ====================
+    if current_phase == 1:
+        print(f"\n{'='*70}")
+        print("【第1阶段】一级筛选(检查前15页)")
+        print('='*70)
+
+        dirs_to_process = [d for d in numeric_dirs if d.name not in processed_dirs]
+
+        if not dirs_to_process:
+            # Everything was already screened in an earlier run: go straight
+            # to phase 2 without prompting.
+            print(f"\n  所有目录已完成一级筛选!")
+            current_phase = 2
+        else:
+            print(f"\n【步骤 3/6】一级筛选处理(剩余 {len(dirs_to_process)} 个目录)...")
+            print(f"  专家评审输出到: {expert_output_dir}")
+            print(f"  公司评审输出到: {company_output_dir}")
+            print()
+
+            results = cache.get("results", [])
+
+            default_stats = {
+                "total_directories": len(numeric_dirs),
+                "expert_success_count": 0,
+                "company_success_count": 0,
+                "expert_phase2_success_count": 0,
+                "company_phase2_success_count": 0,
+                "expert_top5_match_count": 0,
+                "company_top5_match_count": 0,
+                "expert_others_match_count": 0,
+                "company_others_match_count": 0,
+                "expert_no_match_count": 0,
+                "company_no_match_count": 0
+            }
+            # Cached stats win; only the keys missing from the cache are filled in.
+            stats = cache.get("stats", {})
+            for key, value in default_stats.items():
+                if key not in stats:
+                    stats[key] = value
+
+            expert_success_count = 0
+            company_success_count = 0
+            total_to_process = len(dirs_to_process)
+
+            for idx, dir_path in enumerate(dirs_to_process):
+                dir_id = dir_path.name
+                overall_idx = len(processed_dirs) + idx + 1
+
+                print(f"\n[{overall_idx}/{len(numeric_dirs)}] 当前目录: {dir_id}")
+                print_progress_bar(idx + 1, total_to_process)
+
+                # Expert review
+                expert_success, expert_file = process_review_type(
+                    dir_path, phase=1, max_pages=PHASE_1_PAGES,
+                    review_type="expert", output_dir=expert_output_dir,
+                    dir_id=dir_id, stats=stats, results=results
+                )
+                if expert_success:
+                    stats["expert_success_count"] += 1
+                    expert_success_count += 1
+                else:
+                    phase1_no_match_expert.add(dir_id)
+
+                # Company review
+                company_success, company_file = process_review_type(
+                    dir_path, phase=1, max_pages=PHASE_1_PAGES,
+                    review_type="company", output_dir=company_output_dir,
+                    dir_id=dir_id, stats=stats, results=results
+                )
+                if company_success:
+                    stats["company_success_count"] += 1
+                    company_success_count += 1
+                else:
+                    phase1_no_match_company.add(dir_id)
+
+                processed_dirs.add(dir_id)
+
+                # Checkpoint every 10 directories and after the last one.
+                if (idx + 1) % 10 == 0 or idx == len(dirs_to_process) - 1:
+                    cache_data = {
+                        "processed_dirs": list(processed_dirs),
+                        "phase1_no_match_expert": list(phase1_no_match_expert),
+                        "phase1_no_match_company": list(phase1_no_match_company),
+                        "results": results,
+                        "stats": stats,
+                        "phase": 1,
+                        "total_directories": len(numeric_dirs)
+                    }
+                    save_progress_cache(temp_dir, cache_data)
+                    print(f"\n    💾 进度已缓存 (已处理 {len(processed_dirs)}/{len(numeric_dirs)} 个目录)")
+
+            print(f"\n\n【一级筛选完成】")
+            print(f"  专家评审成功: {stats['expert_success_count']} 个")
+            print(f"  公司评审成功: {stats['company_success_count']} 个")
+            print(f"  专家评审未找到: {len(phase1_no_match_expert)} 个")
+            print(f"  公司评审未找到: {len(phase1_no_match_company)} 个")
+
+            # Ask whether phase 2 screening should run
+            total_no_match = len(phase1_no_match_expert.union(phase1_no_match_company))
+            print(f"\n{'='*70}")
+            print("是否进行二级筛选?")
+            print(f"  - 专家评审未找到: {len(phase1_no_match_expert)} 个目录")
+            print(f"  - 公司评审未找到: {len(phase1_no_match_company)} 个目录")
+            print(f"  - 扩大检查范围到前{PHASE_2_PAGES}页")
+            print('='*70)
+
+            # Keep prompting until a recognised yes/no answer is given.
+            while True:
+                try:
+                    user_input = input("请输入 (y/n): ").strip().lower()
+                    if user_input in ['y', 'yes', '是']:
+                        enable_phase2 = True
+                        break
+                    elif user_input in ['n', 'no', '否']:
+                        enable_phase2 = False
+                        break
+                    else:
+                        print("  请输入 y 或 n")
+                except KeyboardInterrupt:
+                    print("\n\n用户中断,进度已保存")
+                    sys.exit(0)
+
+            if not enable_phase2:
+                print("\n  跳过二级筛选,直接保存结果...")
+                current_phase = 3  # skip phase 2 and go straight to saving
+            else:
+                current_phase = 2
+                # Persist the marker that phase 2 has been entered
+                cache_data = {
+                    "processed_dirs": list(processed_dirs),
+                    "phase1_no_match_expert": list(phase1_no_match_expert),
+                    "phase1_no_match_company": list(phase1_no_match_company),
+                    "results": results,
+                    "stats": stats,
+                    "phase": 2,
+                    "total_directories": len(numeric_dirs)
+                }
+                save_progress_cache(temp_dir, cache_data)
+
+    # ==================== Phase 2 screening ====================
+    if current_phase == 2:
+        print(f"\n{'='*70}")
+        print("【第2阶段】二级筛选(检查前30页)")
+        print('='*70)
+
+        # Reload to pick up the latest persisted state
+        cache = load_progress_cache(temp_dir)
+        results = cache.get("results", [])
+        stats = cache.get("stats", {})
+        phase1_no_match_expert = set(cache.get("phase1_no_match_expert", []))
+        phase1_no_match_company = set(cache.get("phase1_no_match_company", []))
+
+        # Directories that need phase 2 (expert or company review not found in phase 1)
+        phase2_dirs_expert = [d for d in numeric_dirs if d.name in phase1_no_match_expert]
+        phase2_dirs_company = [d for d in numeric_dirs if d.name in phase1_no_match_company]
+        all_phase2_dirs = set(phase2_dirs_expert + phase2_dirs_company)
+
+        if not all_phase2_dirs:
+            print(f"\n  没有需要二级筛选的目录")
+        else:
+            print(f"\n【步骤 4/6】二级筛选处理...")
+            print(f"  专家评审需二级筛选: {len(phase2_dirs_expert)} 个目录")
+            print(f"  公司评审需二级筛选: {len(phase2_dirs_company)} 个目录")
+
+            expert_phase2_success = 0
+            company_phase2_success = 0
+
+            # Phase 2 screening for expert reviews
+            if phase2_dirs_expert:
+                print(f"\n  --- 专家评审二级筛选 ---")
+                for idx, dir_path in enumerate(phase2_dirs_expert):
+                    dir_id = dir_path.name
+                    print(f"\n[{idx+1}/{len(phase2_dirs_expert)}] 专家评审二级筛选: {dir_id}")
+
+                    success, selected_file = process_review_type(
+                        dir_path, phase=2, max_pages=PHASE_2_PAGES,
+                        review_type="expert", output_dir=expert_output_dir,
+                        dir_id=dir_id, stats=stats, results=results
+                    )
+                    if success:
+                        stats["expert_phase2_success_count"] = stats.get("expert_phase2_success_count", 0) + 1
+                        expert_phase2_success += 1
+                    # The id is removed whether or not a file was found, so the
+                    # set shrinks to the directories still pending on resume.
+                    phase1_no_match_expert.discard(dir_id)
+
+                    if (idx + 1) % 10 == 0 or idx == len(phase2_dirs_expert) - 1:
+                        cache_data = {
+                            "processed_dirs": list(processed_dirs),
+                            "phase1_no_match_expert": list(phase1_no_match_expert),
+                            "phase1_no_match_company": list(phase1_no_match_company),
+                            "results": results,
+                            "stats": stats,
+                            "phase": 2,
+                            "total_directories": len(numeric_dirs)
+                        }
+                        save_progress_cache(temp_dir, cache_data)
+
+            # Phase 2 screening for company reviews
+            if phase2_dirs_company:
+                print(f"\n  --- 公司评审二级筛选 ---")
+                for idx, dir_path in enumerate(phase2_dirs_company):
+                    dir_id = dir_path.name
+                    print(f"\n[{idx+1}/{len(phase2_dirs_company)}] 公司评审二级筛选: {dir_id}")
+
+                    success, selected_file = process_review_type(
+                        dir_path, phase=2, max_pages=PHASE_2_PAGES,
+                        review_type="company", output_dir=company_output_dir,
+                        dir_id=dir_id, stats=stats, results=results
+                    )
+                    if success:
+                        stats["company_phase2_success_count"] = stats.get("company_phase2_success_count", 0) + 1
+                        company_phase2_success += 1
+                    phase1_no_match_company.discard(dir_id)
+
+                    if (idx + 1) % 10 == 0 or idx == len(phase2_dirs_company) - 1:
+                        cache_data = {
+                            "processed_dirs": list(processed_dirs),
+                            "phase1_no_match_expert": list(phase1_no_match_expert),
+                            "phase1_no_match_company": list(phase1_no_match_company),
+                            "results": results,
+                            "stats": stats,
+                            "phase": 2,
+                            "total_directories": len(numeric_dirs)
+                        }
+                        save_progress_cache(temp_dir, cache_data)
+
+            print(f"\n\n【二级筛选完成】")
+            print(f"  专家评审二级筛选成功: {expert_phase2_success} 个")
+            print(f"  公司评审二级筛选成功: {company_phase2_success} 个")
+
+        current_phase = 3
+
+    # ==================== Save the final results ====================
+    print(f"\n\n【步骤 5/6】保存最终结果...")
+    expert_output_dir.mkdir(parents=True, exist_ok=True)
+    company_output_dir.mkdir(parents=True, exist_ok=True)
+
+    # Reload the latest results from the cache file
+    cache = load_progress_cache(temp_dir)
+    results = cache.get("results", [])
+    stats = cache.get("stats", {})
+
+    # Make sure every expected key exists
+    default_keys = [
+        "total_directories", "expert_success_count", "company_success_count",
+        "expert_phase2_success_count", "company_phase2_success_count",
+        "expert_top5_match_count", "company_top5_match_count",
+        "expert_others_match_count", "company_others_match_count",
+        "expert_no_match_count", "company_no_match_count"
+    ]
+    for key in default_keys:
+        if key not in stats:
+            stats[key] = 0
+
+    # Save the Excel result record (written next to the expert output directory)
+    results_df = pd.DataFrame(results)
+    output_base_dir = expert_output_dir.parent
+    result_file = output_base_dir / '评审筛选结果记录.xlsx'
+    results_df.to_excel(result_file, index=False, engine='openpyxl')
+    print(f"  Excel记录已保存: {result_file}")
+
+    # Compute the final statistics
+    expert_phase1 = stats.get("expert_success_count", 0)
+    expert_phase2 = stats.get("expert_phase2_success_count", 0)
+    company_phase1 = stats.get("company_success_count", 0)
+    company_phase2 = stats.get("company_phase2_success_count", 0)
+
+    # Derive the "not collected" directory ids from the final results instead of
+    # the per-phase cache sets, which get overwritten and would be inaccurate
+    all_dir_ids = {d.name for d in numeric_dirs}
+    expert_success_ids = {
+        str(r.get("目录ID"))
+        for r in results
+        if r.get("评审类型") == "expert" and r.get("状态") == "成功"
+    }
+    company_success_ids = {
+        str(r.get("目录ID"))
+        for r in results
+        if r.get("评审类型") == "company" and r.get("状态") == "成功"
+    }
+    expert_missing_ids = sorted(list(all_dir_ids - expert_success_ids), key=lambda x: int(x))
+    company_missing_ids = sorted(list(all_dir_ids - company_success_ids), key=lambda x: int(x))
+
+    expert_missing_file = output_base_dir / "未采集到专家意见目录ID.txt"
+    company_missing_file = output_base_dir / "未采集到公司审核意见目录ID.txt"
+    with open(expert_missing_file, "w", encoding="utf-8") as f:
+        f.write("\n".join(expert_missing_ids))
+    with open(company_missing_file, "w", encoding="utf-8") as f:
+        f.write("\n".join(company_missing_ids))
+
+    # NOTE(review): no cache_data dict written in this function carries a
+    # "start_time" key, so "start_time" below normally falls back to the
+    # current time - confirm whether a real start time should be recorded.
+    final_stats = {
+        "start_time": cache.get("start_time", datetime.now().isoformat()),
+        "end_time": datetime.now().isoformat(),
+        "total_directories": len(numeric_dirs),
+        "processed_count": len(processed_dirs),
+        "expert": {
+            "phase1_success": expert_phase1,
+            "phase2_success": expert_phase2,
+            "total_success": expert_phase1 + expert_phase2,
+            "top5_match": stats.get("expert_top5_match_count", 0),
+            "others_match": stats.get("expert_others_match_count", 0)
+        },
+        "company": {
+            "phase1_success": company_phase1,
+            "phase2_success": company_phase2,
+            "total_success": company_phase1 + company_phase2,
+            "top5_match": stats.get("company_top5_match_count", 0),
+            "others_match": stats.get("company_others_match_count", 0)
+        },
+        "missing": {
+            "expert_missing_count": len(expert_missing_ids),
+            "company_missing_count": len(company_missing_ids),
+            "expert_missing_ids": expert_missing_ids,
+            "company_missing_ids": company_missing_ids,
+            "expert_missing_file": str(expert_missing_file),
+            "company_missing_file": str(company_missing_file),
+        },
+        "expert_output_dir": str(expert_output_dir),
+        "company_output_dir": str(company_output_dir)
+    }
+
+    stats_file = output_base_dir / '评审筛选统计.json'
+    with open(stats_file, 'w', encoding='utf-8') as f:
+        json.dump(final_stats, f, ensure_ascii=False, indent=2)
+    print(f"  JSON统计已保存: {stats_file}")
+
+    # Archive the cache file: it is renamed rather than deleted, replacing any
+    # previously archived copy
+    cache_file = temp_dir / "评审筛选进度缓存.json"
+    if cache_file.exists():
+        completed_cache = temp_dir / "评审筛选进度缓存_已完成.json"
+        if completed_cache.exists():
+            completed_cache.unlink()
+        shutil.move(cache_file, completed_cache)
+        print(f"  缓存已归档: {completed_cache}")
+
+    # Summary statistics
+    print(f"\n【步骤 6/6】处理统计...")
+    print(f"  总计目录: {final_stats['total_directories']} 个")
+    print(f"\n  【专家评审】")
+    print(f"    一级筛选成功: {final_stats['expert']['phase1_success']} 个")
+    print(f"    二级筛选成功: {final_stats['expert']['phase2_success']} 个")
+    print(f"    总计成功: {final_stats['expert']['total_success']} 个")
+    print(f"\n  【公司/集团评审】")
+    print(f"    一级筛选成功: {final_stats['company']['phase1_success']} 个")
+    print(f"    二级筛选成功: {final_stats['company']['phase2_success']} 个")
+    print(f"    总计成功: {final_stats['company']['total_success']} 个")
+    print(f"\n  【未采集到目录ID】")
+    print(f"    专家意见缺失: {len(expert_missing_ids)} 个 -> {expert_missing_file}")
+    print(f"    公司审核意见缺失: {len(company_missing_ids)} 个 -> {company_missing_file}")
+
+    print("\n" + "=" * 70)
+    print("处理完成!")
+    print(f"专家评审结果: {expert_output_dir}")
+    print(f"公司评审结果: {company_output_dir}")
+    print("=" * 70)
+
+
+if __name__ == '__main__':
+    try:
+        main()
+    except KeyboardInterrupt:
+        print("\n\n⚠️  用户中断,进度已缓存,可重新运行脚本继续")
+        sys.exit(1)
+    except Exception as e:
+        print(f"\n\n❌ 发生错误: {e}")
+        import traceback
+        traceback.print_exc()
+        sys.exit(1)

+ 537 - 0
src/app/scripts/ceshi/03-施工方案筛选_服务器版.py

@@ -0,0 +1,537 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+评审意见文件筛选脚本 - 服务器并发版
+
+相对原始版本的改进:
+1. 目录级并发处理(多进程),充分利用服务器CPU
+2. 无交互参数化(适合nohup/screen/任务调度)
+3. 保留断点续跑能力,周期性增量写入缓存
+4. 保持原有筛选规则:Top5优先 + 其余补充 + 两阶段页数检查
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import logging
+import multiprocessing as mp
+import os
+import re
+import shutil
+import sys
+import warnings
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from datetime import datetime
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple
+
+import pandas as pd
+import PyPDF2
+from docx import Document
+
+warnings.filterwarnings("ignore", category=UserWarning, module="PyPDF2")
+warnings.filterwarnings("ignore", category=Warning)
+# pypdf/PyPDF2在部分PDF字体映射异常时会输出"unknown widths"噪声日志,降级为ERROR避免刷屏。
+logging.getLogger("PyPDF2").setLevel(logging.ERROR)
+logging.getLogger("pypdf").setLevel(logging.ERROR)
+
+# Literal phrases per review type. check_contains_keywords() looks for each
+# phrase (after normalize_text) inside the normalised document text; the first
+# phrase found is reported as the matched keyword.
+KEYWORDS = {
+    "expert": [
+        "专家评审意见", "专家评审记录", "专家评审结论",
+        "专家评估意见", "专家评估记录", "专家评估结论",
+        "专家审查意见", "专家审查记录", "专家审查结论",
+        "专家评审说明", "专家评估说明", "专家审查说明",
+        "专家评审建议", "专家评估建议", "专家审查建议",
+        "专家评审纪要", "专家评估纪要", "专家审查纪要",
+        "专家评审报告", "专家评估报告", "专家审查报告",
+        "专家评审审核表", "专家评估审核表", "专家审查审核表",
+        "专家评审审查表", "专家评估审查表", "专家审查审查表",
+    ],
+    "company": ["公司评审意见", "集团评审意见", "施工方案审核意见"],
+}
+
+# Regular-expression fallbacks per review type, tried only when none of the
+# literal phrases above is found. The bounded gaps (e.g. ".{0,12}") allow a few
+# arbitrary characters between the parts of a phrase.
+KEYWORD_PATTERNS = {
+    "expert": [
+        r"专家(评审|评估|审查).{0,12}(意见|记录|结论|说明|建议|纪要|报告|审核表|审查表)",
+        r"(评审|评估|审查).{0,10}专家.{0,12}(意见|记录|结论|说明|建议|纪要|报告|审核表|审查表)",
+    ],
+    "company": [
+        r"(公司|集团|项目公司).{0,10}(评审|审核|审查).{0,10}(意见|说明|记录)",
+        r"施工方案.{0,10}(审核|审查|评审).{0,10}(意见|说明|记录)",
+    ],
+}
+
+
+def _extract_pdf_text_worker(pdf_path_str: str, max_pages: int, result_queue: mp.Queue) -> None:
+    """Child-process target: extract PDF text and post the outcome on a queue.
+
+    Exactly one dict is put on ``result_queue``: ``{"ok": True, "text": ...}``
+    on success, or ``{"ok": False, "error": ...}`` when extraction raised.
+
+    Args:
+        pdf_path_str: Path of the PDF, passed as a string.
+        max_pages: Maximum number of leading pages to read.
+        result_queue: Queue on which the outcome dict is posted.
+    """
+    try:
+        extracted = extract_pdf_text_core(Path(pdf_path_str), max_pages=max_pages)
+    except Exception as exc:
+        result_queue.put({"ok": False, "error": str(exc)})
+    else:
+        result_queue.put({"ok": True, "text": extracted})
+
+
+def get_file_size(file_path: Path) -> int:
+    """Return the size of ``file_path`` in bytes, or 0 if it cannot be stat'ed."""
+    try:
+        info = file_path.stat()
+    except Exception:
+        return 0
+    return info.st_size
+
+
+def get_file_creation_time(file_path: Path) -> float:
+    """Return the creation timestamp of ``file_path`` (seconds since the epoch).
+
+    ``st_birthtime`` is used when the platform's stat result provides it,
+    because ``st_ctime`` is the time of the last metadata change on Unix and
+    only equals the creation time on Windows. When ``st_birthtime`` is not
+    available the function falls back to ``st_ctime`` as before.
+
+    Returns:
+        The timestamp as a float, or 0.0 when the file cannot be stat'ed.
+    """
+    try:
+        info = file_path.stat()
+    except (OSError, ValueError):
+        # Missing or inaccessible file, or an invalid path string.
+        return 0.0
+    birth = getattr(info, "st_birthtime", None)
+    return float(birth) if birth is not None else float(info.st_ctime)
+
+
+# Full-width CJK punctuation mapped to its ASCII counterpart; the ideographic
+# enumeration comma is dropped. Code points are written as escapes so that the
+# table cannot silently degrade into same-character replacements.
+_FULLWIDTH_PUNCT_TABLE = str.maketrans({
+    "\uff08": "(",   # full-width left parenthesis
+    "\uff09": ")",   # full-width right parenthesis
+    "\uff1a": ":",   # full-width colon
+    "\uff0c": ",",   # full-width comma
+    "\u3002": ".",   # ideographic full stop
+    "\u3001": None,  # ideographic (enumeration) comma: removed
+})
+
+
+def normalize_text(text: str) -> str:
+    """Normalise text for keyword matching.
+
+    All whitespace is removed, so phrases broken across lines or padded with
+    spaces still match. Full-width parentheses, colon, comma and full stop are
+    then converted to their ASCII forms and the enumeration comma is removed.
+    The same function is applied to both the document text and the keywords,
+    so the two stay comparable.
+
+    Args:
+        text: Raw text extracted from a document, or a keyword.
+
+    Returns:
+        The normalised string; an empty string stays empty.
+    """
+    without_spaces = re.sub(r"\s+", "", text)
+    return without_spaces.translate(_FULLWIDTH_PUNCT_TABLE)
+
+
+def extract_pdf_text_core(pdf_path: Path, max_pages: int) -> str:
+    """Extract text from the first ``max_pages`` pages of a PDF.
+
+    Pages whose extraction raises are skipped, and pages yielding no text are
+    ignored. The remaining page texts are joined with newlines.
+
+    Returns:
+        The joined text, or an empty string if the file cannot be opened or
+        parsed at all.
+    """
+    try:
+        with open(pdf_path, "rb") as handle:
+            pdf = PyPDF2.PdfReader(handle)
+            page_limit = min(len(pdf.pages), max_pages)
+            page_texts: List[str] = []
+            for page_no in range(page_limit):
+                try:
+                    page_text = pdf.pages[page_no].extract_text()
+                except Exception:
+                    # A single unreadable page must not abort the whole file.
+                    continue
+                if page_text:
+                    page_texts.append(page_text)
+            return "\n".join(page_texts)
+    except Exception:
+        return ""
+
+
+def extract_pdf_text(pdf_path: Path, max_pages: int, timeout_seconds: int) -> str:
+    """Extract PDF text in a spawned child process, giving up after a timeout.
+
+    Running the parser in its own process keeps a single pathological PDF from
+    hanging the whole batch.
+
+    The result is read from the queue *before* the child is joined. A process
+    that has put data on a ``multiprocessing.Queue`` does not exit until that
+    data has been flushed to the pipe, so joining first can block until the
+    timeout for any payload larger than the pipe buffer and misreport a
+    perfectly good extraction as a timeout.
+
+    Args:
+        pdf_path: PDF file to read.
+        max_pages: Maximum number of leading pages to extract.
+        timeout_seconds: Wall-clock budget for the extraction.
+
+    Returns:
+        The extracted text, or an empty string on timeout, worker failure or
+        when the worker exited without producing a result.
+    """
+    # Function-scope imports: neither name is imported at the top of this file.
+    import time
+    from queue import Empty
+
+    ctx = mp.get_context("spawn")
+    q: mp.Queue = ctx.Queue()
+    p = ctx.Process(target=_extract_pdf_text_worker, args=(str(pdf_path), max_pages, q))
+    p.start()
+
+    deadline = time.monotonic() + timeout_seconds
+    result = None
+    while result is None:
+        remaining = deadline - time.monotonic()
+        if remaining <= 0:
+            break
+        try:
+            # Poll in short slices so a crashed worker is noticed quickly
+            # instead of waiting out the full timeout.
+            result = q.get(timeout=min(0.5, remaining))
+        except Empty:
+            if not p.is_alive():
+                # Worker is gone; take one last look in case the payload
+                # arrived between the failed get() and the liveness check.
+                try:
+                    result = q.get(timeout=0.2)
+                except Empty:
+                    pass
+                break
+
+    if result is None and p.is_alive():
+        p.terminate()
+        p.join(timeout=2)
+        print(f"[WARN] PDF解析超时({timeout_seconds}s): {pdf_path}")
+        return ""
+
+    # The payload has been consumed (or the worker died), so the child can
+    # exit; reap it and force-stop it if it still lingers.
+    p.join(timeout=2)
+    if p.is_alive():
+        p.terminate()
+        p.join(timeout=2)
+
+    if result is None or not result.get("ok"):
+        return ""
+    return result.get("text", "")
+
+
+def extract_docx_text(docx_path: Path, max_pages: int) -> str:
+    """Extract text from the leading paragraphs of a .docx file.
+
+    The page limit is approximated as 40 paragraphs per page (at least one
+    paragraph is always read). Empty paragraphs are dropped and the rest are
+    joined with newlines.
+
+    Returns:
+        The joined paragraph text, or an empty string if the document cannot
+        be read.
+    """
+    try:
+        paragraph_cap = max(1, max_pages * 40)
+        document = Document(str(docx_path))
+        leading_texts = (para.text for para in document.paragraphs[:paragraph_cap])
+        return "\n".join(piece for piece in leading_texts if piece)
+    except Exception:
+        return ""
+
+
+def check_contains_keywords(file_path: Path, review_type: str, max_pages: int, pdf_timeout_seconds: int) -> Tuple[bool, str]:
+    """Check whether a document mentions the given review type.
+
+    Only ``.pdf`` and ``.docx`` files are inspected; ``.doc`` and every other
+    suffix are reported as "no match" without being read. The extracted text is
+    normalised, then searched first for the literal phrases in ``KEYWORDS`` and
+    then for the regular expressions in ``KEYWORD_PATTERNS``.
+
+    Args:
+        file_path: Document to inspect.
+        review_type: Key into ``KEYWORDS`` / ``KEYWORD_PATTERNS``.
+        max_pages: Page budget passed to the text extractors.
+        pdf_timeout_seconds: Timeout for PDF extraction.
+
+    Returns:
+        ``(True, what_matched)`` on a hit, otherwise ``(False, "")``.
+    """
+    ext = file_path.suffix.lower()
+    if ext == ".pdf":
+        raw_text = extract_pdf_text(file_path, max_pages=max_pages, timeout_seconds=pdf_timeout_seconds)
+    elif ext == ".docx":
+        raw_text = extract_docx_text(file_path, max_pages=max_pages)
+    else:
+        # Legacy .doc and all other formats are not parsed.
+        return False, ""
+
+    haystack = normalize_text(raw_text)
+
+    literal_hit = next(
+        (phrase for phrase in KEYWORDS[review_type] if normalize_text(phrase) in haystack),
+        None,
+    )
+    if literal_hit is not None:
+        return True, literal_hit
+
+    for regex in KEYWORD_PATTERNS.get(review_type, []):
+        if re.search(regex, haystack, re.IGNORECASE):
+            return True, f"模式命中:{regex}"
+    return False, ""
+
+
+def find_candidate_files(directory: Path) -> List[Path]:
+    """List the documents directly inside ``directory`` that may hold a review.
+
+    A file qualifies when its suffix is ``.pdf``, ``.docx`` or ``.doc``,
+    compared case-insensitively so that names such as ``X.PDF`` are found on
+    case-sensitive file systems too (the keyword check lower-cases suffixes in
+    the same way). Sub-directories are never returned, even if their name ends
+    in one of the suffixes.
+
+    Returns:
+        The matching files sorted by path, or an empty list when ``directory``
+        does not exist or is not a directory.
+    """
+    if not directory.is_dir():
+        return []
+    wanted_suffixes = {".pdf", ".docx", ".doc"}
+    return sorted(
+        entry
+        for entry in directory.iterdir()
+        if entry.is_file() and entry.suffix.lower() in wanted_suffixes
+    )
+
+
+def get_top5_by_size(files: List[Path]) -> List[Path]:
+    """Return up to five of ``files``, largest first; the input list is not modified."""
+    ranked = list(files)
+    ranked.sort(key=get_file_size, reverse=True)
+    return ranked[:5]
+
+
+def get_newest_file(files: List[Path]) -> Optional[Path]:
+    """Return the file with the greatest creation timestamp, or None for an empty list."""
+    return max(files, key=get_file_creation_time) if files else None
+
+
+def pick_review_file(dir_path: Path, review_type: str, max_pages: int, pdf_timeout_seconds: int) -> Tuple[bool, str, Optional[Path], List[Path], str]:
+    """Pick the review document of one type from a directory.
+
+    The five largest candidate files are checked first. Only if none of them
+    matches are the remaining candidates checked. Among the matching files of
+    the stage that produced a hit, the one with the newest creation time is
+    selected.
+
+    Returns:
+        ``(found, message, selected_file, matched_files, first_matched_keyword)``.
+        When nothing is found the last three items are ``None``, ``[]`` and ``""``.
+    """
+    candidates = find_candidate_files(dir_path)
+    if not candidates:
+        return False, "目录中未找到PDF/DOCX文件", None, [], ""
+
+    tag = f"[{dir_path.name}][{review_type}]"
+
+    def scan(batch: List[Path], label: str) -> Tuple[List[Path], str]:
+        # Check every file of the batch; collect the hits and remember the
+        # keyword reported for the first hit.
+        hits: List[Path] = []
+        first_keyword = ""
+        batch_size = len(batch)
+        for position, candidate in enumerate(batch, start=1):
+            print(f"{tag} {label} {position}/{batch_size}: {candidate.name}")
+            found, keyword = check_contains_keywords(
+                candidate, review_type, max_pages=max_pages, pdf_timeout_seconds=pdf_timeout_seconds
+            )
+            if found:
+                hits.append(candidate)
+                first_keyword = first_keyword or keyword
+        return hits, first_keyword
+
+    print(f"{tag} 候选文件 {len(candidates)} 个,先检查Top5...")
+    largest = get_top5_by_size(candidates)
+    top_hits, keyword = scan(largest, "Top5进度")
+    if top_hits:
+        return True, "Top5命中", get_newest_file(top_hits), top_hits, keyword
+
+    remaining = [candidate for candidate in candidates if candidate not in largest]
+    if remaining:
+        print(f"{tag} 开始检查剩余文件 {len(remaining)} 个...")
+    other_hits, keyword = scan(remaining, "其余进度")
+    if other_hits:
+        return True, "其他文件命中", get_newest_file(other_hits), other_hits, keyword
+    return False, "未找到包含关键词的文件", None, [], ""
+
+
+def process_one_directory_task(
+    dir_path_str: str,
+    phase1_pages: int,
+    phase2_pages: int,
+    do_phase2: bool,
+    pdf_timeout_seconds: int,
+) -> Dict:
+    """Screen one directory for both review types (worker-process task).
+
+    For each of "expert" and "company", phase 1 searches with ``phase1_pages``
+    pages; if that finds nothing and ``do_phase2`` is set, phase 2 repeats the
+    search with ``phase2_pages`` pages. Exactly one result row is produced per
+    review type, either a success row or a "not found" row.
+
+    Args:
+        dir_path_str: Directory to process, passed as a string.
+        phase1_pages: Page budget of the first pass.
+        phase2_pages: Page budget of the second pass.
+        do_phase2: Whether the second pass is enabled.
+        pdf_timeout_seconds: Timeout for each PDF extraction.
+
+    Returns:
+        ``{"dir_id": <directory name>, "results": [<row dict>, ...]}``.
+    """
+    dir_path = Path(dir_path_str)
+    dir_id = dir_path.name
+    rows: List[Dict] = []
+    print(f"[{dir_id}] 开始处理目录")
+
+    # Phases to attempt, in order: (label stored in the row, page budget).
+    phases = [("第1阶段", phase1_pages)]
+    if do_phase2:
+        phases.append(("第2阶段", phase2_pages))
+
+    for review_type in ("expert", "company"):
+        print(f"[{dir_id}] 开始处理类型: {review_type}")
+        row = None
+        for stage, pages in phases:
+            found, note, chosen, hits, keyword = pick_review_file(
+                dir_path, review_type, max_pages=pages, pdf_timeout_seconds=pdf_timeout_seconds
+            )
+            if found and chosen:
+                print(
+                    f"[{dir_id}][{review_type}] 命中({note}) 选择文件: {chosen.name} 关键词: {keyword} 匹配数: {len(hits)}"
+                )
+                row = {
+                    "目录ID": dir_id,
+                    "评审类型": review_type,
+                    "阶段": stage,
+                    "状态": "成功",
+                    "备注": note,
+                    "匹配关键词": keyword,
+                    "匹配文件数": len(hits),
+                    "原路径": str(chosen),
+                    "原文件名": chosen.name,
+                }
+                break
+
+        if row is None:
+            # ``stage`` and ``note`` still hold the values of the last phase tried.
+            print(f"[{dir_id}][{review_type}] 未找到: {note}")
+            row = {
+                "目录ID": dir_id,
+                "评审类型": review_type,
+                "阶段": stage,
+                "状态": "未找到",
+                "备注": note,
+                "匹配关键词": "",
+                "匹配文件数": 0,
+                "原路径": "",
+                "原文件名": "",
+            }
+        rows.append(row)
+
+    print(f"[{dir_id}] 目录处理结束")
+    return {"dir_id": dir_id, "results": rows}
+
+
+def load_cache(cache_file: Path) -> Dict:
+    """Load the progress cache, or return a fresh empty one.
+
+    A fresh cache (no processed directories, no results, ``start_time`` set to
+    now) is returned when the file does not exist or cannot be read or parsed.
+    """
+    def fresh_cache() -> Dict:
+        return {"processed_dirs": [], "results": [], "start_time": datetime.now().isoformat()}
+
+    if not cache_file.exists():
+        return fresh_cache()
+    try:
+        raw = cache_file.read_text(encoding="utf-8")
+        return json.loads(raw)
+    except Exception:
+        return fresh_cache()
+
+
+def save_cache(cache_file: Path, data: Dict) -> None:
+    """Persist the progress cache as UTF-8 JSON, replacing the old file atomically.
+
+    The JSON is written to a sibling ``*.tmp`` file and then renamed over
+    ``cache_file``. An interruption in the middle of a write therefore leaves
+    the previous cache intact instead of a truncated file, which load_cache()
+    would silently discard together with all recorded progress.
+
+    Args:
+        cache_file: Destination path; missing parent directories are created.
+        data: Cache contents. ``data["last_update"]`` is set to the current
+            time as a side effect.
+    """
+    cache_file.parent.mkdir(parents=True, exist_ok=True)
+    data["last_update"] = datetime.now().isoformat()
+    serialized = json.dumps(data, ensure_ascii=False, indent=2)
+
+    tmp_file = cache_file.with_name(cache_file.name + ".tmp")
+    tmp_file.write_text(serialized, encoding="utf-8")
+    # Path.replace overwrites the destination in a single rename step.
+    tmp_file.replace(cache_file)
+
+
+def copy_outputs(results: List[Dict], expert_dir: Path, company_dir: Path) -> List[Dict]:
+    """Copy the selected files to their output folders and build the report rows.
+
+    Each input row is copied (the input is never modified) and extended with
+    the new file name, the target path and a processing timestamp. Rows whose
+    status is not "成功" get empty name and path fields. For successful rows
+    the source file is copied to ``expert_dir`` or ``company_dir`` under the
+    name ``<directory id>_<original name>``; if the copy fails the row's
+    status becomes "复制失败" and the error text is stored in its remark.
+
+    Returns:
+        The extended rows, in input order.
+    """
+    expert_dir.mkdir(parents=True, exist_ok=True)
+    company_dir.mkdir(parents=True, exist_ok=True)
+    report_rows: List[Dict] = []
+
+    for row in results:
+        record = dict(row)
+        new_name = ""
+        destination = ""
+
+        if row.get("状态") == "成功":
+            src = Path(row["原路径"])
+            folder = expert_dir if row["评审类型"] == "expert" else company_dir
+            wanted_name = f"{row['目录ID']}_{src.name}"
+            dst = folder / wanted_name
+            try:
+                shutil.copy2(src, dst)
+            except Exception as exc:
+                record["状态"] = "复制失败"
+                record["备注"] = str(exc)
+            else:
+                new_name = wanted_name
+                destination = str(dst)
+
+        record["新文件名"] = new_name
+        record["目标路径"] = destination
+        record["处理时间"] = datetime.now().isoformat()
+        report_rows.append(record)
+    return report_rows
+
+
+def get_numeric_dirs(source_dir: Path) -> List[Path]:
+    """Return the sub-directories of ``source_dir`` whose names are all digits.
+
+    The result is ordered by the numeric value of the name. An empty list is
+    returned when ``source_dir`` does not exist.
+    """
+    if not source_dir.exists():
+        return []
+    numbered = (
+        entry for entry in source_dir.iterdir()
+        if entry.is_dir() and entry.name.isdigit()
+    )
+    return sorted(numbered, key=lambda entry: int(entry.name))
+
+
+def parse_args() -> argparse.Namespace:
+    """Define the command-line options and parse the process arguments.
+
+    The three directory options are mandatory. Every other option has a
+    default; the default worker count is the CPU count minus one, and at
+    least one.
+    """
+    cli = argparse.ArgumentParser(description="评审意见筛选 - 服务器并发版")
+
+    # Mandatory locations.
+    for flag, help_text in (
+        ("--source-dir", "源目录(包含数字子目录)"),
+        ("--expert-output-dir", "专家评审输出目录"),
+        ("--company-output-dir", "公司/集团评审输出目录"),
+    ):
+        cli.add_argument(flag, required=True, help=help_text)
+
+    cli.add_argument("--temp-dir", default="./temp_server", help="缓存目录")
+
+    # Screening depth.
+    cli.add_argument("--phase1-pages", type=int, default=15, help="一级筛选页数")
+    cli.add_argument("--phase2-pages", type=int, default=30, help="二级筛选页数")
+    cli.add_argument("--disable-phase2", action="store_true", help="禁用二级筛选")
+
+    # Execution tuning.
+    default_workers = max(1, (os.cpu_count() or 4) - 1)
+    cli.add_argument("--workers", type=int, default=default_workers, help="并发进程数")
+    cli.add_argument("--save-every", type=int, default=20, help="每处理N个目录写一次缓存")
+    cli.add_argument("--pdf-timeout-seconds", type=int, default=45, help="单个PDF解析超时秒数")
+
+    # Reporting and run-mode switches.
+    cli.add_argument("--report-dir", default="", help="结果报表目录(默认使用expert输出目录的上级)")
+    cli.add_argument("--retry-failed", action="store_true", help="重跑历史状态为处理异常的目录")
+    cli.add_argument("--copy-as-you-go", action="store_true", help="每完成一个目录就复制输出文件(便于实时看到结果)")
+    return cli.parse_args()
+
+
+def main() -> None:
+    """Run the screening pipeline end to end.
+
+    Steps:
+        1. Resolve paths from the command line and list the numeric
+           sub-directories of the source directory.
+        2. Load the progress cache and work out which directories are pending
+           (optionally re-queuing directories that previously failed).
+        3. Process the pending directories in a process pool, saving the cache
+           every ``--save-every`` completions and after the last one.
+        4. Copy the selected files, then write the Excel report, the two
+           "missing" ID lists, the statistics JSON and the final cache.
+
+    The report always covers cached rows as well as rows produced in this run,
+    also when ``--copy-as-you-go`` is used on a resumed run. Rows left over
+    from an earlier attempt at a directory that is processed again are
+    discarded, so that no directory is reported twice.
+    """
+    args = parse_args()
+    source_dir = Path(args.source_dir).expanduser().resolve()
+    expert_output_dir = Path(args.expert_output_dir).expanduser().resolve()
+    company_output_dir = Path(args.company_output_dir).expanduser().resolve()
+    temp_dir = Path(args.temp_dir).expanduser().resolve()
+    cache_file = temp_dir / "评审筛选进度缓存_服务器版.json"
+    report_dir = (
+        Path(args.report_dir).expanduser().resolve()
+        if args.report_dir.strip()
+        else expert_output_dir.parent
+    )
+
+    do_phase2 = not args.disable_phase2
+    copy_as_you_go = bool(getattr(args, "copy_as_you_go", False))
+    # "--save-every 0" would raise ZeroDivisionError in the modulo below.
+    save_every = max(1, args.save_every)
+
+    numeric_dirs = get_numeric_dirs(source_dir)
+    if not numeric_dirs:
+        print(f"错误: 未找到数字子目录: {source_dir}")
+        sys.exit(1)
+
+    cache = load_cache(cache_file)
+    processed = set(cache.get("processed_dirs", []))
+    all_results: List[Dict] = cache.get("results", [])
+    failed_dir_ids = {
+        str(r.get("目录ID"))
+        for r in all_results
+        if r.get("状态") == "处理异常"
+    }
+    if args.retry_failed and failed_dir_ids:
+        processed -= failed_dir_ids
+    pending_dirs = [d for d in numeric_dirs if d.name not in processed]
+
+    # Rows cached for a directory that is about to be processed again (a
+    # retried failure, or a failure recorded before an interrupted run) are
+    # stale; keeping them would duplicate that directory in the report.
+    pending_ids = {d.name for d in pending_dirs}
+    all_results = [r for r in all_results if str(r.get("目录ID")) not in pending_ids]
+
+    print("=" * 70)
+    print("评审意见筛选脚本 - 服务器并发版")
+    print("=" * 70)
+    print(f"源目录: {source_dir}")
+    print(f"总目录数: {len(numeric_dirs)}")
+    print(f"待处理: {len(pending_dirs)}")
+    print(f"并发进程数: {args.workers}")
+    print(f"PDF超时秒数: {args.pdf_timeout_seconds}")
+    print(f"二级筛选: {'开启' if do_phase2 else '关闭'}")
+    print(f"缓存文件: {cache_file}")
+    print(f"报表目录: {report_dir}")
+    print(f"失败重跑: {'开启' if args.retry_failed else '关闭'}")
+    print(f"实时复制: {'开启' if copy_as_you_go else '关闭'}")
+
+    # Rows already copied during this run (copy-as-you-go mode only) and the
+    # directories they belong to.
+    final_rows_accum: List[Dict] = []
+    copied_dir_ids = set()
+
+    if not pending_dirs:
+        print("无需处理,直接输出结果。")
+    else:
+        print(f"开始并发处理目录,共 {len(pending_dirs)} 个...")
+        mp_ctx = mp.get_context("spawn")
+        with ProcessPoolExecutor(max_workers=args.workers, mp_context=mp_ctx) as pool:
+            futures = {
+                pool.submit(
+                    process_one_directory_task,
+                    str(d),
+                    args.phase1_pages,
+                    args.phase2_pages,
+                    do_phase2,
+                    args.pdf_timeout_seconds,
+                ): d.name
+                for d in pending_dirs
+            }
+
+            done_count = 0
+            total = len(pending_dirs)
+            for fut in as_completed(futures):
+                done_count += 1
+                dir_id = futures[fut]
+                try:
+                    payload = fut.result()
+                    all_results.extend(payload["results"])
+                    processed.add(dir_id)
+                    if copy_as_you_go:
+                        # Copy this directory's files right away so results
+                        # become visible before the whole batch has finished.
+                        final_rows_accum.extend(copy_outputs(payload["results"], expert_output_dir, company_output_dir))
+                        copied_dir_ids.add(dir_id)
+                except Exception as e:
+                    all_results.append(
+                        {
+                            "目录ID": dir_id,
+                            "评审类型": "all",
+                            "阶段": "第0阶段",
+                            "状态": "处理异常",
+                            "备注": str(e),
+                            "匹配关键词": "",
+                            "匹配文件数": 0,
+                            "原路径": "",
+                            "原文件名": "",
+                        }
+                    )
+
+                if done_count % save_every == 0 or done_count == total:
+                    save_cache(cache_file, {"processed_dirs": sorted(processed), "results": all_results, "start_time": cache.get("start_time")})
+                print(f"[{done_count}/{total}] 完成目录 {dir_id}")
+
+    if copy_as_you_go:
+        # Everything not copied on the fly still has to go through
+        # copy_outputs(): rows restored from the cache and the exception rows
+        # of this run. Without them the report and statistics would cover
+        # this run only and the final cache would lose earlier results.
+        earlier_rows = [r for r in all_results if str(r.get("目录ID")) not in copied_dir_ids]
+        final_rows = copy_outputs(earlier_rows, expert_output_dir, company_output_dir) + final_rows_accum
+    else:
+        final_rows = copy_outputs(all_results, expert_output_dir, company_output_dir)
+    df = pd.DataFrame(final_rows)
+
+    output_base = report_dir
+    output_base.mkdir(parents=True, exist_ok=True)
+    excel_file = output_base / "评审筛选结果记录_服务器版.xlsx"
+    json_file = output_base / "评审筛选统计_服务器版.json"
+    df.to_excel(excel_file, index=False, engine="openpyxl")
+
+    all_ids = {d.name for d in numeric_dirs}
+    expert_success_ids = {str(r["目录ID"]) for r in final_rows if r.get("评审类型") == "expert" and r.get("状态") == "成功"}
+    company_success_ids = {str(r["目录ID"]) for r in final_rows if r.get("评审类型") == "company" and r.get("状态") == "成功"}
+    expert_missing = sorted(all_ids - expert_success_ids, key=int)
+    company_missing = sorted(all_ids - company_success_ids, key=int)
+    both_missing = sorted(all_ids - (expert_success_ids | company_success_ids), key=int)
+
+    expert_missing_file = output_base / "无专家审查意见目录ID.txt"
+    both_missing_file = output_base / "既无专家也无公司集团审查意见目录ID.txt"
+    expert_missing_file.write_text("\n".join(expert_missing), encoding="utf-8")
+    both_missing_file.write_text("\n".join(both_missing), encoding="utf-8")
+
+    stats = {
+        "start_time": cache.get("start_time"),
+        "end_time": datetime.now().isoformat(),
+        "total_directories": len(numeric_dirs),
+        "expert_success": len(expert_success_ids),
+        "company_success": len(company_success_ids),
+        "expert_missing_count": len(expert_missing),
+        "company_missing_count": len(company_missing),
+        "both_missing_count": len(both_missing),
+        "expert_missing_ids": expert_missing,
+        "company_missing_ids": company_missing,
+        "both_missing_ids": both_missing,
+        "expert_missing_file": str(expert_missing_file),
+        "both_missing_file": str(both_missing_file),
+        "excel_file": str(excel_file),
+        "expert_output_dir": str(expert_output_dir),
+        "company_output_dir": str(company_output_dir),
+    }
+    json_file.write_text(json.dumps(stats, ensure_ascii=False, indent=2), encoding="utf-8")
+
+    # Final cache: every directory is marked processed. Directories that
+    # failed keep their "处理异常" row, which is what --retry-failed looks for.
+    save_cache(cache_file, {"processed_dirs": sorted({d.name for d in numeric_dirs}), "results": final_rows, "start_time": cache.get("start_time")})
+
+    print("=" * 70)
+    print("处理完成")
+    print(f"Excel: {excel_file}")
+    print(f"统计JSON: {json_file}")
+    print(f"无专家审查意见目录ID: {expert_missing_file}")
+    print(f"既无专家也无公司集团审查意见目录ID: {both_missing_file}")
+    print("=" * 70)
+
+
+# Entry-point guard. It is required here, not just customary: the "spawn" start
+# method used by this script re-imports the module in every child process, and
+# without the guard each child would call main() again.
+if __name__ == "__main__":
+    main()