#!/usr/bin/env python
# -*- coding: utf-8 -*-
r"""
Standalone entry point for the minimal document-processing pipeline.

Usage:
    python run.py -p <pdf_path> [-o <output_dir>] [--skip-tertiary]
                  [--api-key KEY] [--base-url URL] [--model NAME] [--csv PATH]

Example:
    python utils_test/minimal_pipeline/run.py \
        -p "D:/wx_work/sichuan_luqiao/lu_sgsc_testfile/测试模版.pdf" \
        -o ./output \
        --skip-tertiary

NOTE: this module changes the working directory to the project root at
import time, so relative paths such as ``-p`` and ``-o`` are resolved
against the project root, not the directory the command was launched from.
"""

import argparse
import asyncio
import json
import os
import sys
import time
import traceback
from pathlib import Path

# Add the project root to sys.path (used for reading config.ini etc.; this
# module itself does not depend on core/foundation).
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
os.chdir(PROJECT_ROOT)

from utils_test.minimal_pipeline import MinimalPipeline
from utils_test.minimal_pipeline.models import PipelineResult

# Maximum number of characters of chunk content kept in the JSONL preview.
_PREVIEW_LIMIT = 200


def parse_args():
    """Parse the command-line arguments.

    Returns:
        argparse.Namespace with attributes ``pdf``, ``output``,
        ``skip_tertiary``, ``api_key``, ``base_url``, ``model`` and ``csv``.
    """
    parser = argparse.ArgumentParser(description="独立最小化文档处理管线")
    parser.add_argument("-p", "--pdf", required=True, help="PDF 文件路径")
    parser.add_argument(
        "-o", "--output", default="./output", help="输出目录(默认 ./output)"
    )
    parser.add_argument(
        "--skip-tertiary",
        action="store_true",
        help="跳过三级分类(节省 LLM 调用)",
    )
    # The default is read from the environment when the parser is built;
    # an empty string means "not provided" and is rejected in main().
    parser.add_argument(
        "--api-key",
        default=os.environ.get("DASHSCOPE_API_KEY", ""),
        help="API Key(默认从环境变量 DASHSCOPE_API_KEY 读取)",
    )
    parser.add_argument(
        "--base-url",
        default="https://dashscope.aliyuncs.com/compatible-mode/v1",
        help="API Base URL",
    )
    parser.add_argument("--model", default="qwen3.5-122b-a10b", help="模型名称")
    parser.add_argument(
        "--csv",
        default=None,
        help="StandardCategoryTable.csv 路径(默认自动查找)",
    )
    return parser.parse_args()


def print_progress(stage: str, percent: int, message: str):
    """Progress callback: redraw a one-line text progress bar on stdout.

    Args:
        stage: Name of the current stage (left-aligned, padded to 10 columns).
        percent: Completion percentage as an integer.
        message: Free-form status text appended to the line.
    """
    bar_len = 30
    # Clamp the filled width so an out-of-range percent cannot produce a bar
    # that is longer (or "negatively" shorter) than bar_len cells.
    filled = max(0, min(bar_len, int(bar_len * percent / 100)))
    bar = "█" * filled + "░" * (bar_len - filled)
    # "\r" + end="" rewrites the same terminal line on every call.
    print(f"\r[{bar}] {percent:3d}% | {stage:10s} | {message}", end="", flush=True)
    if percent >= 100:
        # Terminate the progress line once the work is complete.
        print()


def print_result(result: PipelineResult, elapsed: float):
    """Print a human-readable summary of a pipeline result to stdout.

    Args:
        result: The pipeline result; this function reads ``document_name``,
            ``total_pages``, ``primary_items``, ``chunks``,
            ``quality_check`` and ``stats`` from it.
        elapsed: Processing time in seconds.
    """
    print("\n" + "=" * 80)
    print("处理结果摘要")
    print("=" * 80)
    print(f"文档名称: {result.document_name}")
    print(f"总页数: {result.total_pages}")
    print(f"处理耗时: {elapsed:.2f} 秒")

    print(f"\n一级章节数: {len(result.primary_items)}")
    for item in result.primary_items:
        print(f" [{item.category_code:15s}] {item.title}")

    print(f"\nChunks 数: {len(result.chunks)}")
    # Only the first five chunks are listed to keep the summary short.
    for chunk in result.chunks[:5]:
        print(
            f" {chunk.chunk_id} | {chunk.section_label} | "
            f"一级={chunk.first_name} 二级={chunk.secondary_category_cn} "
            f"三级={chunk.tertiary_category_cn}"
        )
    if len(result.chunks) > 5:
        print(f" ... 共 {len(result.chunks)} 个 chunks")

    print("\n质量检查:")
    # Fall back to empty dicts so a missing/None section prints zeros
    # instead of raising AttributeError.
    qc = result.quality_check or {}
    l1 = qc.get("l1_chapter_quality") or {}
    l2 = qc.get("l2_subsection_quality") or {}
    print(
        f" 一级提取率: {l1.get('extraction_rate', 0):.1f}% "
        f"({l1.get('extracted_count', 0)}/{l1.get('expected_count', 0)})"
    )
    print(
        f" 二级提取率: {l2.get('extraction_rate', 0):.1f}% "
        f"({l2.get('extracted_count', 0)}/{l2.get('expected_count', 0)})"
    )

    print("\n分类统计:")
    for level, stats in result.stats.items():
        # Skip levels whose statistics are empty or not a mapping.
        if isinstance(stats, dict) and stats:
            print(f" {level}:")
            for cat, count in stats.items():
                print(f" {cat}: {count}")
    print("=" * 80)


def _content_preview(text, limit: int = _PREVIEW_LIMIT) -> str:
    """Return *text* cut to *limit* characters, with "..." only if truncated."""
    text = text or ""
    if len(text) <= limit:
        return text
    return text[:limit] + "..."


def _chunk_record(chunk) -> dict:
    """Build the JSON-serializable summary record written for one chunk."""
    return {
        "chunk_id": chunk.chunk_id,
        "section_label": chunk.section_label,
        "chapter_classification": chunk.chapter_classification,
        "first_name": chunk.first_name,
        "secondary_category_code": chunk.secondary_category_code,
        "secondary_category_cn": chunk.secondary_category_cn,
        "tertiary_category_code": chunk.tertiary_category_code,
        "tertiary_category_cn": chunk.tertiary_category_cn,
        "page_start": chunk.page_start,
        "page_end": chunk.page_end,
        "content_preview": _content_preview(chunk.review_chunk_content),
    }


def main() -> int:
    """Run the pipeline on one PDF and write the results to disk.

    Writes ``<stem>_result.json`` (full result) and ``<stem>_chunks.jsonl``
    (one summary record per chunk) into the output directory.

    Returns:
        Process exit code: 0 on success, 1 on invalid input or when the
        pipeline raises.
    """
    args = parse_args()

    pdf_path = Path(args.pdf)
    # is_file() also rejects directories, which exists() would let through
    # only to fail later when the path is opened for reading.
    if not pdf_path.is_file():
        print(f"[错误] PDF 文件不存在: {pdf_path}")
        return 1

    if not args.api_key:
        print("[错误] 未提供 API Key。请通过 --api-key 参数或 DASHSCOPE_API_KEY 环境变量设置。")
        return 1

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"[信息] 处理文档: {pdf_path}")
    print(f"[信息] 输出目录: {output_dir}")
    print(f"[信息] 模型: {args.model}")
    print(f"[信息] 跳过三级分类: {args.skip_tertiary}")
    print()

    # Read the whole PDF into memory; the pipeline takes raw bytes.
    file_content = pdf_path.read_bytes()

    # Initialize the pipeline.
    pipeline = MinimalPipeline(
        api_key=args.api_key,
        base_url=args.base_url,
        model=args.model,
        concurrency=10,
        csv_path=args.csv,
    )

    # Run the pipeline. perf_counter() is monotonic, so the elapsed time is
    # not affected by system clock adjustments.
    start_time = time.perf_counter()
    try:
        result = asyncio.run(
            pipeline.process(
                file_content=file_content,
                file_name=pdf_path.name,
                skip_tertiary=args.skip_tertiary,
                progress_callback=print_progress,
            )
        )
    except Exception as e:
        # Top-level boundary: report the failure with a traceback and exit
        # with a non-zero status instead of propagating.
        print(f"\n[错误] 处理失败: {e}")
        traceback.print_exc()
        return 1
    elapsed = time.perf_counter() - start_time

    # Print the summary.
    print_result(result, elapsed)

    # Save the full result.
    output_file = output_dir / f"{pdf_path.stem}_result.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(result.to_dict(), f, ensure_ascii=False, indent=2)
    print(f"[信息] 结果已保存到: {output_file}")

    # Save per-chunk details, one JSON object per line.
    chunks_file = output_dir / f"{pdf_path.stem}_chunks.jsonl"
    with open(chunks_file, "w", encoding="utf-8") as f:
        for chunk in result.chunks:
            f.write(json.dumps(_chunk_record(chunk), ensure_ascii=False) + "\n")
    print(f"[信息] Chunks 明细已保存到: {chunks_file}")

    return 0


if __name__ == "__main__":
    sys.exit(main())