pipeline.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. """
  2. 管线编排:调度 PDF 提取 → 目录识别 → 切分 → 分类
  3. """
  4. import asyncio
  5. from pathlib import Path
  6. from typing import Any, Dict, List, Optional
  7. from .pdf_extractor import SimplePdfExtractor
  8. from .toc_builder import build_toc_items_from_structure
  9. from .chunk_assembler import assemble_chunks
  10. from .classifier import SimpleClassifier
  11. from .models import PipelineResult, ClassificationItem
  12. class MinimalPipeline:
  13. """独立最小化文档处理管线"""
  14. def __init__(
  15. self,
  16. api_key: str,
  17. base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
  18. model: str = "qwen3.5-122b-a10b",
  19. concurrency: int = 10,
  20. csv_path: Optional[str] = None,
  21. ):
  22. self.extractor = SimplePdfExtractor()
  23. self.classifier = SimpleClassifier(
  24. api_key=api_key,
  25. base_url=base_url,
  26. model=model,
  27. concurrency=concurrency,
  28. csv_path=csv_path,
  29. )
  30. async def process(
  31. self,
  32. file_content: bytes,
  33. file_name: str = "",
  34. skip_tertiary: bool = False,
  35. progress_callback: Optional[callable] = None,
  36. ) -> PipelineResult:
  37. """
  38. 处理 PDF 文档。
  39. Args:
  40. file_content: PDF 文件字节内容
  41. file_name: 文件名(用于报告)
  42. skip_tertiary: 是否跳过三级分类(节省 LLM 调用)
  43. progress_callback: 进度回调函数 (stage, percent, message) -> None
  44. Returns:
  45. PipelineResult
  46. """
  47. result = PipelineResult(document_name=file_name, total_pages=0)
  48. # 1. PDF 结构提取
  49. if progress_callback:
  50. progress_callback("文档提取", 0, "开始 PDF 结构提取...")
  51. structure = self.extractor.extract(file_content)
  52. result.total_pages = structure.get("total_pages", 0)
  53. result.chapters = structure.get("chapters", {})
  54. if progress_callback:
  55. chapter_count = len([k for k in result.chapters.keys() if k != "quality_check"])
  56. progress_callback("文档提取", 20, f"PDF 提取完成,共 {chapter_count} 个一级章节")
  57. # 2. 目录构建
  58. toc_items = build_toc_items_from_structure(structure)
  59. if not toc_items:
  60. result.quality_check = {"error": "未提取到有效目录结构"}
  61. return result
  62. if progress_callback:
  63. progress_callback("文档分类", 25, f"构建目录完成,共 {len(toc_items)} 个目录项")
  64. # 3. 一级分类
  65. primary_result = await self.classifier.classify_primary(toc_items)
  66. result.primary_items = [
  67. ClassificationItem(
  68. title=item["title"],
  69. page=item["page"],
  70. level=item["level"],
  71. category=item["category"],
  72. category_code=item["category_code"],
  73. confidence=item["confidence"],
  74. original=item["original"],
  75. level2_titles=item.get("level2_titles", []),
  76. )
  77. for item in primary_result.get("items", [])
  78. ]
  79. if progress_callback:
  80. progress_callback("文档分类", 40, f"一级分类完成,共 {len(result.primary_items)} 项")
  81. # 4. 二级分类
  82. secondary_result = await self.classifier.classify_secondary(primary_result)
  83. result.secondary_items = secondary_result.get("items", [])
  84. if progress_callback:
  85. progress_callback("文档分类", 55, f"二级分类完成,共 {secondary_result.get('total_count', 0)} 项")
  86. # 5. 组装 chunks
  87. chunks = assemble_chunks(structure, primary_result, secondary_result)
  88. if not chunks:
  89. result.quality_check = {"error": "无可用的 chunks"}
  90. return result
  91. if progress_callback:
  92. progress_callback("文档切分", 60, f"组装完成,共 {len(chunks)} 个内容块")
  93. # 6. 三级分类(可选)
  94. if not skip_tertiary:
  95. chunks = await self.classifier.classify_tertiary(chunks)
  96. if progress_callback:
  97. progress_callback("文档分类", 90, "三级分类完成")
  98. else:
  99. for chunk in chunks:
  100. chunk["tertiary_category_code"] = "skipped"
  101. chunk["tertiary_category_cn"] = "已跳过"
  102. if progress_callback:
  103. progress_callback("文档分类", 90, "已跳过三级分类")
  104. # 7. 转换为 ChunkItem
  105. from .models import ChunkItem
  106. result.chunks = [
  107. ChunkItem(
  108. chunk_id=c["chunk_id"],
  109. section_label=c["section_label"],
  110. chapter_classification=c["chapter_classification"],
  111. first_name=c["first_name"],
  112. secondary_category_code=c["secondary_category_code"],
  113. secondary_category_cn=c["secondary_category_cn"],
  114. hierarchy_path=c["hierarchy_path"],
  115. review_chunk_content=c["review_chunk_content"],
  116. page_start=c["page_start"],
  117. page_end=c["page_end"],
  118. tertiary_category_code=c.get("tertiary_category_code", ""),
  119. tertiary_category_cn=c.get("tertiary_category_cn", ""),
  120. tertiary_classification_details=c.get("tertiary_classification_details", []),
  121. )
  122. for c in chunks
  123. ]
  124. # 8. 质量检查
  125. result.quality_check = self._build_quality_check(structure, result)
  126. # 9. 统计
  127. result.stats = {
  128. "total_pages": result.total_pages,
  129. "chapter_count": len(result.primary_items),
  130. "chunk_count": len(result.chunks),
  131. "primary_category_distribution": primary_result.get("category_stats", {}),
  132. "secondary_category_distribution": secondary_result.get("category_stats", {}),
  133. }
  134. if progress_callback:
  135. progress_callback("完成", 100, "处理完成")
  136. return result
  137. def _build_quality_check(self, structure: Dict[str, Any], result: PipelineResult) -> Dict[str, Any]:
  138. """构建质量检查结果"""
  139. chapters = structure.get("chapters", {})
  140. l1_count = len([k for k in chapters.keys() if k != "quality_check"])
  141. l2_count = 0
  142. for chapter_name, sections in chapters.items():
  143. if isinstance(sections, dict):
  144. for section_name in sections.keys():
  145. if section_name != "章节标题":
  146. l2_count += 1
  147. default_total_chapters = 10
  148. default_total_subsections = 41
  149. l1_rate = l1_count / default_total_chapters if default_total_chapters > 0 else 1.0
  150. l2_rate = l2_count / default_total_subsections if default_total_subsections > 0 else 1.0
  151. return {
  152. "l1_chapter_quality": {
  153. "extracted_count": l1_count,
  154. "expected_count": default_total_chapters,
  155. "extraction_rate": round(l1_rate * 100, 2),
  156. "threshold": 70.0,
  157. "exist_issue": l1_rate < 0.70,
  158. },
  159. "l2_subsection_quality": {
  160. "extracted_count": l2_count,
  161. "expected_count": default_total_subsections,
  162. "extraction_rate": round(l2_rate * 100, 2),
  163. "threshold": 73.0,
  164. "exist_issue": l2_rate < 0.73,
  165. },
  166. }