| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- """
- 管线编排:调度 PDF 提取 → 目录识别 → 切分 → 分类
- """
import asyncio
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

from .pdf_extractor import SimplePdfExtractor
from .toc_builder import build_toc_items_from_structure
from .chunk_assembler import assemble_chunks
from .classifier import SimpleClassifier
from .models import ChunkItem, ClassificationItem, PipelineResult
class MinimalPipeline:
    """Self-contained minimal document-processing pipeline.

    Runs PDF structure extraction -> table-of-contents construction ->
    primary and secondary classification -> chunk assembly -> optional
    tertiary classification, and collects the outputs in a ``PipelineResult``.
    """

    # Expected counts the quality check measures extraction against.
    EXPECTED_L1_CHAPTERS = 10
    EXPECTED_L2_SUBSECTIONS = 41
    # Minimum acceptable extraction rates, in percent.
    L1_THRESHOLD_PCT = 70.0
    L2_THRESHOLD_PCT = 73.0

    # Entry of the extractor's ``chapters`` mapping that is excluded from chapter counts.
    _QUALITY_CHECK_KEY = "quality_check"
    # Entry inside a chapter's mapping ("chapter title") that is not counted as a subsection.
    _CHAPTER_TITLE_KEY = "章节标题"

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        model: str = "qwen3.5-122b-a10b",
        concurrency: int = 10,
        csv_path: Optional[str] = None,
    ):
        """Create the PDF extractor and the LLM-backed classifier.

        Args:
            api_key: API key forwarded to ``SimpleClassifier``.
            base_url: Endpoint base URL forwarded to ``SimpleClassifier``.
            model: Model name forwarded to ``SimpleClassifier``.
            concurrency: Concurrency setting forwarded to ``SimpleClassifier``.
            csv_path: Optional CSV path forwarded to ``SimpleClassifier``.
        """
        self.extractor = SimplePdfExtractor()
        self.classifier = SimpleClassifier(
            api_key=api_key,
            base_url=base_url,
            model=model,
            concurrency=concurrency,
            csv_path=csv_path,
        )

    async def process(
        self,
        file_content: bytes,
        file_name: str = "",
        skip_tertiary: bool = False,
        progress_callback: Optional[Callable[[str, int, str], None]] = None,
    ) -> PipelineResult:
        """Process a PDF document end to end.

        Args:
            file_content: Raw bytes of the PDF file.
            file_name: File name, stored on the result for reporting.
            skip_tertiary: If True, skip tertiary classification (saves LLM
                calls); each chunk's tertiary code is then set to ``"skipped"``.
            progress_callback: Optional ``(stage, percent, message) -> None``
                callback invoked as the pipeline advances.

        Returns:
            The populated ``PipelineResult``. If no TOC items or no chunks are
            produced, the result is returned early with only
            ``quality_check = {"error": ...}`` set. In that case this method
            does not populate ``stats`` and sends no 100% progress event.
        """

        def report(stage: str, percent: int, message: str) -> None:
            # Single place for the "callback is optional" check.
            if progress_callback:
                progress_callback(stage, percent, message)

        result = PipelineResult(document_name=file_name, total_pages=0)

        # 1. PDF structure extraction.
        report("文档提取", 0, "开始 PDF 结构提取...")
        # NOTE(review): extract() runs synchronously inside this coroutine and
        # blocks the event loop while it runs; consider an executor if this
        # pipeline serves concurrent requests (confirm extractor thread-safety first).
        structure = self.extractor.extract(file_content)
        result.total_pages = structure.get("total_pages", 0)
        result.chapters = structure.get("chapters", {})
        chapter_count = sum(1 for key in result.chapters if key != self._QUALITY_CHECK_KEY)
        report("文档提取", 20, f"PDF 提取完成,共 {chapter_count} 个一级章节")

        # 2. Table-of-contents construction; nothing to classify without it.
        toc_items = build_toc_items_from_structure(structure)
        if not toc_items:
            result.quality_check = {"error": "未提取到有效目录结构"}
            return result
        report("文档分类", 25, f"构建目录完成,共 {len(toc_items)} 个目录项")

        # 3. Primary (level-1) classification.
        primary_result = await self.classifier.classify_primary(toc_items)
        result.primary_items = [
            self._to_classification_item(item) for item in primary_result.get("items", [])
        ]
        report("文档分类", 40, f"一级分类完成,共 {len(result.primary_items)} 项")

        # 4. Secondary (level-2) classification, driven by the primary result.
        secondary_result = await self.classifier.classify_secondary(primary_result)
        result.secondary_items = secondary_result.get("items", [])
        report("文档分类", 55, f"二级分类完成,共 {secondary_result.get('total_count', 0)} 项")

        # 5. Chunk assembly.
        chunks = assemble_chunks(structure, primary_result, secondary_result)
        if not chunks:
            result.quality_check = {"error": "无可用的 chunks"}
            return result
        report("文档切分", 60, f"组装完成,共 {len(chunks)} 个内容块")

        # 6. Tertiary classification (optional).
        if skip_tertiary:
            # Mark every chunk explicitly as skipped instead of leaving the fields unset.
            for chunk in chunks:
                chunk["tertiary_category_code"] = "skipped"
                chunk["tertiary_category_cn"] = "已跳过"
            report("文档分类", 90, "已跳过三级分类")
        else:
            chunks = await self.classifier.classify_tertiary(chunks)
            report("文档分类", 90, "三级分类完成")

        # 7. Convert the assembled chunk dicts into ChunkItem models.
        result.chunks = [self._to_chunk_item(chunk) for chunk in chunks]

        # 8. Quality check.
        result.quality_check = self._build_quality_check(structure, result)

        # 9. Statistics.
        result.stats = {
            "total_pages": result.total_pages,
            "chapter_count": len(result.primary_items),
            "chunk_count": len(result.chunks),
            "primary_category_distribution": primary_result.get("category_stats", {}),
            "secondary_category_distribution": secondary_result.get("category_stats", {}),
        }

        report("完成", 100, "处理完成")
        return result

    @staticmethod
    def _to_classification_item(item: Dict[str, Any]) -> ClassificationItem:
        """Build a ClassificationItem from one entry of the primary classifier's ``items``."""
        return ClassificationItem(
            title=item["title"],
            page=item["page"],
            level=item["level"],
            category=item["category"],
            category_code=item["category_code"],
            confidence=item["confidence"],
            original=item["original"],
            level2_titles=item.get("level2_titles", []),
        )

    @staticmethod
    def _to_chunk_item(chunk: Dict[str, Any]) -> ChunkItem:
        """Build a ChunkItem from one assembled chunk dict (tertiary fields are optional)."""
        return ChunkItem(
            chunk_id=chunk["chunk_id"],
            section_label=chunk["section_label"],
            chapter_classification=chunk["chapter_classification"],
            first_name=chunk["first_name"],
            secondary_category_code=chunk["secondary_category_code"],
            secondary_category_cn=chunk["secondary_category_cn"],
            hierarchy_path=chunk["hierarchy_path"],
            review_chunk_content=chunk["review_chunk_content"],
            page_start=chunk["page_start"],
            page_end=chunk["page_end"],
            tertiary_category_code=chunk.get("tertiary_category_code", ""),
            tertiary_category_cn=chunk.get("tertiary_category_cn", ""),
            tertiary_classification_details=chunk.get("tertiary_classification_details", []),
        )

    def _build_quality_check(self, structure: Dict[str, Any], result: PipelineResult) -> Dict[str, Any]:
        """Compare extracted chapter/subsection counts against the expected counts.

        Args:
            structure: Extractor output; only its ``chapters`` mapping is read.
            result: Not read; kept for interface compatibility.

        Returns:
            ``{"l1_chapter_quality": {...}, "l2_subsection_quality": {...}}``.
            Each entry holds the extracted and expected counts, the extraction
            rate in percent, the threshold in percent, and ``exist_issue``
            (True when the rate falls below the threshold).
        """
        chapters = structure.get("chapters", {})
        l1_count = 0
        l2_count = 0
        for chapter_name, sections in chapters.items():
            # The quality_check entry is not a chapter. Skip it for BOTH counts
            # so that its keys are not miscounted as subsections.
            if chapter_name == self._QUALITY_CHECK_KEY:
                continue
            l1_count += 1
            if isinstance(sections, dict):
                l2_count += sum(1 for name in sections if name != self._CHAPTER_TITLE_KEY)

        return {
            "l1_chapter_quality": self._quality_entry(
                l1_count, self.EXPECTED_L1_CHAPTERS, self.L1_THRESHOLD_PCT
            ),
            "l2_subsection_quality": self._quality_entry(
                l2_count, self.EXPECTED_L2_SUBSECTIONS, self.L2_THRESHOLD_PCT
            ),
        }

    @staticmethod
    def _quality_entry(count: int, expected: int, threshold_pct: float) -> Dict[str, Any]:
        """Build one quality-check entry; a non-positive ``expected`` counts as a 100% rate."""
        rate = count / expected if expected > 0 else 1.0
        return {
            "extracted_count": count,
            "expected_count": expected,
            "extraction_rate": round(rate * 100, 2),
            "threshold": threshold_pct,
            # Derived from the reported threshold so the two can never disagree.
            "exist_issue": rate < threshold_pct / 100,
        }
|