run.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
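r"""
Standalone entry point for running the minimal document-processing pipeline.

Usage:
    python run.py -p <pdf_path> [-o <output_dir>] [--skip-tertiary]
                  [--api-key <key>] [--base-url <url>] [--model <name>]
                  [--csv <path>]

Example:
    python utils_test/minimal_pipeline/run.py \
        -p "D:/wx_work/sichuan_luqiao/lu_sgsc_testfile/测试模版.pdf" \
        -o ./output \
        --skip-tertiary

Note:
    The script changes the working directory to the project root when it is
    loaded, so relative -p / -o paths are interpreted relative to the project
    root rather than the directory the command was started from.
"""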
  13. import argparse
  14. import asyncio
  15. import json
  16. import os
  17. import sys
  18. import time
  19. from pathlib import Path
  20. # 添加项目根目录到路径(用于读取 config.ini 等,但本模块本身不依赖 core/foundation)
  21. PROJECT_ROOT = Path(__file__).parent.parent.parent
  22. sys.path.insert(0, str(PROJECT_ROOT))
  23. os.chdir(PROJECT_ROOT)
  24. from utils_test.minimal_pipeline import MinimalPipeline
  25. from utils_test.minimal_pipeline.models import PipelineResult
  26. def parse_args():
  27. parser = argparse.ArgumentParser(description="独立最小化文档处理管线")
  28. parser.add_argument("-p", "--pdf", required=True, help="PDF 文件路径")
  29. parser.add_argument("-o", "--output", default="./output", help="输出目录(默认 ./output)")
  30. parser.add_argument("--skip-tertiary", action="store_true", help="跳过三级分类(节省 LLM 调用)")
  31. parser.add_argument("--api-key", default=os.environ.get("DASHSCOPE_API_KEY", ""), help="API Key(默认从环境变量 DASHSCOPE_API_KEY 读取)")
  32. parser.add_argument("--base-url", default="https://dashscope.aliyuncs.com/compatible-mode/v1", help="API Base URL")
  33. parser.add_argument("--model", default="qwen3.5-122b-a10b", help="模型名称")
  34. parser.add_argument("--csv", default=None, help="StandardCategoryTable.csv 路径(默认自动查找)")
  35. return parser.parse_args()
  36. def print_progress(stage: str, percent: int, message: str):
  37. """进度回调"""
  38. bar_len = 30
  39. filled = int(bar_len * percent / 100)
  40. bar = "█" * filled + "░" * (bar_len - filled)
  41. print(f"\r[{bar}] {percent:3d}% | {stage:10s} | {message}", end="", flush=True)
  42. if percent >= 100:
  43. print()
  44. def print_result(result: PipelineResult, elapsed: float):
  45. """打印结果摘要"""
  46. print("\n" + "=" * 80)
  47. print("处理结果摘要")
  48. print("=" * 80)
  49. print(f"文档名称: {result.document_name}")
  50. print(f"总页数: {result.total_pages}")
  51. print(f"处理耗时: {elapsed:.2f} 秒")
  52. print(f"\n一级章节数: {len(result.primary_items)}")
  53. for item in result.primary_items:
  54. print(f" [{item.category_code:15s}] {item.title}")
  55. print(f"\nChunks 数: {len(result.chunks)}")
  56. for chunk in result.chunks[:5]:
  57. print(f" {chunk.chunk_id} | {chunk.section_label} | "
  58. f"一级={chunk.first_name} 二级={chunk.secondary_category_cn} "
  59. f"三级={chunk.tertiary_category_cn}")
  60. if len(result.chunks) > 5:
  61. print(f" ... 共 {len(result.chunks)} 个 chunks")
  62. print(f"\n质量检查:")
  63. qc = result.quality_check
  64. l1 = qc.get("l1_chapter_quality", {})
  65. l2 = qc.get("l2_subsection_quality", {})
  66. print(f" 一级提取率: {l1.get('extraction_rate', 0):.1f}% ({l1.get('extracted_count', 0)}/{l1.get('expected_count', 0)})")
  67. print(f" 二级提取率: {l2.get('extraction_rate', 0):.1f}% ({l2.get('extracted_count', 0)}/{l2.get('expected_count', 0)})")
  68. print(f"\n分类统计:")
  69. for level, stats in result.stats.items():
  70. if isinstance(stats, dict) and stats:
  71. print(f" {level}:")
  72. for cat, count in stats.items():
  73. print(f" {cat}: {count}")
  74. print("=" * 80)
  75. def main():
  76. args = parse_args()
  77. pdf_path = Path(args.pdf)
  78. if not pdf_path.exists():
  79. print(f"[错误] PDF 文件不存在: {pdf_path}")
  80. return 1
  81. if not args.api_key:
  82. print("[错误] 未提供 API Key。请通过 --api-key 参数或 DASHSCOPE_API_KEY 环境变量设置。")
  83. return 1
  84. output_dir = Path(args.output)
  85. output_dir.mkdir(parents=True, exist_ok=True)
  86. print(f"[信息] 处理文档: {pdf_path}")
  87. print(f"[信息] 输出目录: {output_dir}")
  88. print(f"[信息] 模型: {args.model}")
  89. print(f"[信息] 跳过三级分类: {args.skip_tertiary}")
  90. print()
  91. # 读取 PDF
  92. with open(pdf_path, "rb") as f:
  93. file_content = f.read()
  94. # 初始化管线
  95. pipeline = MinimalPipeline(
  96. api_key=args.api_key,
  97. base_url=args.base_url,
  98. model=args.model,
  99. concurrency=10,
  100. csv_path=args.csv,
  101. )
  102. # 运行管线
  103. start_time = time.time()
  104. try:
  105. result = asyncio.run(pipeline.process(
  106. file_content=file_content,
  107. file_name=pdf_path.name,
  108. skip_tertiary=args.skip_tertiary,
  109. progress_callback=print_progress,
  110. ))
  111. except Exception as e:
  112. print(f"\n[错误] 处理失败: {e}")
  113. import traceback
  114. traceback.print_exc()
  115. return 1
  116. elapsed = time.time() - start_time
  117. # 打印结果
  118. print_result(result, elapsed)
  119. # 保存结果
  120. output_file = output_dir / f"{pdf_path.stem}_result.json"
  121. with open(output_file, "w", encoding="utf-8") as f:
  122. json.dump(result.to_dict(), f, ensure_ascii=False, indent=2)
  123. print(f"[信息] 结果已保存到: {output_file}")
  124. # 保存 chunks 明细
  125. chunks_file = output_dir / f"{pdf_path.stem}_chunks.jsonl"
  126. with open(chunks_file, "w", encoding="utf-8") as f:
  127. for chunk in result.chunks:
  128. f.write(json.dumps({
  129. "chunk_id": chunk.chunk_id,
  130. "section_label": chunk.section_label,
  131. "chapter_classification": chunk.chapter_classification,
  132. "first_name": chunk.first_name,
  133. "secondary_category_code": chunk.secondary_category_code,
  134. "secondary_category_cn": chunk.secondary_category_cn,
  135. "tertiary_category_code": chunk.tertiary_category_code,
  136. "tertiary_category_cn": chunk.tertiary_category_cn,
  137. "page_start": chunk.page_start,
  138. "page_end": chunk.page_end,
  139. "content_preview": chunk.review_chunk_content[:200] + "...",
  140. }, ensure_ascii=False) + "\n")
  141. print(f"[信息] Chunks 明细已保存到: {chunks_file}")
  142. return 0
  143. if __name__ == "__main__":
  144. sys.exit(main())