| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 独立最小化管线运行入口
- 用法:
- python run.py -p <pdf路径> [-o <输出目录>] [--skip-tertiary] [--api-key <key>] [--base-url <url>] [--model <model>] [--csv <csv路径>]
- 示例:
- python utils_test/minimal_pipeline/run.py \
- -p "D:/wx_work/sichuan_luqiao/lu_sgsc_testfile/测试模版.pdf" \
- -o ./output \
- --skip-tertiary
- """
- import argparse
- import asyncio
- import json
- import os
- import sys
- import time
- from pathlib import Path
# Project root is three directory levels above this file. Resolve it to an
# absolute path so the value stays valid after the chdir below.
PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
# NOTE: after this chdir, relative paths given on the command line are
# interpreted relative to PROJECT_ROOT, not the caller's original directory.
os.chdir(PROJECT_ROOT)
# Running this file as a script puts only the script's own directory on
# sys.path (not the working directory), so the absolute ``utils_test...``
# imports below need the project root added explicitly.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
- from utils_test.minimal_pipeline import MinimalPipeline
- from utils_test.minimal_pipeline.models import PipelineResult
def parse_args(argv=None):
    """Parse the command-line options of the minimal pipeline runner.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. ``None`` (the default) makes argparse read
            ``sys.argv[1:]``, which is what existing callers get.

    Returns:
        argparse.Namespace with the attributes ``pdf``, ``output``,
        ``skip_tertiary``, ``api_key``, ``base_url``, ``model`` and ``csv``.
    """
    parser = argparse.ArgumentParser(description="独立最小化文档处理管线")
    parser.add_argument("-p", "--pdf", required=True, help="PDF 文件路径")
    parser.add_argument("-o", "--output", default="./output", help="输出目录(默认 ./output)")
    parser.add_argument(
        "--skip-tertiary",
        action="store_true",
        help="跳过三级分类(节省 LLM 调用)",
    )
    # The environment variable is read when this function runs, so the default
    # is an empty string when DASHSCOPE_API_KEY is not set.
    parser.add_argument(
        "--api-key",
        default=os.environ.get("DASHSCOPE_API_KEY", ""),
        help="API Key(默认从环境变量 DASHSCOPE_API_KEY 读取)",
    )
    parser.add_argument(
        "--base-url",
        default="https://dashscope.aliyuncs.com/compatible-mode/v1",
        help="API Base URL",
    )
    parser.add_argument("--model", default="qwen3.5-122b-a10b", help="模型名称")
    parser.add_argument(
        "--csv",
        default=None,
        help="StandardCategoryTable.csv 路径(默认自动查找)",
    )
    return parser.parse_args(argv)
def print_progress(stage: str, percent: int, message: str):
    """Progress callback that redraws a single-line text progress bar.

    The line starts with a carriage return and is printed without a trailing
    newline, so successive calls overwrite each other on a terminal. Once
    ``percent`` reaches 100 or more, a newline is printed to finish the line.

    Args:
        stage: Name of the current stage (left-aligned, padded to 10 chars).
        percent: Progress as an integer percentage. Values outside 0-100 are
            shown as given, but the bar itself is clamped to its 30 cells.
        message: Free-form status text appended after the stage name.
    """
    bar_len = 30
    # Clamp so that an out-of-range percent cannot make the bar longer than
    # bar_len (e.g. 150% would otherwise draw 45 filled cells).
    filled = max(0, min(bar_len, int(bar_len * percent / 100)))
    bar = "█" * filled + "░" * (bar_len - filled)
    print(f"\r[{bar}] {percent:3d}% | {stage:10s} | {message}", end="", flush=True)
    if percent >= 100:
        print()
def print_result(result: PipelineResult, elapsed: float):
    """Print a human-readable summary of a pipeline run to stdout.

    The summary lists the document name, page count and elapsed time, every
    primary item with its category code, the first five chunks with their
    three classification labels, the first- and second-level extraction rates
    read from ``result.quality_check``, and every non-empty dict found in
    ``result.stats``.

    Args:
        result: Pipeline output to summarise.
        elapsed: Processing time in seconds.
    """
    rule = "=" * 80
    preview_limit = 5

    # Header
    print("\n" + rule)
    print("处理结果摘要")
    print(rule)
    print(f"文档名称: {result.document_name}")
    print(f"总页数: {result.total_pages}")
    print(f"处理耗时: {elapsed:.2f} 秒")

    # Primary (level-one) items
    primary_items = result.primary_items
    print(f"\n一级章节数: {len(primary_items)}")
    for entry in primary_items:
        print(f" [{entry.category_code:15s}] {entry.title}")

    # Chunk preview: only the first few are listed
    chunks = result.chunks
    chunk_total = len(chunks)
    print(f"\nChunks 数: {chunk_total}")
    for piece in chunks[:preview_limit]:
        head = f" {piece.chunk_id} | {piece.section_label} | "
        labels = (
            f"一级={piece.first_name} 二级={piece.secondary_category_cn} "
            f"三级={piece.tertiary_category_cn}"
        )
        print(head + labels)
    if chunk_total > preview_limit:
        print(f" ... 共 {chunk_total} 个 chunks")

    # Quality check: a missing section or field is reported as 0
    print("\n质量检查:")
    quality = result.quality_check
    rate_rows = [
        ("一级提取率", quality.get("l1_chapter_quality", {})),
        ("二级提取率", quality.get("l2_subsection_quality", {})),
    ]
    for label, section in rate_rows:
        rate = section.get("extraction_rate", 0)
        extracted = section.get("extracted_count", 0)
        expected = section.get("expected_count", 0)
        print(f" {label}: {rate:.1f}% ({extracted}/{expected})")

    # Classification statistics: entries that are not non-empty dicts are skipped
    print("\n分类统计:")
    for level, counts in result.stats.items():
        if not isinstance(counts, dict) or not counts:
            continue
        print(f" {level}:")
        for category, tally in counts.items():
            print(f" {category}: {tally}")
    print(rule)
def _chunk_summary(chunk, preview_chars=200):
    """Build the JSON-serialisable record written for one chunk.

    The content preview holds at most ``preview_chars`` characters; the
    ``"..."`` marker is appended only when the content was actually cut.
    """
    content = chunk.review_chunk_content or ""
    preview = content[:preview_chars]
    if len(content) > preview_chars:
        preview += "..."
    return {
        "chunk_id": chunk.chunk_id,
        "section_label": chunk.section_label,
        "chapter_classification": chunk.chapter_classification,
        "first_name": chunk.first_name,
        "secondary_category_code": chunk.secondary_category_code,
        "secondary_category_cn": chunk.secondary_category_cn,
        "tertiary_category_code": chunk.tertiary_category_code,
        "tertiary_category_cn": chunk.tertiary_category_cn,
        "page_start": chunk.page_start,
        "page_end": chunk.page_end,
        "content_preview": preview,
    }


def main():
    """Run the pipeline on one PDF and write the results to the output directory.

    Steps: parse the command line, validate the PDF path and API key, read the
    PDF bytes, run ``MinimalPipeline.process`` through ``asyncio.run``, print a
    summary, then write ``<stem>_result.json`` (full result) and
    ``<stem>_chunks.jsonl`` (one summary record per chunk).

    Returns:
        Process exit code: 0 on success; 1 when the PDF path is not an
        existing file, no API key is available, or the pipeline raises.
    """
    args = parse_args()

    pdf_path = Path(args.pdf)
    # is_file() also rejects directories, which would otherwise fail later
    # with an unhandled error when the path is opened for reading.
    if not pdf_path.is_file():
        print(f"[错误] PDF 文件不存在: {pdf_path}")
        return 1
    if not args.api_key:
        print("[错误] 未提供 API Key。请通过 --api-key 参数或 DASHSCOPE_API_KEY 环境变量设置。")
        return 1

    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    print(f"[信息] 处理文档: {pdf_path}")
    print(f"[信息] 输出目录: {output_dir}")
    print(f"[信息] 模型: {args.model}")
    print(f"[信息] 跳过三级分类: {args.skip_tertiary}")
    print()

    # Read the whole PDF into memory; the pipeline takes raw bytes.
    file_content = pdf_path.read_bytes()

    # Initialise the pipeline.
    pipeline = MinimalPipeline(
        api_key=args.api_key,
        base_url=args.base_url,
        model=args.model,
        concurrency=10,
        csv_path=args.csv,
    )

    # Run the pipeline. perf_counter() is monotonic, so the measured duration
    # is not affected by system clock adjustments.
    start_time = time.perf_counter()
    try:
        result = asyncio.run(pipeline.process(
            file_content=file_content,
            file_name=pdf_path.name,
            skip_tertiary=args.skip_tertiary,
            progress_callback=print_progress,
        ))
    except Exception as e:
        # Top-level boundary: report the failure with its traceback and turn
        # it into a non-zero exit code instead of crashing.
        print(f"\n[错误] 处理失败: {e}")
        import traceback
        traceback.print_exc()
        return 1
    elapsed = time.perf_counter() - start_time

    # Print the summary.
    print_result(result, elapsed)

    # Save the full result as JSON.
    output_file = output_dir / f"{pdf_path.stem}_result.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(result.to_dict(), f, ensure_ascii=False, indent=2)
    print(f"[信息] 结果已保存到: {output_file}")

    # Save per-chunk details, one JSON object per line.
    chunks_file = output_dir / f"{pdf_path.stem}_chunks.jsonl"
    with open(chunks_file, "w", encoding="utf-8") as f:
        for chunk in result.chunks:
            f.write(json.dumps(_chunk_summary(chunk), ensure_ascii=False) + "\n")
    print(f"[信息] Chunks 明细已保存到: {chunks_file}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|