"""Pipeline orchestration.

Drives the stages: PDF extraction -> TOC detection -> chunking -> classification.
"""

import asyncio
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

from .pdf_extractor import SimplePdfExtractor
from .toc_builder import build_toc_items_from_structure
from .chunk_assembler import assemble_chunks
from .classifier import SimpleClassifier
from .models import PipelineResult, ClassificationItem, ChunkItem

# Signature of the optional progress hook: (stage, percent, message) -> None.
ProgressCallback = Callable[[str, int, str], None]


class MinimalPipeline:
    """Standalone minimal document-processing pipeline."""

    # Key inside ``structure["chapters"]`` that holds extractor metadata rather
    # than a real chapter; it is excluded from every chapter/section count.
    _QUALITY_CHECK_KEY = "quality_check"
    # Key inside a chapter dict that is skipped when counting subsections.
    _CHAPTER_TITLE_KEY = "章节标题"

    # Baselines used by the quality report: expected number of level-1
    # chapters / level-2 subsections, and the minimum acceptable extraction
    # rate for each, in percent.
    _EXPECTED_L1_CHAPTERS = 10
    _EXPECTED_L2_SUBSECTIONS = 41
    _L1_THRESHOLD_PERCENT = 70.0
    _L2_THRESHOLD_PERCENT = 73.0

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        model: str = "qwen3.5-122b-a10b",
        concurrency: int = 10,
        csv_path: Optional[str] = None,
    ):
        """Create the extractor and the classifier used by the pipeline.

        Args:
            api_key: Forwarded to ``SimpleClassifier``.
            base_url: Forwarded to ``SimpleClassifier``.
            model: Forwarded to ``SimpleClassifier``.
            concurrency: Forwarded to ``SimpleClassifier``.
            csv_path: Forwarded to ``SimpleClassifier``.
        """
        self.extractor = SimplePdfExtractor()
        self.classifier = SimpleClassifier(
            api_key=api_key,
            base_url=base_url,
            model=model,
            concurrency=concurrency,
            csv_path=csv_path,
        )

    async def process(
        self,
        file_content: bytes,
        file_name: str = "",
        skip_tertiary: bool = False,
        progress_callback: Optional[ProgressCallback] = None,
    ) -> PipelineResult:
        """Process a PDF document end to end.

        Args:
            file_content: Raw bytes of the PDF file.
            file_name: File name (used for reporting).
            skip_tertiary: Skip the third-level classification (saves LLM
                calls); chunks are then tagged as skipped.
            progress_callback: Optional hook called as
                ``(stage, percent, message) -> None`` after each stage.

        Returns:
            The populated ``PipelineResult``. When no TOC items or no chunks
            are produced, the result is returned early with only an
            ``{"error": ...}`` entry in ``quality_check``.
        """

        def report(stage: str, percent: int, message: str) -> None:
            # Single place for the "callback is optional" check.
            if progress_callback:
                progress_callback(stage, percent, message)

        result = PipelineResult(document_name=file_name, total_pages=0)

        # 1. PDF structure extraction
        report("文档提取", 0, "开始 PDF 结构提取...")
        structure = self.extractor.extract(file_content)
        result.total_pages = structure.get("total_pages", 0)
        result.chapters = structure.get("chapters", {})
        if progress_callback:
            # Only needed for the progress message, so only computed on demand.
            chapter_count = sum(
                1 for name in result.chapters if name != self._QUALITY_CHECK_KEY
            )
            report("文档提取", 20, f"PDF 提取完成,共 {chapter_count} 个一级章节")

        # 2. Table-of-contents construction
        toc_items = build_toc_items_from_structure(structure)
        if not toc_items:
            result.quality_check = {"error": "未提取到有效目录结构"}
            return result
        report("文档分类", 25, f"构建目录完成,共 {len(toc_items)} 个目录项")

        # 3. Primary (level-1) classification
        primary_result = await self.classifier.classify_primary(toc_items)
        result.primary_items = [
            ClassificationItem(
                title=item["title"],
                page=item["page"],
                level=item["level"],
                category=item["category"],
                category_code=item["category_code"],
                confidence=item["confidence"],
                original=item["original"],
                level2_titles=item.get("level2_titles", []),
            )
            for item in primary_result.get("items", [])
        ]
        report("文档分类", 40, f"一级分类完成,共 {len(result.primary_items)} 项")

        # 4. Secondary (level-2) classification
        secondary_result = await self.classifier.classify_secondary(primary_result)
        result.secondary_items = secondary_result.get("items", [])
        report(
            "文档分类",
            55,
            f"二级分类完成,共 {secondary_result.get('total_count', 0)} 项",
        )

        # 5. Chunk assembly
        chunks = assemble_chunks(structure, primary_result, secondary_result)
        if not chunks:
            result.quality_check = {"error": "无可用的 chunks"}
            return result
        report("文档切分", 60, f"组装完成,共 {len(chunks)} 个内容块")

        # 6. Tertiary (level-3) classification (optional)
        if not skip_tertiary:
            chunks = await self.classifier.classify_tertiary(chunks)
            report("文档分类", 90, "三级分类完成")
        else:
            # Mark every chunk explicitly so consumers can tell "skipped"
            # apart from "classified but empty".
            for chunk in chunks:
                chunk["tertiary_category_code"] = "skipped"
                chunk["tertiary_category_cn"] = "已跳过"
            report("文档分类", 90, "已跳过三级分类")

        # 7. Convert the chunk dicts to ChunkItem objects
        result.chunks = [
            ChunkItem(
                chunk_id=c["chunk_id"],
                section_label=c["section_label"],
                chapter_classification=c["chapter_classification"],
                first_name=c["first_name"],
                secondary_category_code=c["secondary_category_code"],
                secondary_category_cn=c["secondary_category_cn"],
                hierarchy_path=c["hierarchy_path"],
                review_chunk_content=c["review_chunk_content"],
                page_start=c["page_start"],
                page_end=c["page_end"],
                tertiary_category_code=c.get("tertiary_category_code", ""),
                tertiary_category_cn=c.get("tertiary_category_cn", ""),
                tertiary_classification_details=c.get(
                    "tertiary_classification_details", []
                ),
            )
            for c in chunks
        ]

        # 8. Quality check
        result.quality_check = self._build_quality_check(structure, result)

        # 9. Statistics
        result.stats = {
            "total_pages": result.total_pages,
            "chapter_count": len(result.primary_items),
            "chunk_count": len(result.chunks),
            "primary_category_distribution": primary_result.get("category_stats", {}),
            "secondary_category_distribution": secondary_result.get(
                "category_stats", {}
            ),
        }

        report("完成", 100, "处理完成")
        return result

    @staticmethod
    def _quality_entry(
        extracted: int, expected: int, threshold_percent: float
    ) -> Dict[str, Any]:
        """Build one quality-report entry comparing a count with its baseline."""
        # A non-positive baseline cannot be missed, so treat it as fully met.
        rate = extracted / expected if expected > 0 else 1.0
        return {
            "extracted_count": extracted,
            "expected_count": expected,
            "extraction_rate": round(rate * 100, 2),
            "threshold": threshold_percent,
            "exist_issue": rate < threshold_percent / 100,
        }

    def _build_quality_check(
        self, structure: Dict[str, Any], result: PipelineResult
    ) -> Dict[str, Any]:
        """Build the quality-check report from the extracted structure.

        Counts the level-1 chapters and level-2 subsections found in
        ``structure["chapters"]`` and compares them with the class-level
        baselines.

        Args:
            structure: Extractor output; only its ``"chapters"`` mapping is read.
            result: Unused here; kept for interface compatibility.

        Returns:
            A dict with ``"l1_chapter_quality"`` and ``"l2_subsection_quality"``
            entries, each holding the extracted/expected counts, the extraction
            rate in percent, the threshold and an ``exist_issue`` flag.
        """
        chapters = structure.get("chapters", {})

        l1_count = 0
        l2_count = 0
        for chapter_name, sections in chapters.items():
            # The "quality_check" entry is metadata, not a chapter: keep it out
            # of BOTH counts (previously its keys leaked into the L2 count).
            if chapter_name == self._QUALITY_CHECK_KEY:
                continue
            l1_count += 1
            if isinstance(sections, dict):
                l2_count += sum(
                    1 for name in sections if name != self._CHAPTER_TITLE_KEY
                )

        return {
            "l1_chapter_quality": self._quality_entry(
                l1_count, self._EXPECTED_L1_CHAPTERS, self._L1_THRESHOLD_PERCENT
            ),
            "l2_subsection_quality": self._quality_entry(
                l2_count, self._EXPECTED_L2_SUBSECTIONS, self._L2_THRESHOLD_PERCENT
            ),
        }