|
|
@@ -4,10 +4,10 @@
|
|
|
集成doc_worker模块的智能处理能力
|
|
|
|
|
|
重构说明:
|
|
|
-1. 使用类级别共享ChunkClassifier实例,避免重复创建LLM客户端
|
|
|
-2. 统一PDF处理流程,消除代码重复
|
|
|
-3. 移除splits冗余数据,统一使用chunks
|
|
|
-4. 完善异常处理,记录完整堆栈信息
|
|
|
+1. 使用 UnifiedDocumentStructure 统一数据结构
|
|
|
+2. 使用类级别共享ChunkClassifier实例,避免重复创建LLM客户端
|
|
|
+3. 统一PDF处理流程,消除代码重复
|
|
|
+4. 移除splits冗余数据,统一使用chunks
|
|
|
|
|
|
注意: DOCX/DOC 文件应在上传层转换为 PDF,本模块不再直接处理 DOCX
|
|
|
"""
|
|
|
@@ -16,11 +16,13 @@ import io
|
|
|
import json
|
|
|
import os
|
|
|
import tempfile
|
|
|
+from collections import defaultdict
|
|
|
from dataclasses import dataclass
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, Any, Optional, List
|
|
|
from datetime import datetime
|
|
|
import asyncio
|
|
|
+import uuid
|
|
|
|
|
|
from foundation.observability.logger.loggering import review_logger as logger
|
|
|
from foundation.observability.cachefiles import cache, CacheBaseDir
|
|
|
@@ -30,21 +32,29 @@ from .constants import CategoryCode, StatusCode, StageName
|
|
|
try:
|
|
|
from .doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
|
|
|
from .doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
|
|
|
+ from .doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
|
|
|
from .doc_worker.pdf_worker.hybrid_extractor import HybridFullTextExtractor
|
|
|
from .doc_worker.pdf_worker.text_splitter import PdfTextSplitter
|
|
|
- from .doc_worker.pdf_worker.classifier import PdfHierarchyClassifier
|
|
|
- from .doc_worker.classification.hierarchy_classifier import HierarchyClassifier as DocxHierarchyClassifier
|
|
|
+ from .doc_worker.classification.hierarchy_classifier import HierarchyClassifier
|
|
|
from .doc_worker.classification.chunk_classifier import ChunkClassifier
|
|
|
from .doc_worker.config.provider import default_config_provider
|
|
|
+ from .doc_worker.models import (
|
|
|
+ UnifiedDocumentStructure,
|
|
|
+ )
|
|
|
+ from .minimal_pipeline import SimpleDocumentProcessor
|
|
|
except ImportError:
|
|
|
from core.construction_review.component.doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
|
|
|
+ from core.construction_review.component.doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.hybrid_extractor import HybridFullTextExtractor
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
|
|
|
- from core.construction_review.component.doc_worker.pdf_worker.classifier import PdfHierarchyClassifier
|
|
|
- from core.construction_review.component.doc_worker.classification.hierarchy_classifier import HierarchyClassifier as DocxHierarchyClassifier
|
|
|
+ from core.construction_review.component.doc_worker.classification.hierarchy_classifier import HierarchyClassifier
|
|
|
from core.construction_review.component.doc_worker.classification.chunk_classifier import ChunkClassifier
|
|
|
from core.construction_review.component.doc_worker.config.provider import default_config_provider
|
|
|
+ from core.construction_review.component.doc_worker.models import (
|
|
|
+ UnifiedDocumentStructure,
|
|
|
+ )
|
|
|
+ from core.construction_review.component.minimal_pipeline import SimpleDocumentProcessor
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
@@ -56,160 +66,57 @@ class DocumentComponents:
|
|
|
text_splitter: TextSplitter
|
|
|
|
|
|
|
|
|
-# 二级分类标题关键词映射(用于outline的subsection分类)
|
|
|
-# 基于 StandardCategoryTable.csv,严格匹配标准目录名
|
|
|
-SECONDARY_CATEGORY_KEYWORDS = {
|
|
|
- # 编制依据 (basis)
|
|
|
- "basis": {
|
|
|
- "LawsAndRegulations": ["法律法规"], # 严格匹配
|
|
|
- "StandardsAndSpecifications": ["标准规范"], # 严格匹配
|
|
|
- "DocumentSystems": ["文件制度"], # 严格匹配
|
|
|
- "CompilationPrinciples": ["编制原则"], # 严格匹配
|
|
|
- "CompilationScope": ["编制范围"], # 严格匹配
|
|
|
- },
|
|
|
- # 工程概况 (overview)
|
|
|
- "overview": {
|
|
|
- "DesignSummary": ["设计概况"], # 严格匹配
|
|
|
- "GeologyWeather": ["工程地质与水文气象"], # 严格匹配标准目录名
|
|
|
- "Surroundings": ["周边环境"], # 严格匹配
|
|
|
- "LayoutPlan": ["施工平面及立面布置"], # 严格匹配标准目录名
|
|
|
- "RequirementsTech": ["施工要求和技术保证条件"], # 严格匹配标准目录名
|
|
|
- "RiskLevel": ["风险辨识与分级"], # 严格匹配标准目录名
|
|
|
- "Stakeholders": ["参建各方责任主体单位"], # 严格匹配标准目录名
|
|
|
- },
|
|
|
- # 施工计划 (plan)
|
|
|
- "plan": {
|
|
|
- "Schedule": ["施工进度计划"], # 严格匹配标准目录名
|
|
|
- "Materials": ["施工材料计划"], # 严格匹配标准目录名
|
|
|
- "Equipment": ["施工设备计划"], # 严格匹配标准目录名
|
|
|
- "Workforce": ["劳动力计划"], # 严格匹配
|
|
|
- "SafetyCost": ["安全生产费用使用计划"], # 严格匹配标准目录名
|
|
|
- },
|
|
|
- # 施工工艺技术 (technology)
|
|
|
- "technology": {
|
|
|
- # 按标准目录严格匹配,优先匹配完整名称避免歧义
|
|
|
- "MethodsOverview": ["主要施工方法概述", "施工方法概述"], # 不包含"施工方法"避免与Operations冲突
|
|
|
- "TechParams": ["技术参数"], # 不包含"参数"避免过于宽泛
|
|
|
- "Process": ["工艺流程"], # 不包含"流程"避免过于宽泛
|
|
|
- "PrepWork": ["施工准备"], # 不包含"准备"避免过于宽泛
|
|
|
- "Operations": ["施工方法及操作要求", "施工方案及操作要求", "操作要求", "施工方案"], # 最具体的放前面
|
|
|
- "Inspection": ["检查要求"], # 不包含"检查""验收"避免与其他章节冲突
|
|
|
- },
|
|
|
- # 安全保证措施 (safety)
|
|
|
- "safety": {
|
|
|
- "SafetySystem": ["安全保证体系"], # 严格匹配标准目录名
|
|
|
- "Organization": ["组织保证措施"], # 严格匹配
|
|
|
- "TechMeasures": ["技术保障措施"], # 严格匹配
|
|
|
- "Monitoring": ["监测监控措施"], # 严格匹配
|
|
|
- "Emergency": ["应急处置措施"], # 严格匹配
|
|
|
- },
|
|
|
- # 质量保证措施 (quality)
|
|
|
- "quality": {
|
|
|
- "QualitySystem": ["质量保证体系"], # 严格匹配
|
|
|
- "QualityGoals": ["质量目标"], # 严格匹配
|
|
|
- "Excellence": ["工程创优规划"], # 严格匹配
|
|
|
- "QualityControl": ["质量控制程序与具体措施"], # 严格匹配标准目录名
|
|
|
- },
|
|
|
- # 环境保证措施 (environment)
|
|
|
- "environment": {
|
|
|
- "EnvSystem": ["环境保证体系"], # 严格匹配
|
|
|
- "EnvOrg": ["环境保护组织机构"], # 严格匹配
|
|
|
- "EnvProtection": ["环境保护及文明施工措施"], # 严格匹配标准目录名
|
|
|
- },
|
|
|
- # 施工管理及作业人员配备与分工 (management)
|
|
|
- "management": {
|
|
|
- "Managers": ["施工管理人员"], # 严格匹配
|
|
|
- "SafetyStaff": ["专职安全生产管理人员"], # 严格匹配标准目录名
|
|
|
- "SpecialWorkers": ["特种作业人员"], # 严格匹配
|
|
|
- "OtherWorkers": ["其他作业人员"], # 严格匹配
|
|
|
- },
|
|
|
- # 验收要求 (acceptance)
|
|
|
- "acceptance": {
|
|
|
- "Standards": ["验收标准"], # 严格匹配
|
|
|
- "Procedure": ["验收程序"], # 严格匹配
|
|
|
- "Content": ["验收内容"], # 严格匹配
|
|
|
- "Timing": ["验收时间"], # 严格匹配
|
|
|
- "Personnel": ["验收人员"], # 严格匹配
|
|
|
- },
|
|
|
- # 其他资料 (other)
|
|
|
- "other": {
|
|
|
- "Calculations": ["计算书"], # 严格匹配
|
|
|
- "Drawings": ["相关施工图纸"], # 严格匹配标准目录名
|
|
|
- "Tables": ["附图附表"], # 严格匹配
|
|
|
- "Team": ["编制及审核人员情况"], # 严格匹配标准目录名
|
|
|
- },
|
|
|
-}
|
|
|
-
|
|
|
class DocumentProcessor:
|
|
|
"""
|
|
|
文档处理器
|
|
|
|
|
|
改进说明:
|
|
|
- 1. 使用类级别共享 _shared_chunk_classifier,避免重复创建LLM客户端
|
|
|
- 2. 使用 DocumentComponents 统一管理处理组件
|
|
|
- 3. 统一处理流程 _parse_content 消除代码重复
|
|
|
+ 1. 使用 UnifiedDocumentStructure 统一数据结构
|
|
|
+ 2. 使用类级别共享 _shared_chunk_classifier,避免重复创建LLM客户端
|
|
|
+ 3. 使用 DocumentComponents 统一管理处理组件
|
|
|
+ 4. 统一处理流程 _parse_content 消除代码重复
|
|
|
"""
|
|
|
|
|
|
# 类级别共享的ChunkClassifier实例,避免重复创建LLM客户端
|
|
|
_shared_chunk_classifier: Optional[ChunkClassifier] = None
|
|
|
|
|
|
- def __init__(self, progress_manager=None, callback_task_id: str = None, progress_state: dict = None):
|
|
|
+ def __init__(self, progress_manager=None, callback_task_id: str = None, progress_state: dict = None, use_ocr: bool = False):
|
|
|
+ """
|
|
|
+ 初始化文档处理器
|
|
|
+
|
|
|
+ Args:
|
|
|
+ progress_manager: 进度管理器
|
|
|
+ callback_task_id: 回调任务ID
|
|
|
+ progress_state: 进度状态字典
|
|
|
+ use_ocr: 是否启用 OCR 模式(表格页使用 OCR 识别)
|
|
|
+ """
|
|
|
self.supported_types = ['pdf'] # DOCX/DOC 应在上传层转换为 PDF
|
|
|
self.config = default_config_provider
|
|
|
+ self.use_ocr = use_ocr
|
|
|
# SSE 进度推送(由 DocumentWorkflow 注入)
|
|
|
self._progress_manager = progress_manager
|
|
|
self._callback_task_id = callback_task_id
|
|
|
# 与心跳协程共享的状态字典,更新后心跳自动反映新阶段
|
|
|
self._progress_state = progress_state
|
|
|
|
|
|
- # 初始化PDF文档的处理组件
|
|
|
+ # 选择提取器
|
|
|
+ if use_ocr:
|
|
|
+ logger.info("DocumentProcessor 使用 OCR 模式(表格页检测 + 识别)")
|
|
|
+ extractor = HybridFullTextExtractor()
|
|
|
+ else:
|
|
|
+ logger.info("DocumentProcessor 使用本地提取模式(PyMuPDF)")
|
|
|
+ extractor = PdfFullTextExtractor()
|
|
|
+
|
|
|
+ # 初始化PDF文档的处理组件(简化版)
|
|
|
self._components: Dict[str, DocumentComponents] = {
|
|
|
'pdf': DocumentComponents(
|
|
|
toc_extractor=PdfTOCExtractor(),
|
|
|
- classifier=PdfHierarchyClassifier(),
|
|
|
- fulltext_extractor=HybridFullTextExtractor(),
|
|
|
+ classifier=HierarchyClassifier(),
|
|
|
+ fulltext_extractor=extractor,
|
|
|
text_splitter=PdfTextSplitter()
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- # 加载标准分类表并创建序号映射
|
|
|
- self._load_category_seq_mappings()
|
|
|
-
|
|
|
- def _load_category_seq_mappings(self):
|
|
|
- """加载标准分类表CSV,创建code到seq的映射"""
|
|
|
- self._first_seq_map: Dict[str, int] = {} # first_code -> first_seq
|
|
|
- self._second_seq_map: Dict[str, int] = {} # second_code -> second_seq
|
|
|
-
|
|
|
- try:
|
|
|
- import csv
|
|
|
- csv_path = Path(__file__).parent / 'doc_worker' / 'config' / 'StandardCategoryTable.csv'
|
|
|
- if not csv_path.exists():
|
|
|
- logger.warning(f"标准分类表不存在: {csv_path}")
|
|
|
- return
|
|
|
-
|
|
|
- with open(csv_path, 'r', encoding='utf-8-sig') as f:
|
|
|
- reader = csv.DictReader(f)
|
|
|
- for row in reader:
|
|
|
- first_code = row.get('first_code', '').strip()
|
|
|
- second_code = row.get('second_code', '').strip()
|
|
|
- try:
|
|
|
- first_seq = int(row.get('first_seq', 0) or 0)
|
|
|
- except (ValueError, TypeError):
|
|
|
- first_seq = 0
|
|
|
- try:
|
|
|
- second_seq = int(row.get('second_seq', 0) or 0)
|
|
|
- except (ValueError, TypeError):
|
|
|
- second_seq = 0
|
|
|
-
|
|
|
- if first_code and first_code not in self._first_seq_map:
|
|
|
- self._first_seq_map[first_code] = first_seq
|
|
|
- if second_code and second_code not in self._second_seq_map:
|
|
|
- self._second_seq_map[second_code] = second_seq
|
|
|
-
|
|
|
- logger.debug(f"加载分类序号映射: 一级 {len(self._first_seq_map)} 个, 二级 {len(self._second_seq_map)} 个")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"加载分类序号映射失败: {e}")
|
|
|
-
|
|
|
@classmethod
|
|
|
def _get_chunk_classifier(cls) -> ChunkClassifier:
|
|
|
"""获取共享的ChunkClassifier实例"""
|
|
|
@@ -217,7 +124,7 @@ class DocumentProcessor:
|
|
|
cls._shared_chunk_classifier = ChunkClassifier()
|
|
|
return cls._shared_chunk_classifier
|
|
|
|
|
|
- async def process_document(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
+ async def process_document(self, file_content: bytes, file_type: str) -> UnifiedDocumentStructure:
|
|
|
"""
|
|
|
处理文档
|
|
|
|
|
|
@@ -226,7 +133,7 @@ class DocumentProcessor:
|
|
|
file_type: 文件类型(pdf/docx)
|
|
|
|
|
|
Returns:
|
|
|
- Dict: 结构化的解析结果
|
|
|
+ UnifiedDocumentStructure: 统一文档结构
|
|
|
|
|
|
Raises:
|
|
|
ValueError: 不支持的文件类型
|
|
|
@@ -240,120 +147,55 @@ class DocumentProcessor:
|
|
|
raise ValueError(f"不支持的文件类型: {file_type},支持的类型: {self.supported_types}")
|
|
|
|
|
|
# 统一调用解析方法
|
|
|
- result = await self._parse_content(file_content, file_type_lower)
|
|
|
+ unified_doc = await self._parse_content(file_content, file_type_lower)
|
|
|
+
|
|
|
+ # 保存到缓存
|
|
|
+ cache.document_temp(
|
|
|
+ unified_doc.to_dict(),
|
|
|
+ base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
|
|
|
+ )
|
|
|
|
|
|
- # 结构化内容
|
|
|
- structured_result = self.structure_content(result)
|
|
|
- cache.document_temp(structured_result, base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW)
|
|
|
- return structured_result
|
|
|
+ return unified_doc
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"文档处理失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
- async def _parse_content(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
+ async def _parse_content(self, file_content: bytes, file_type: str) -> UnifiedDocumentStructure:
|
|
|
"""
|
|
|
- 统一的文档解析方法(消除PDF/DOCX代码重复)
|
|
|
+ 统一的文档解析方法
|
|
|
|
|
|
Args:
|
|
|
file_content: 文件内容
|
|
|
file_type: 文件类型(pdf/docx)
|
|
|
|
|
|
Returns:
|
|
|
- Dict: 解析结果
|
|
|
+ UnifiedDocumentStructure: 统一文档结构
|
|
|
"""
|
|
|
components = self._components.get(file_type)
|
|
|
if not components:
|
|
|
raise ValueError(f"未找到 {file_type} 类型的处理组件")
|
|
|
|
|
|
try:
|
|
|
- logger.info(f"开始使用doc_worker处理{file_type.upper()}文档(内存模式)")
|
|
|
-
|
|
|
- # 创建DocumentSource(纯内存模式)
|
|
|
- source = DocumentSource(
|
|
|
- path=None,
|
|
|
- content=file_content,
|
|
|
- file_type=file_type
|
|
|
- )
|
|
|
-
|
|
|
- # 步骤1: 提取目录
|
|
|
- logger.info(f"{StageName.TOC_EXTRACTION.value}: 提取文档目录")
|
|
|
- toc_info = components.toc_extractor.extract_toc(source)
|
|
|
-
|
|
|
- if toc_info.get('toc_count', 0) == 0:
|
|
|
- logger.warning("未检测到目录,使用基础处理模式")
|
|
|
- return await self._fallback_processing(file_content, file_type)
|
|
|
-
|
|
|
- logger.info(f"成功提取 {toc_info['toc_count']} 个目录项")
|
|
|
+ logger.info(f"开始使用最简流程处理{file_type.upper()}文档")
|
|
|
|
|
|
- # 步骤2: 分类目录项
|
|
|
- target_level = int(self.config.get("text_splitting.target_level", 1))
|
|
|
- logger.info(f"{StageName.CLASSIFICATION.value}: 对{target_level}级目录进行分类")
|
|
|
-
|
|
|
- classification_result = await components.classifier.classify_async(
|
|
|
- toc_info['toc_items'],
|
|
|
- target_level=target_level
|
|
|
- )
|
|
|
-
|
|
|
- classified_items = classification_result.get('items', [])
|
|
|
- if not classified_items:
|
|
|
- logger.warning("分类结果为空,使用原始目录项")
|
|
|
- classified_items = [
|
|
|
- item for item in toc_info['toc_items']
|
|
|
- if item.get('level') == target_level
|
|
|
- ]
|
|
|
- # 为每个目录项添加默认分类信息
|
|
|
- for item in classified_items:
|
|
|
- item['category'] = '未分类'
|
|
|
- item['category_code'] = CategoryCode.OTHER.value
|
|
|
- else:
|
|
|
- logger.info(f"分类完成,共分类 {len(classified_items)} 个目录项")
|
|
|
-
|
|
|
- # 步骤3: 提取文档全文(使用线程池避免阻塞事件循环)
|
|
|
- logger.info(f"{StageName.TEXT_EXTRACTION.value}: 提取文档全文")
|
|
|
- pages_content = await asyncio.to_thread(
|
|
|
- components.fulltext_extractor.extract_full_text, source
|
|
|
- )
|
|
|
-
|
|
|
- if not pages_content:
|
|
|
- logger.warning("无法提取文档全文,使用基础处理模式")
|
|
|
- return await self._fallback_processing(file_content, file_type)
|
|
|
+ async def _progress_adapter(stage: str, current: int, message: str):
|
|
|
+ await self._push_classification_progress(
|
|
|
+ stage=stage, current=current, message=message
|
|
|
+ )
|
|
|
|
|
|
- total_chars = sum(len(page.get('text', '')) for page in pages_content)
|
|
|
- logger.info(f"提取完成,共 {len(pages_content)} 页,{total_chars} 个字符")
|
|
|
-
|
|
|
- # 步骤4: 按分类标题智能切分文本(使用线程池避免阻塞)
|
|
|
- logger.info(f"{StageName.TEXT_SPLITTING.value}: 按分类标题智能切分文本")
|
|
|
- max_chunk_size = int(self.config.get("text_splitting.max_chunk_size", 3000))
|
|
|
- min_chunk_size = int(self.config.get("text_splitting.min_chunk_size", 50))
|
|
|
-
|
|
|
- chunks = await asyncio.to_thread(
|
|
|
- components.text_splitter.split_by_hierarchy,
|
|
|
- classified_items,
|
|
|
- pages_content,
|
|
|
- toc_info,
|
|
|
- target_level,
|
|
|
- max_chunk_size,
|
|
|
- min_chunk_size
|
|
|
+ simple_processor = SimpleDocumentProcessor()
|
|
|
+ unified_doc = await simple_processor.process_unified(
|
|
|
+ file_content=file_content,
|
|
|
+ file_name=f"document_{uuid.uuid4().hex[:8]}",
|
|
|
+ progress_callback=_progress_adapter,
|
|
|
)
|
|
|
|
|
|
- if not chunks:
|
|
|
- logger.warning("未能生成任何文本块,使用基础处理模式")
|
|
|
- return await self._fallback_processing(file_content, file_type)
|
|
|
-
|
|
|
- logger.info(f"切分完成,共生成 {len(chunks)} 个文本块")
|
|
|
+ # 缓存结果
|
|
|
+ await self._cache_unified_structure(unified_doc)
|
|
|
+ await self._cache_tertiary_results(unified_doc, [])
|
|
|
|
|
|
- # 步骤5: 对chunks进行二级分类
|
|
|
- chunks = await self._classify_chunks_secondary(chunks)
|
|
|
-
|
|
|
- # 步骤6: 对chunks进行三级分类
|
|
|
- chunks = await self._classify_chunks_tertiary(chunks)
|
|
|
-
|
|
|
- # 构建返回结果(移除splits冗余,统一使用chunks)
|
|
|
- return self._build_parse_result(
|
|
|
- file_type, chunks, pages_content, toc_info,
|
|
|
- classified_items, target_level, total_chars
|
|
|
- )
|
|
|
+ return unified_doc
|
|
|
|
|
|
except Exception as e:
|
|
|
logger.error(f"{file_type.upper()}解析失败: {str(e)}", exc_info=True)
|
|
|
@@ -367,6 +209,303 @@ class DocumentProcessor:
|
|
|
f"文档处理完全失败: {file_type.upper()}智能处理({str(e)}) + 基础处理({str(fallback_error)})"
|
|
|
) from e
|
|
|
|
|
|
+ def _build_unified_structure(
|
|
|
+ self,
|
|
|
+ primary_result: Dict[str, Any],
|
|
|
+ secondary_result: Optional[Dict[str, Any]],
|
|
|
+ chunks: List[Dict[str, Any]],
|
|
|
+ pages_content: List[Dict[str, Any]],
|
|
|
+ toc_info: Dict[str, Any],
|
|
|
+ document_name: str,
|
|
|
+ ) -> UnifiedDocumentStructure:
|
|
|
+ """
|
|
|
+ 构建统一文档结构(一二级分类)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ primary_result: 一级分类结果
|
|
|
+ secondary_result: 二级分类结果
|
|
|
+ chunks: 文档切分结果
|
|
|
+ pages_content: 页面内容
|
|
|
+ toc_info: 目录信息
|
|
|
+ document_name: 文档名称
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ UnifiedDocumentStructure: 统一文档结构
|
|
|
+ """
|
|
|
+ # 计算总行数
|
|
|
+ total_lines = 0
|
|
|
+ for chunk in chunks:
|
|
|
+ content = chunk.get("review_chunk_content", "")
|
|
|
+ total_lines += len(content.split("\n"))
|
|
|
+
|
|
|
+ # 创建统一结构
|
|
|
+ unified_doc = build_unified_structure(
|
|
|
+ primary_result=primary_result,
|
|
|
+ secondary_result=secondary_result or {"items": []},
|
|
|
+ chunks=chunks,
|
|
|
+ document_name=document_name,
|
|
|
+ total_pages=len(pages_content),
|
|
|
+ )
|
|
|
+
|
|
|
+ # 更新总行数
|
|
|
+ unified_doc.total_lines = total_lines
|
|
|
+ unified_doc.document_id = str(uuid.uuid4())
|
|
|
+
|
|
|
+ # 构建大纲(从二级分类中提取)
|
|
|
+ outline_items = []
|
|
|
+ for sec in unified_doc.secondary_classifications:
|
|
|
+ outline_items.append(OutlineItem(
|
|
|
+ first_seq=sec.first_seq,
|
|
|
+ first_code=sec.first_code,
|
|
|
+ first_name=sec.first_name,
|
|
|
+ second_seq=sec.second_seq,
|
|
|
+ second_code=sec.second_code,
|
|
|
+ second_name=sec.second_name,
|
|
|
+ raw_title=sec.section_label,
|
|
|
+ page=sec.page_start,
|
|
|
+ ))
|
|
|
+
|
|
|
+ unified_doc.outline = Outline(items=outline_items)
|
|
|
+
|
|
|
+ # 保存原始元数据
|
|
|
+ unified_doc.raw_metadata = {
|
|
|
+ "toc_info": toc_info,
|
|
|
+ "processing_info": {
|
|
|
+ "chunks_count": len(chunks),
|
|
|
+ "pages_count": len(pages_content),
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return unified_doc
|
|
|
+
|
|
|
+ def _merge_tertiary_results(
|
|
|
+ self,
|
|
|
+ unified_doc: UnifiedDocumentStructure,
|
|
|
+ tertiary_results: List[Dict[str, Any]],
|
|
|
+ chunks: List[Dict[str, Any]],
|
|
|
+ ) -> UnifiedDocumentStructure:
|
|
|
+ """
|
|
|
+ 将三级分类结果合并到统一文档结构
|
|
|
+
|
|
|
+ Args:
|
|
|
+ unified_doc: 统一文档结构
|
|
|
+ tertiary_results: 三级分类结果列表
|
|
|
+ chunks: 原始chunks(已完成三级分类)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ UnifiedDocumentStructure: 更新后的结构
|
|
|
+ """
|
|
|
+ from collections import defaultdict
|
|
|
+
|
|
|
+ # 按二级分类代码分组收集三级分类项
|
|
|
+ secondary_groups: Dict[str, Dict] = defaultdict(lambda: {
|
|
|
+ "first_code": "",
|
|
|
+ "first_name": "",
|
|
|
+ "second_name": "",
|
|
|
+ "section_label": "",
|
|
|
+ "second_content": "",
|
|
|
+ "third_items": []
|
|
|
+ })
|
|
|
+
|
|
|
+ # 遍历所有chunks,按二级分类分组
|
|
|
+ for chunk in chunks:
|
|
|
+ first_code = chunk.get("chapter_classification", "")
|
|
|
+ second_code = chunk.get("secondary_category_code", "")
|
|
|
+
|
|
|
+ if not second_code or second_code == "none":
|
|
|
+ continue
|
|
|
+
|
|
|
+ # 获取或创建分组
|
|
|
+ group = secondary_groups[second_code]
|
|
|
+ group["first_code"] = first_code
|
|
|
+ group["first_name"] = chunk.get("first_name", "")
|
|
|
+ group["second_name"] = chunk.get("second_name", "")
|
|
|
+ group["section_label"] = chunk.get("section_label", "")
|
|
|
+
|
|
|
+ # 合并内容
|
|
|
+ content = chunk.get("review_chunk_content", "") or chunk.get("content", "")
|
|
|
+ if content:
|
|
|
+ if group["second_content"]:
|
|
|
+ group["second_content"] += "\n\n" + content
|
|
|
+ else:
|
|
|
+ group["second_content"] = content
|
|
|
+
|
|
|
+ # 收集三级分类详情
|
|
|
+ details = chunk.get("tertiary_classification_details", [])
|
|
|
+ for idx, detail in enumerate(details, 1):
|
|
|
+ group["third_items"].append(TertiaryItem(
|
|
|
+ third_seq=len(group["third_items"]) + 1,
|
|
|
+ third_code=detail.get("third_category_code", ""),
|
|
|
+ third_name=detail.get("third_category_name", ""),
|
|
|
+ line_start=detail.get("start_line", 0),
|
|
|
+ line_end=detail.get("end_line", 0),
|
|
|
+ content=detail.get("content", ""),
|
|
|
+ confidence=1.0
|
|
|
+ ))
|
|
|
+
|
|
|
+ # 构建tertiary_classifications列表
|
|
|
+ tertiary_list = []
|
|
|
+ second_seq = 0
|
|
|
+
|
|
|
+ # 调试日志
|
|
|
+ logger.info(f"[_merge_tertiary_results] 共有 {len(secondary_groups)} 个二级分类组")
|
|
|
+
|
|
|
+ for second_code, group in secondary_groups.items():
|
|
|
+ logger.info(f"[_merge_tertiary_results] 处理二级分类: {second_code}, 三级项数: {len(group['third_items'])}")
|
|
|
+ if not group["third_items"]:
|
|
|
+ continue
|
|
|
+
|
|
|
+ second_seq += 1
|
|
|
+
|
|
|
+ # 计算行数统计
|
|
|
+ total_lines = len(group["second_content"].split("\n")) if group["second_content"] else 0
|
|
|
+ classified_lines = sum(
|
|
|
+ item.line_end - item.line_start + 1
|
|
|
+ for item in group["third_items"]
|
|
|
+ )
|
|
|
+
|
|
|
+ # 查找对应的一级seq
|
|
|
+ first_seq = self._get_first_seq(group["first_code"], group["first_name"])
|
|
|
+
|
|
|
+ tertiary_list.append(TertiaryClassification(
|
|
|
+ first_seq=first_seq,
|
|
|
+ first_code=group["first_code"],
|
|
|
+ first_name=group["first_name"],
|
|
|
+ second_seq=second_seq,
|
|
|
+ second_code=second_code,
|
|
|
+ second_name=group["second_name"],
|
|
|
+ third_items=group["third_items"],
|
|
|
+ total_lines=total_lines,
|
|
|
+ classified_lines=classified_lines
|
|
|
+ ))
|
|
|
+
|
|
|
+ unified_doc.tertiary_classifications = tertiary_list
|
|
|
+
|
|
|
+ # 同时更新secondary_classifications(从chunks重新构建以确保一致性)
|
|
|
+ self._rebuild_secondary_from_chunks(unified_doc, chunks)
|
|
|
+
|
|
|
+ return unified_doc
|
|
|
+
|
|
|
+ def _get_first_seq(self, first_code: str, first_name: str) -> int:
|
|
|
+ """根据一级代码或名称获取序号"""
|
|
|
+ order_map = {
|
|
|
+ "basis": 1,
|
|
|
+ "overview": 2,
|
|
|
+ "plan": 3,
|
|
|
+ "technology": 4,
|
|
|
+ "safety": 5,
|
|
|
+ "quality": 6,
|
|
|
+ "environment": 7,
|
|
|
+ "management": 8,
|
|
|
+ "acceptance": 9,
|
|
|
+ "other": 10,
|
|
|
+ }
|
|
|
+
|
|
|
+ if first_code in order_map:
|
|
|
+ return order_map[first_code]
|
|
|
+
|
|
|
+ name_map = {
|
|
|
+ "编制依据": 1,
|
|
|
+ "工程概况": 2,
|
|
|
+ "施工计划": 3,
|
|
|
+ "施工工艺技术": 4,
|
|
|
+ "安全保证措施": 5,
|
|
|
+ "质量保证措施": 6,
|
|
|
+ "环境保证措施": 7,
|
|
|
+ "施工管理及作业人员配备与分工": 8,
|
|
|
+ "验收要求": 9,
|
|
|
+ "其它资料": 10,
|
|
|
+ "其他资料": 10,
|
|
|
+ }
|
|
|
+
|
|
|
+ return name_map.get(first_name, 99)
|
|
|
+
|
|
|
+ def _rebuild_secondary_from_chunks(
|
|
|
+ self,
|
|
|
+ unified_doc: UnifiedDocumentStructure,
|
|
|
+ chunks: List[Dict[str, Any]]
|
|
|
+ ) -> None:
|
|
|
+ """从chunks重新构建secondary_classifications以确保数据一致性"""
|
|
|
+ from collections import defaultdict
|
|
|
+
|
|
|
+ # 按二级分类分组
|
|
|
+ groups: Dict[str, Dict] = defaultdict(lambda: {
|
|
|
+ "first_code": "",
|
|
|
+ "first_name": "",
|
|
|
+ "second_name": "",
|
|
|
+ "section_label": "",
|
|
|
+ "content": "",
|
|
|
+ "page_start": 0,
|
|
|
+ "page_end": 0,
|
|
|
+ "metadata": {}
|
|
|
+ })
|
|
|
+
|
|
|
+ for chunk in chunks:
|
|
|
+ second_code = chunk.get("secondary_category_code", "")
|
|
|
+ if not second_code or second_code == "none":
|
|
|
+ continue
|
|
|
+
|
|
|
+ group = groups[second_code]
|
|
|
+ group["first_code"] = chunk.get("chapter_classification", "")
|
|
|
+ group["first_name"] = chunk.get("first_name", "")
|
|
|
+ group["second_name"] = chunk.get("second_name", "")
|
|
|
+ group["section_label"] = chunk.get("section_label", "")
|
|
|
+ group["metadata"] = chunk.get("metadata", {})
|
|
|
+
|
|
|
+ # 合并内容
|
|
|
+ content = chunk.get("review_chunk_content", "") or chunk.get("content", "")
|
|
|
+ if content:
|
|
|
+ if group["content"]:
|
|
|
+ group["content"] += "\n\n" + content
|
|
|
+ else:
|
|
|
+ group["content"] = content
|
|
|
+
|
|
|
+ # 更新页码
|
|
|
+ page = chunk.get("page", 0)
|
|
|
+ if page:
|
|
|
+ if group["page_start"] == 0 or page < group["page_start"]:
|
|
|
+ group["page_start"] = page
|
|
|
+ if page > group["page_end"]:
|
|
|
+ group["page_end"] = page
|
|
|
+
|
|
|
+ # 重建secondary_classifications
|
|
|
+ secondary_list = []
|
|
|
+ second_seq = 0
|
|
|
+
|
|
|
+ for second_code, group in groups.items():
|
|
|
+ second_seq += 1
|
|
|
+ first_seq = self._get_first_seq(group["first_code"], group["first_name"])
|
|
|
+
|
|
|
+ # 计算行号
|
|
|
+ lines = group["content"].split("\n") if group["content"] else []
|
|
|
+ line_start = 0
|
|
|
+ line_end = len(lines) - 1 if lines else 0
|
|
|
+
|
|
|
+ secondary_list.append(SecondaryClassification(
|
|
|
+ first_seq=first_seq,
|
|
|
+ first_code=group["first_code"],
|
|
|
+ first_name=group["first_name"],
|
|
|
+ second_seq=second_seq,
|
|
|
+ second_code=second_code,
|
|
|
+ second_name=group["second_name"],
|
|
|
+ second_content=group["content"],
|
|
|
+ section_label=group["section_label"],
|
|
|
+ page_start=group["page_start"],
|
|
|
+ page_end=group["page_end"],
|
|
|
+ line_start=line_start,
|
|
|
+ line_end=line_end,
|
|
|
+ metadata=group["metadata"]
|
|
|
+ ))
|
|
|
+
|
|
|
+ # 按一级分类和二级分类排序
|
|
|
+ secondary_list.sort(key=lambda x: (x.first_seq, x.second_seq))
|
|
|
+
|
|
|
+ # 重新编号
|
|
|
+ for idx, sec in enumerate(secondary_list, 1):
|
|
|
+ sec.second_seq = idx
|
|
|
+
|
|
|
+ unified_doc.secondary_classifications = secondary_list
|
|
|
+
|
|
|
async def _classify_chunks_secondary(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
"""对chunks进行二级分类"""
|
|
|
logger.info(f"{StageName.SECONDARY_CLASSIFICATION.value}: 对内容块进行二级分类")
|
|
|
@@ -384,13 +523,24 @@ class DocumentProcessor:
|
|
|
return chunks
|
|
|
|
|
|
async def _classify_chunks_tertiary(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
- """对chunks进行三级分类"""
|
|
|
+ """对chunks进行三级分类,返回处理后的chunks"""
|
|
|
logger.info(f"{StageName.TERTIARY_CLASSIFICATION.value}: 对内容块进行三级分类")
|
|
|
await self._push_classification_progress(
|
|
|
stage="文档分类",
|
|
|
current=60,
|
|
|
message=f"正在进行三级分类,共 {len(chunks)} 个内容块..."
|
|
|
)
|
|
|
+
|
|
|
+ try:
|
|
|
+ cache.save(
|
|
|
+ data=chunks,
|
|
|
+ subdir="document_temp",
|
|
|
+ filename="三级分类输入结果",
|
|
|
+ base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
|
|
|
+ )
|
|
|
+ logger.info("[三级分类] 输入结果已保存到缓存: temp/construction_review/document_temp/三级分类输入结果.json")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[三级分类] 保存缓存失败: {e}")
|
|
|
try:
|
|
|
chunk_classifier = self._get_chunk_classifier()
|
|
|
|
|
|
@@ -413,6 +563,84 @@ class DocumentProcessor:
|
|
|
logger.warning(f"三级分类失败: {str(e)},跳过三级分类", exc_info=True)
|
|
|
return chunks
|
|
|
|
|
|
+ async def _cache_unified_structure(self, unified_doc: UnifiedDocumentStructure) -> None:
|
|
|
+ """
|
|
|
+ 缓存统一文档结构(二级分类后、三级分类前)
|
|
|
+
|
|
|
+ 保存路径:temp/construction_review/document_temp/统一文档结构.json
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ cache_path = cache.save(
|
|
|
+ data=unified_doc.to_dict(),
|
|
|
+ subdir='document_temp',
|
|
|
+ filename='统一文档结构',
|
|
|
+ base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"[缓存] 统一文档结构已保存: {cache_path}")
|
|
|
+ logger.debug(f"[缓存] 包含 {unified_doc.secondary_count} 个二级分类")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[缓存] 保存统一文档结构失败: {e}", exc_info=True)
|
|
|
+
|
|
|
+ async def _cache_tertiary_results(
|
|
|
+ self,
|
|
|
+ unified_doc: UnifiedDocumentStructure,
|
|
|
+ chunks: List[Dict[str, Any]]
|
|
|
+ ) -> None:
|
|
|
+ """
|
|
|
+ 缓存三级分类结果
|
|
|
+
|
|
|
+ 保存路径:
|
|
|
+ - temp/construction_review/document_temp/三级分类结果.json
|
|
|
+ - temp/construction_review/document_temp/三级分类_chunks.json
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 缓存统一文档结构
|
|
|
+ cache_path = cache.save(
|
|
|
+ data=unified_doc.to_dict(),
|
|
|
+ subdir='document_temp',
|
|
|
+ filename='三级分类结果',
|
|
|
+ base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"[缓存] 三级分类结果已保存: {cache_path}")
|
|
|
+ logger.info(f"[缓存] 包含 {unified_doc.secondary_count} 个二级分类, {unified_doc.tertiary_count} 个三级分类")
|
|
|
+
|
|
|
+ # 详细统计
|
|
|
+ for t in unified_doc.tertiary_classifications:
|
|
|
+ logger.info(f"[缓存] 三级分类 {t.second_code}: {len(t.third_items)} 个细项")
|
|
|
+
|
|
|
+ # 缓存chunks(简化版,只保留关键字段)
|
|
|
+ # 如果外部未传入 chunks,从 legacy_dict 中提取
|
|
|
+ source_chunks = chunks if chunks else unified_doc.to_legacy_dict().get("chunks", [])
|
|
|
+ chunks_summary = []
|
|
|
+ for chunk in source_chunks:
|
|
|
+ summary = {
|
|
|
+ "chunk_id": chunk.get("chunk_id"),
|
|
|
+ "chapter_classification": chunk.get("chapter_classification"),
|
|
|
+ "secondary_category_code": chunk.get("secondary_category_code"),
|
|
|
+ "section_label": chunk.get("section_label"),
|
|
|
+ "content_length": len(chunk.get("review_chunk_content", "") or chunk.get("content", "")),
|
|
|
+ "tertiary_classification_details": chunk.get("tertiary_classification_details", []),
|
|
|
+ }
|
|
|
+ chunks_summary.append(summary)
|
|
|
+
|
|
|
+ chunks_cache_path = cache.save(
|
|
|
+ data={
|
|
|
+ "total_chunks": len(source_chunks),
|
|
|
+ "chunks": chunks_summary
|
|
|
+ },
|
|
|
+ subdir='document_temp',
|
|
|
+ filename='三级分类_chunks',
|
|
|
+ base_cache_dir=CacheBaseDir.CONSTRUCTION_REVIEW
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.info(f"[缓存] 三级分类chunks已保存: {chunks_cache_path}")
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"[缓存] 保存三级分类结果失败: {e}", exc_info=True)
|
|
|
+
|
|
|
async def _push_classification_progress(self, stage: str, current: int, message: str) -> None:
|
|
|
"""推送分类阶段进度,并同步更新心跳共享状态"""
|
|
|
if self._progress_state is not None:
|
|
|
@@ -432,59 +660,7 @@ class DocumentProcessor:
|
|
|
except Exception as e:
|
|
|
logger.warning(f"分类进度推送失败: {e}")
|
|
|
|
|
|
- def _build_parse_result(
|
|
|
- self,
|
|
|
- file_type: str,
|
|
|
- chunks: List[Dict[str, Any]],
|
|
|
- pages_content: List[Dict[str, Any]],
|
|
|
- toc_info: Dict[str, Any],
|
|
|
- classified_items: List[Dict[str, Any]],
|
|
|
- target_level: int,
|
|
|
- total_chars: int
|
|
|
- ) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 构建解析结果(移除splits冗余)
|
|
|
-
|
|
|
- 改进: 不再生成splits字段,统一使用chunks
|
|
|
- """
|
|
|
- result = {
|
|
|
- 'document_type': file_type,
|
|
|
- 'total_pages': len(pages_content),
|
|
|
- 'total_chunks': len(chunks),
|
|
|
- 'chunks': [
|
|
|
- {
|
|
|
- 'page': chunk.get('element_tag', {}).get('page', 0),
|
|
|
- 'content': chunk.get('review_chunk_content', ''),
|
|
|
- 'metadata': {
|
|
|
- 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
- 'section_label': chunk.get('section_label', ''),
|
|
|
- 'project_plan_type': chunk.get('project_plan_type', ''),
|
|
|
- 'chapter_classification': chunk.get('chapter_classification', ''),
|
|
|
- 'secondary_category_cn': chunk.get('secondary_category_cn', ''),
|
|
|
- 'secondary_category_code': chunk.get('secondary_category_code', ''),
|
|
|
- 'tertiary_category_cn': chunk.get('tertiary_category_cn', ''),
|
|
|
- 'tertiary_category_code': chunk.get('tertiary_category_code', ''),
|
|
|
- # 三级分类详情列表(包含该二级分类下的所有三级分类)
|
|
|
- 'tertiary_classification_details': chunk.get('tertiary_classification_details', []),
|
|
|
- 'element_tag': chunk.get('element_tag', {})
|
|
|
- }
|
|
|
- }
|
|
|
- for chunk in chunks
|
|
|
- ],
|
|
|
- 'toc_info': toc_info,
|
|
|
- 'classification': {
|
|
|
- 'items': classified_items,
|
|
|
- 'target_level': target_level
|
|
|
- } if classified_items else None,
|
|
|
- 'metadata': {
|
|
|
- 'total_pages': len(pages_content),
|
|
|
- 'total_chars': total_chars
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return result
|
|
|
-
|
|
|
- async def _fallback_processing(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
+ async def _fallback_processing(self, file_content: bytes, file_type: str) -> UnifiedDocumentStructure:
|
|
|
"""
|
|
|
统一的基础处理模式(当智能处理失败时使用)
|
|
|
|
|
|
@@ -493,15 +669,15 @@ class DocumentProcessor:
|
|
|
file_type: 文件类型(仅支持 pdf)
|
|
|
|
|
|
Returns:
|
|
|
- Dict: 基础处理结果
|
|
|
+ UnifiedDocumentStructure: 基础处理结果
|
|
|
"""
|
|
|
return await self._fallback_pdf_processing(file_content)
|
|
|
|
|
|
- async def _fallback_pdf_processing(self, file_content: bytes) -> Dict[str, Any]:
|
|
|
+ async def _fallback_pdf_processing(self, file_content: bytes) -> UnifiedDocumentStructure:
|
|
|
"""PDF基础处理模式(当智能处理失败时使用)"""
|
|
|
try:
|
|
|
from langchain_community.document_loaders import PyPDFLoader
|
|
|
- from langchain.text_splitter import RecursiveCharacterTextSplitter
|
|
|
+ from langchain_text_splitters import RecursiveCharacterTextSplitter
|
|
|
|
|
|
logger.info("使用基础PDF处理模式")
|
|
|
|
|
|
@@ -527,270 +703,34 @@ class DocumentProcessor:
|
|
|
for split in splits:
|
|
|
content = split.page_content.strip()
|
|
|
if content:
|
|
|
- split.page_content = content
|
|
|
valid_splits.append(split)
|
|
|
|
|
|
logger.info(f"基础处理完成,有效分块数量: {len(valid_splits)}")
|
|
|
|
|
|
- # 不再生成splits冗余字段
|
|
|
- return {
|
|
|
- 'document_type': 'pdf',
|
|
|
- 'total_pages': len(documents),
|
|
|
- 'total_chunks': len(valid_splits),
|
|
|
- 'chunks': [
|
|
|
- {
|
|
|
- 'chunk_id': f'chunk_{i+1}',
|
|
|
- 'page': split.metadata.get('page', 0),
|
|
|
- 'content': split.page_content,
|
|
|
- 'metadata': split.metadata
|
|
|
- }
|
|
|
- for i, split in enumerate(valid_splits)
|
|
|
- ]
|
|
|
- }
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"基础PDF处理失败: {str(e)}", exc_info=True)
|
|
|
- raise
|
|
|
-
|
|
|
- def structure_content(self, raw_content: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
- """结构化处理,适配doc_worker返回的格式"""
|
|
|
- try:
|
|
|
- document_type = raw_content.get('document_type', 'unknown')
|
|
|
-
|
|
|
- # 检查是否使用了doc_worker的智能处理(有toc_info或classification字段)
|
|
|
- is_smart_processing = 'toc_info' in raw_content or 'classification' in raw_content
|
|
|
-
|
|
|
- if is_smart_processing:
|
|
|
- # 使用doc_worker智能处理的结果
|
|
|
- chunks = []
|
|
|
- for chunk in raw_content.get('chunks', []):
|
|
|
- content = chunk.get('content', '').strip()
|
|
|
- if content:
|
|
|
- metadata = chunk.get('metadata', {})
|
|
|
- element_tag = metadata.get('element_tag', {})
|
|
|
- chapter_classification = metadata.get('chapter_classification', '')
|
|
|
- secondary_category_code = metadata.get('secondary_category_code', '')
|
|
|
-
|
|
|
- # 获取序号
|
|
|
- first_seq = self._first_seq_map.get(chapter_classification, 0)
|
|
|
- second_seq = self._second_seq_map.get(secondary_category_code, 0)
|
|
|
-
|
|
|
- chunks.append({
|
|
|
- 'chunk_id': metadata.get('chunk_id', ''),
|
|
|
- 'page': chunk.get('page', 0),
|
|
|
- 'content': content,
|
|
|
- 'section_label': metadata.get('section_label', ''),
|
|
|
- 'project_plan_type': metadata.get('project_plan_type', ''),
|
|
|
- 'chapter_classification': chapter_classification,
|
|
|
- 'first_seq': first_seq,
|
|
|
- 'secondary_category_cn': metadata.get('secondary_category_cn', ''),
|
|
|
- 'secondary_category_code': secondary_category_code,
|
|
|
- 'second_seq': second_seq,
|
|
|
- 'tertiary_category_cn': metadata.get('tertiary_category_cn', ''),
|
|
|
- 'tertiary_category_code': metadata.get('tertiary_category_code', ''),
|
|
|
- # 三级分类详情列表(包含该二级分类下的所有三级分类)
|
|
|
- 'tertiary_classification_details': metadata.get('tertiary_classification_details', []),
|
|
|
- 'element_tag': element_tag,
|
|
|
- 'chapter': metadata.get('section_label', f'第{chunk.get("page", 0)}页'),
|
|
|
- 'title': metadata.get('section_label', ''),
|
|
|
- 'original_content': content[:100] + '...' if len(content) > 100 else content
|
|
|
- })
|
|
|
- else:
|
|
|
- # 使用基础处理的结果
|
|
|
- if document_type == 'pdf':
|
|
|
- chunks = []
|
|
|
- for i, chunk in enumerate(raw_content.get('chunks', [])):
|
|
|
- content = chunk.get('content', '').strip() if isinstance(chunk, dict) else str(chunk).strip()
|
|
|
- if content:
|
|
|
- page = chunk.get('page', 0) if isinstance(chunk, dict) else 0
|
|
|
- chunks.append({
|
|
|
- 'chunk_id': f'chunk_{i+1}',
|
|
|
- 'page': page,
|
|
|
- 'content': content,
|
|
|
- 'chapter': f'第{page}页',
|
|
|
- 'title': f'内容块{i+1}',
|
|
|
- 'original_content': content[:100] + '...' if len(content) > 100 else content
|
|
|
- })
|
|
|
- else:
|
|
|
- # 基础处理结果为空
|
|
|
- chunks = []
|
|
|
-
|
|
|
- # 构建返回结果
|
|
|
- result = {
|
|
|
- 'document_name': f"施工方案文档_{document_type}",
|
|
|
- 'document_type': document_type,
|
|
|
- 'total_chunks': len(chunks),
|
|
|
- 'chunks': chunks,
|
|
|
- 'metadata': raw_content.get('metadata', {})
|
|
|
- }
|
|
|
-
|
|
|
- # 如果使用了智能处理,保留额外信息
|
|
|
- if is_smart_processing:
|
|
|
- result['outline'] = self._create_outline_from_toc(
|
|
|
- raw_content.get('toc_info', {}),
|
|
|
- raw_content.get('classification')
|
|
|
+ # 构建基础版统一文档结构
|
|
|
+ secondary_list = []
|
|
|
+ for i, split in enumerate(valid_splits, 1):
|
|
|
+ secondary_list.append(SecondaryClassification(
|
|
|
+ first_seq=1,
|
|
|
+ first_code="unknown",
|
|
|
+ first_name="未分类",
|
|
|
+ second_seq=i,
|
|
|
+ second_code=f"chunk_{i}",
|
|
|
+ second_name=f"内容块{i}",
|
|
|
+ second_content=split.page_content,
|
|
|
+ page_start=split.metadata.get("page", 0),
|
|
|
+ page_end=split.metadata.get("page", 0),
|
|
|
+ ))
|
|
|
+
|
|
|
+ unified_doc = UnifiedDocumentStructure(
|
|
|
+ document_id=str(uuid.uuid4()),
|
|
|
+ document_name="基础处理文档.pdf",
|
|
|
+ total_pages=len(documents),
|
|
|
+ secondary_classifications=secondary_list,
|
|
|
)
|
|
|
|
|
|
- return result
|
|
|
+ return unified_doc
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"内容结构化失败: {str(e)}", exc_info=True)
|
|
|
+ logger.error(f"基础PDF处理失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
-
|
|
|
- def _create_outline_from_toc(self, toc_info: Dict[str, Any], classification: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 从toc_info创建简化的大纲结构,只包含:
|
|
|
- 1. 所有的1级标题(章节目录)
|
|
|
- 2. 各个章节的次级目录
|
|
|
- 3. 各个章节的分类信息(chapter_classification)
|
|
|
-
|
|
|
- Args:
|
|
|
- toc_info: doc_worker返回的目录信息
|
|
|
- classification: 分类信息,包含已分类的目录项
|
|
|
-
|
|
|
- Returns:
|
|
|
- Dict: 简化的大纲数据
|
|
|
- """
|
|
|
- try:
|
|
|
- toc_items = toc_info.get('toc_items', [])
|
|
|
- if not toc_items:
|
|
|
- return {
|
|
|
- 'chapters': [],
|
|
|
- 'total_chapters': 0
|
|
|
- }
|
|
|
-
|
|
|
- # 提取所有1级标题(章节目录)
|
|
|
- level1_items = [item for item in toc_items if item.get('level') == 1]
|
|
|
-
|
|
|
- # 构建一级目录标题到分类信息的映射
|
|
|
- classification_map = {}
|
|
|
- if classification and 'items' in classification:
|
|
|
- for item in classification['items']:
|
|
|
- if item.get('level') == 1:
|
|
|
- title = item.get('title', '')
|
|
|
- classification_map[title] = item.get('category_code', '')
|
|
|
-
|
|
|
- chapters = []
|
|
|
- for idx, level1_item in enumerate(level1_items, 1):
|
|
|
- # 获取一级目录的分类信息
|
|
|
- title = level1_item.get('title', '')
|
|
|
- chapter_classification = classification_map.get(title, '')
|
|
|
-
|
|
|
- # 查找当前1级标题下的所有次级目录(传入chapter_classification用于二级分类映射)
|
|
|
- sub_items = self._find_sub_items(toc_items, level1_item, level1_item, chapter_classification)
|
|
|
-
|
|
|
- chapter_info = {
|
|
|
- 'index': idx,
|
|
|
- 'title': level1_item['title'],
|
|
|
- 'page': level1_item['page'],
|
|
|
- 'original': level1_item.get('original', level1_item['title']),
|
|
|
- 'chapter_classification': chapter_classification, # 一级目录的所属分类
|
|
|
- 'subsections': sub_items # 次级目录(包含secondary_category_code)
|
|
|
- }
|
|
|
- chapters.append(chapter_info)
|
|
|
-
|
|
|
- return {
|
|
|
- 'chapters': chapters,
|
|
|
- 'total_chapters': len(chapters)
|
|
|
- }
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"大纲结构化处理失败: {str(e)}", exc_info=True)
|
|
|
- return {
|
|
|
- 'chapters': [],
|
|
|
- 'total_chapters': 0
|
|
|
- }
|
|
|
-
|
|
|
- def _find_sub_items(self, toc_items: list, parent_item: dict, root_item: dict,
|
|
|
- chapter_classification: str = "") -> list:
|
|
|
- """
|
|
|
- 查找指定父级目录下的所有次级目录,并映射二级分类编码
|
|
|
-
|
|
|
- Args:
|
|
|
- toc_items: 所有目录项
|
|
|
- parent_item: 父级目录项
|
|
|
- root_item: 根级目录项(用于查找次级)
|
|
|
- chapter_classification: 一级分类编码,用于二级分类映射
|
|
|
-
|
|
|
- Returns:
|
|
|
- list: 次级目录列表(包含secondary_category_code)
|
|
|
- """
|
|
|
- sub_items = []
|
|
|
- current_index = toc_items.index(parent_item)
|
|
|
- parent_level = parent_item.get('level', 1)
|
|
|
- root_level = root_item.get('level', 1)
|
|
|
-
|
|
|
- # 从当前位置开始查找次级目录
|
|
|
- for i in range(current_index + 1, len(toc_items)):
|
|
|
- item = toc_items[i]
|
|
|
- item_level = item.get('level', 1)
|
|
|
-
|
|
|
- # 如果遇到同级或更高级的目录,停止查找
|
|
|
- if item_level <= parent_level:
|
|
|
- break
|
|
|
-
|
|
|
- # 只收集次级目录(比父级高)
|
|
|
- if item_level > parent_level:
|
|
|
- sub_item = {
|
|
|
- 'title': item['title'],
|
|
|
- 'page': item['page'],
|
|
|
- 'level': item_level,
|
|
|
- 'original': item.get('original', item['title'])
|
|
|
- }
|
|
|
-
|
|
|
- # 添加二级分类编码映射
|
|
|
- if chapter_classification:
|
|
|
- secondary_code = self._map_title_to_secondary_code(
|
|
|
- item['title'], chapter_classification
|
|
|
- )
|
|
|
- if secondary_code:
|
|
|
- sub_item['secondary_category_code'] = secondary_code
|
|
|
-
|
|
|
- sub_items.append(sub_item)
|
|
|
-
|
|
|
- return sub_items
|
|
|
-
|
|
|
- def _map_title_to_secondary_code(self, title: str, chapter_classification: str) -> Optional[str]:
|
|
|
- """
|
|
|
- 根据小节标题和一级分类,映射到二级分类编码
|
|
|
-
|
|
|
- Args:
|
|
|
- title: 小节标题(如"五、施工方案及操作要求")
|
|
|
- chapter_classification: 一级分类编码(如"technology")
|
|
|
-
|
|
|
- Returns:
|
|
|
- str: 二级分类编码,如"Operations",未匹配则返回None
|
|
|
- """
|
|
|
- if not title or not chapter_classification:
|
|
|
- return None
|
|
|
-
|
|
|
- # 清理标题(去除序号,如"一、""1.""(1)"等)
|
|
|
- import re
|
|
|
- cleaned_title = re.sub(r'^[((]?[一二三四五六七八九十0-9]+[))]?[、.\s]*', '', title)
|
|
|
- cleaned_title = re.sub(r'^\d+[.\s]+', '', cleaned_title)
|
|
|
- cleaned_title = cleaned_title.strip()
|
|
|
-
|
|
|
- # 获取该一级分类下的关键词映射
|
|
|
- category_keywords = SECONDARY_CATEGORY_KEYWORDS.get(chapter_classification, {})
|
|
|
- if not category_keywords:
|
|
|
- return None
|
|
|
-
|
|
|
- # 基于关键词匹配
|
|
|
- best_match = None
|
|
|
- best_score = 0
|
|
|
-
|
|
|
- for code, keywords in category_keywords.items():
|
|
|
- score = 0
|
|
|
- for keyword in keywords:
|
|
|
- if keyword in cleaned_title:
|
|
|
- score += len(keyword) # 关键词越长,权重越高
|
|
|
-
|
|
|
- # 完全匹配加分
|
|
|
- if cleaned_title in keywords:
|
|
|
- score += 10
|
|
|
-
|
|
|
- if score > best_score:
|
|
|
- best_score = score
|
|
|
- best_match = code
|
|
|
-
|
|
|
- return best_match if best_score > 0 else None
|