|
|
@@ -2,22 +2,31 @@
|
|
|
文档处理器
|
|
|
负责文档解析、内容提取和结构化处理
|
|
|
集成doc_worker模块的智能处理能力
|
|
|
+
|
|
|
+重构说明:
|
|
|
+1. 使用类级别共享ChunkClassifier实例,避免重复创建LLM客户端
|
|
|
+2. 统一PDF/DOCX处理流程,消除代码重复
|
|
|
+3. 移除splits冗余数据,统一使用chunks
|
|
|
+4. 完善异常处理,记录完整堆栈信息
|
|
|
"""
|
|
|
|
|
|
import io
|
|
|
import json
|
|
|
import os
|
|
|
import tempfile
|
|
|
+from dataclasses import dataclass
|
|
|
from pathlib import Path
|
|
|
-from typing import Dict, Any, Optional, Callable
|
|
|
+from typing import Dict, Any, Optional, List
|
|
|
from datetime import datetime
|
|
|
import asyncio
|
|
|
|
|
|
from foundation.observability.logger.loggering import review_logger as logger
|
|
|
from foundation.observability.cachefiles import cache, CacheBaseDir
|
|
|
+from .constants import CategoryCode, StatusCode, StageName
|
|
|
+
|
|
|
# 引入doc_worker核心组件
|
|
|
try:
|
|
|
- from .doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter
|
|
|
+ from .doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
|
|
|
from .doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
|
|
|
from .doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
|
|
|
from .doc_worker.pdf_worker.text_splitter import PdfTextSplitter
|
|
|
@@ -29,7 +38,7 @@ try:
|
|
|
from .doc_worker.classification.chunk_classifier import ChunkClassifier
|
|
|
from .doc_worker.config.provider import default_config_provider
|
|
|
except ImportError:
|
|
|
- from core.construction_review.component.doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter
|
|
|
+ from core.construction_review.component.doc_worker.interfaces import DocumentSource, TOCExtractor, FullTextExtractor, TextSplitter, HierarchyClassifier
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.toc_extractor import PdfTOCExtractor
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.fulltext_extractor import PdfFullTextExtractor
|
|
|
from core.construction_review.component.doc_worker.pdf_worker.text_splitter import PdfTextSplitter
|
|
|
@@ -41,53 +50,81 @@ except ImportError:
|
|
|
from core.construction_review.component.doc_worker.classification.chunk_classifier import ChunkClassifier
|
|
|
from core.construction_review.component.doc_worker.config.provider import default_config_provider
|
|
|
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class DocumentComponents:
|
|
|
+ """文档处理组件集合,统一封装各类型文档的处理组件"""
|
|
|
+ toc_extractor: TOCExtractor
|
|
|
+ classifier: HierarchyClassifier
|
|
|
+ fulltext_extractor: FullTextExtractor
|
|
|
+ text_splitter: TextSplitter
|
|
|
+
|
|
|
class DocumentProcessor:
|
|
|
- """文档处理器"""
|
|
|
+ """
|
|
|
+ 文档处理器
|
|
|
+
|
|
|
+ 改进说明:
|
|
|
+ 1. 使用类级别共享 _shared_chunk_classifier,避免重复创建LLM客户端
|
|
|
+ 2. 使用 DocumentComponents 统一管理处理组件
|
|
|
+ 3. 统一处理流程 _parse_content 消除代码重复
|
|
|
+ """
|
|
|
+
|
|
|
+ # 类级别共享的ChunkClassifier实例,避免重复创建LLM客户端
|
|
|
+ _shared_chunk_classifier: Optional[ChunkClassifier] = None
|
|
|
|
|
|
def __init__(self):
|
|
|
self.supported_types = ['pdf', 'docx']
|
|
|
- # 初始化doc_worker组件
|
|
|
self.config = default_config_provider
|
|
|
- # PDF组件
|
|
|
- self.pdf_toc_extractor = PdfTOCExtractor()
|
|
|
- self.pdf_fulltext_extractor = PdfFullTextExtractor()
|
|
|
- self.pdf_text_splitter = PdfTextSplitter()
|
|
|
- self.pdf_classifier = PdfHierarchyClassifier()
|
|
|
- # DOCX组件
|
|
|
- self.docx_toc_extractor = DocxTOCExtractor()
|
|
|
- self.docx_fulltext_extractor = DocxFullTextExtractor(
|
|
|
- paragraphs_per_page=int(self.config.get("toc_extraction.paragraphs_per_page", 30))
|
|
|
- )
|
|
|
- self.docx_text_splitter = DocxTextSplitter()
|
|
|
- self.docx_classifier = DocxHierarchyClassifier()
|
|
|
- # 二三级分类器(通用)
|
|
|
- self.chunk_classifier = ChunkClassifier()
|
|
|
-
|
|
|
- async def process_document(self, file_content: bytes, file_type: str,
|
|
|
- # progress_callback: Optional[Callable[[int, str], None]] = None
|
|
|
- ) -> Dict[str, Any]:
|
|
|
+
|
|
|
+ # 初始化各类型文档的处理组件
|
|
|
+ self._components: Dict[str, DocumentComponents] = {
|
|
|
+ 'pdf': DocumentComponents(
|
|
|
+ toc_extractor=PdfTOCExtractor(),
|
|
|
+ classifier=PdfHierarchyClassifier(),
|
|
|
+ fulltext_extractor=PdfFullTextExtractor(),
|
|
|
+ text_splitter=PdfTextSplitter()
|
|
|
+ ),
|
|
|
+ 'docx': DocumentComponents(
|
|
|
+ toc_extractor=DocxTOCExtractor(),
|
|
|
+ classifier=DocxHierarchyClassifier(),
|
|
|
+ fulltext_extractor=DocxFullTextExtractor(
|
|
|
+ paragraphs_per_page=int(self.config.get("toc_extraction.paragraphs_per_page", 30))
|
|
|
+ ),
|
|
|
+ text_splitter=DocxTextSplitter()
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _get_chunk_classifier(cls) -> ChunkClassifier:
|
|
|
+ """获取共享的ChunkClassifier实例"""
|
|
|
+ if cls._shared_chunk_classifier is None:
|
|
|
+ cls._shared_chunk_classifier = ChunkClassifier()
|
|
|
+ return cls._shared_chunk_classifier
|
|
|
+
|
|
|
+ async def process_document(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
"""
|
|
|
处理文档
|
|
|
|
|
|
Args:
|
|
|
- file_content: 文件内容
|
|
|
- file_type: 文件类型
|
|
|
- progress_callback: 进度回调函数
|
|
|
+ file_content: 文件内容(字节流)
|
|
|
+ file_type: 文件类型(pdf/docx)
|
|
|
|
|
|
Returns:
|
|
|
- Dict: 解析结果
|
|
|
+ Dict: 结构化的解析结果
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ ValueError: 不支持的文件类型
|
|
|
+ RuntimeError: 文档处理完全失败
|
|
|
"""
|
|
|
try:
|
|
|
logger.info(f"开始处理文档,类型: {file_type}")
|
|
|
- # if progress_callback:
|
|
|
- # progress_callback(20, "开始文档处理")
|
|
|
- # 简化处理:直接解析
|
|
|
- if file_type.lower() == 'pdf':
|
|
|
- result = await self.parse_pdf_content(file_content)
|
|
|
- elif file_type.lower() == 'docx':
|
|
|
- result = await self.parse_docx_content(file_content)
|
|
|
- else:
|
|
|
- raise ValueError(f"不支持的文件类型: {file_type}")
|
|
|
+
|
|
|
+ file_type_lower = file_type.lower()
|
|
|
+ if file_type_lower not in self.supported_types:
|
|
|
+ raise ValueError(f"不支持的文件类型: {file_type},支持的类型: {self.supported_types}")
|
|
|
+
|
|
|
+ # 统一调用解析方法
|
|
|
+ result = await self._parse_content(file_content, file_type_lower)
|
|
|
|
|
|
# 结构化内容
|
|
|
structured_result = self.structure_content(result)
|
|
|
@@ -95,378 +132,278 @@ class DocumentProcessor:
|
|
|
return structured_result
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"文档处理失败: {str(e)}")
|
|
|
+ logger.error(f"文档处理失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
- async def parse_pdf_content(self, file_content: bytes) -> Dict[str, Any]:
|
|
|
- """解析PDF内容,使用doc_worker的智能处理能力"""
|
|
|
+ async def _parse_content(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
+ """
|
|
|
+ 统一的文档解析方法(消除PDF/DOCX代码重复)
|
|
|
+
|
|
|
+ Args:
|
|
|
+ file_content: 文件内容
|
|
|
+ file_type: 文件类型(pdf/docx)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Dict: 解析结果
|
|
|
+ """
|
|
|
+ components = self._components.get(file_type)
|
|
|
+ if not components:
|
|
|
+ raise ValueError(f"未找到 {file_type} 类型的处理组件")
|
|
|
+
|
|
|
try:
|
|
|
- logger.info("开始使用doc_worker处理PDF文档(内存模式)")
|
|
|
+ logger.info(f"开始使用doc_worker处理{file_type.upper()}文档(内存模式)")
|
|
|
|
|
|
- # 创建DocumentSource(纯内存模式,不使用临时文件)
|
|
|
+ # 创建DocumentSource(纯内存模式)
|
|
|
source = DocumentSource(
|
|
|
path=None,
|
|
|
content=file_content,
|
|
|
- file_type='pdf'
|
|
|
+ file_type=file_type
|
|
|
)
|
|
|
|
|
|
# 步骤1: 提取目录
|
|
|
- logger.info("步骤1: 提取文档目录")
|
|
|
- toc_info = self.pdf_toc_extractor.extract_toc(source)
|
|
|
-
|
|
|
+ logger.info(f"{StageName.TOC_EXTRACTION.value}: 提取文档目录")
|
|
|
+ toc_info = components.toc_extractor.extract_toc(source)
|
|
|
+
|
|
|
if toc_info.get('toc_count', 0) == 0:
|
|
|
logger.warning("未检测到目录,使用基础处理模式")
|
|
|
- return await self._fallback_pdf_processing(file_content)
|
|
|
+ return await self._fallback_processing(file_content, file_type)
|
|
|
|
|
|
logger.info(f"成功提取 {toc_info['toc_count']} 个目录项")
|
|
|
|
|
|
# 步骤2: 分类目录项
|
|
|
target_level = int(self.config.get("text_splitting.target_level", 1))
|
|
|
- logger.info(f"步骤2: 对{target_level}级目录进行分类")
|
|
|
-
|
|
|
- classification_result = await self.pdf_classifier.classify_async(
|
|
|
+ logger.info(f"{StageName.CLASSIFICATION.value}: 对{target_level}级目录进行分类")
|
|
|
+
|
|
|
+ classification_result = await components.classifier.classify_async(
|
|
|
toc_info['toc_items'],
|
|
|
target_level=target_level
|
|
|
)
|
|
|
-
|
|
|
+
|
|
|
classified_items = classification_result.get('items', [])
|
|
|
if not classified_items:
|
|
|
logger.warning("分类结果为空,使用原始目录项")
|
|
|
classified_items = [
|
|
|
- item for item in toc_info['toc_items']
|
|
|
+ item for item in toc_info['toc_items']
|
|
|
if item.get('level') == target_level
|
|
|
]
|
|
|
# 为每个目录项添加默认分类信息
|
|
|
for item in classified_items:
|
|
|
item['category'] = '未分类'
|
|
|
- item['category_code'] = 'other'
|
|
|
+ item['category_code'] = CategoryCode.OTHER.value
|
|
|
else:
|
|
|
logger.info(f"分类完成,共分类 {len(classified_items)} 个目录项")
|
|
|
|
|
|
- # 步骤3: 提取文档全文
|
|
|
- logger.info("步骤3: 提取文档全文")
|
|
|
- # 将同步CPU/IO密集操作放入线程池,避免阻塞事件循环
|
|
|
+ # 步骤3: 提取文档全文(使用线程池避免阻塞事件循环)
|
|
|
+ logger.info(f"{StageName.TEXT_EXTRACTION.value}: 提取文档全文")
|
|
|
pages_content = await asyncio.to_thread(
|
|
|
- self.pdf_fulltext_extractor.extract_full_text, source
|
|
|
+ components.fulltext_extractor.extract_full_text, source
|
|
|
)
|
|
|
-
|
|
|
+
|
|
|
if not pages_content:
|
|
|
logger.warning("无法提取文档全文,使用基础处理模式")
|
|
|
- return await self._fallback_pdf_processing(file_content)
|
|
|
+ return await self._fallback_processing(file_content, file_type)
|
|
|
|
|
|
total_chars = sum(len(page.get('text', '')) for page in pages_content)
|
|
|
logger.info(f"提取完成,共 {len(pages_content)} 页,{total_chars} 个字符")
|
|
|
|
|
|
- # 步骤4: 按分类标题智能切分文本
|
|
|
- logger.info("步骤4: 按分类标题智能切分文本")
|
|
|
+ # 步骤4: 按分类标题智能切分文本(使用线程池避免阻塞)
|
|
|
+ logger.info(f"{StageName.TEXT_SPLITTING.value}: 按分类标题智能切分文本")
|
|
|
max_chunk_size = int(self.config.get("text_splitting.max_chunk_size", 3000))
|
|
|
min_chunk_size = int(self.config.get("text_splitting.min_chunk_size", 50))
|
|
|
-
|
|
|
- chunks = self.pdf_text_splitter.split_by_hierarchy(
|
|
|
+
|
|
|
+ chunks = await asyncio.to_thread(
|
|
|
+ components.text_splitter.split_by_hierarchy,
|
|
|
classified_items,
|
|
|
pages_content,
|
|
|
toc_info,
|
|
|
- target_level=target_level,
|
|
|
- max_chunk_size=max_chunk_size,
|
|
|
- min_chunk_size=min_chunk_size
|
|
|
+ target_level,
|
|
|
+ max_chunk_size,
|
|
|
+ min_chunk_size
|
|
|
)
|
|
|
|
|
|
if not chunks:
|
|
|
logger.warning("未能生成任何文本块,使用基础处理模式")
|
|
|
- return await self._fallback_pdf_processing(file_content)
|
|
|
+ return await self._fallback_processing(file_content, file_type)
|
|
|
|
|
|
logger.info(f"切分完成,共生成 {len(chunks)} 个文本块")
|
|
|
|
|
|
# 步骤5: 对chunks进行二级分类
|
|
|
- logger.info("步骤5: 对内容块进行二级分类")
|
|
|
- try:
|
|
|
- chunks = await self.chunk_classifier.classify_chunks_secondary_async(chunks)
|
|
|
- logger.info("二级分类完成")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"二级分类失败: {str(e)},跳过二级分类")
|
|
|
+ chunks = await self._classify_chunks_secondary(chunks)
|
|
|
|
|
|
# 步骤6: 对chunks进行三级分类
|
|
|
- logger.info("步骤6: 对内容块进行三级分类")
|
|
|
- try:
|
|
|
- chunks = await self.chunk_classifier.classify_chunks_tertiary_async(chunks)
|
|
|
- logger.info("三级分类完成")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"三级分类失败: {str(e)},跳过三级分类")
|
|
|
+ chunks = await self._classify_chunks_tertiary(chunks)
|
|
|
|
|
|
- # 适配返回格式
|
|
|
- return {
|
|
|
- 'document_type': 'pdf',
|
|
|
- 'total_pages': len(pages_content),
|
|
|
- 'total_chunks': len(chunks),
|
|
|
- 'chunks': [
|
|
|
- {
|
|
|
- 'page': chunk.get('element_tag', {}).get('page', 0),
|
|
|
- 'content': chunk.get('review_chunk_content', ''),
|
|
|
- 'metadata': {
|
|
|
- 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
- 'section_label': chunk.get('section_label', ''),
|
|
|
- 'project_plan_type': chunk.get('project_plan_type', ''),
|
|
|
- 'chapter_classification': chunk.get('chapter_classification', ''),
|
|
|
- 'secondary_category_cn': chunk.get('secondary_category_cn', ''),
|
|
|
- 'secondary_category_code': chunk.get('secondary_category_code', ''),
|
|
|
- 'tertiary_category_cn': chunk.get('tertiary_category_cn', ''),
|
|
|
- 'tertiary_category_code': chunk.get('tertiary_category_code', ''),
|
|
|
- 'element_tag': chunk.get('element_tag', {})
|
|
|
- }
|
|
|
- }
|
|
|
- for chunk in chunks
|
|
|
- ],
|
|
|
- 'splits': [
|
|
|
- {
|
|
|
- 'content': chunk.get('review_chunk_content', ''),
|
|
|
- 'metadata': {
|
|
|
- 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
- 'section_label': chunk.get('section_label', ''),
|
|
|
- 'page': chunk.get('element_tag', {}).get('page', 0)
|
|
|
- }
|
|
|
- }
|
|
|
- for chunk in chunks
|
|
|
- ],
|
|
|
- 'toc_info': toc_info,
|
|
|
- 'classification': {
|
|
|
- 'items': classified_items,
|
|
|
- 'target_level': target_level
|
|
|
- } if classified_items else None
|
|
|
- }
|
|
|
+ # 构建返回结果(移除splits冗余,统一使用chunks)
|
|
|
+ return self._build_parse_result(
|
|
|
+ file_type, chunks, pages_content, toc_info,
|
|
|
+ classified_items, target_level, total_chars
|
|
|
+ )
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"PDF解析失败: {str(e)}")
|
|
|
+ logger.error(f"{file_type.upper()}解析失败: {str(e)}", exc_info=True)
|
|
|
# 如果智能处理失败,尝试基础处理
|
|
|
try:
|
|
|
logger.info("尝试使用基础处理模式")
|
|
|
- return await self._fallback_pdf_processing(file_content)
|
|
|
+ return await self._fallback_processing(file_content, file_type)
|
|
|
except Exception as fallback_error:
|
|
|
- logger.error(f"基础处理模式也失败: {str(fallback_error)}")
|
|
|
- raise
|
|
|
-
|
|
|
- async def parse_docx_content(self, file_content: bytes) -> Dict[str, Any]:
|
|
|
- """解析DOCX内容,使用doc_worker的智能处理能力"""
|
|
|
+ logger.error(f"基础处理模式也失败: {str(fallback_error)}", exc_info=True)
|
|
|
+ raise RuntimeError(
|
|
|
+ f"文档处理完全失败: {file_type.upper()}智能处理({str(e)}) + 基础处理({str(fallback_error)})"
|
|
|
+ ) from e
|
|
|
+
|
|
|
+ async def _classify_chunks_secondary(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
+ """对chunks进行二级分类"""
|
|
|
+ logger.info(f"{StageName.SECONDARY_CLASSIFICATION.value}: 对内容块进行二级分类")
|
|
|
try:
|
|
|
- logger.info("开始使用doc_worker处理DOCX文档(内存模式)")
|
|
|
-
|
|
|
- # 创建DocumentSource(纯内存模式,不使用临时文件)
|
|
|
- source = DocumentSource(
|
|
|
- path=None,
|
|
|
- content=file_content,
|
|
|
- file_type='docx'
|
|
|
- )
|
|
|
-
|
|
|
- # 步骤1: 提取目录
|
|
|
- logger.info("步骤1: 提取文档目录")
|
|
|
- toc_info = self.docx_toc_extractor.extract_toc(source)
|
|
|
-
|
|
|
- if toc_info.get('toc_count', 0) == 0:
|
|
|
- logger.warning("未检测到目录,使用基础处理模式")
|
|
|
- return await self._fallback_docx_processing(file_content)
|
|
|
-
|
|
|
- logger.info(f"成功提取 {toc_info['toc_count']} 个目录项")
|
|
|
-
|
|
|
- # 步骤2: 分类目录项
|
|
|
- target_level = int(self.config.get("text_splitting.target_level", 1))
|
|
|
- logger.info(f"步骤2: 对{target_level}级目录进行分类")
|
|
|
-
|
|
|
- classification_result = await self.docx_classifier.classify_async(
|
|
|
- toc_info['toc_items'],
|
|
|
- target_level=target_level
|
|
|
- )
|
|
|
-
|
|
|
- classified_items = classification_result.get('items', [])
|
|
|
- if not classified_items:
|
|
|
- logger.warning("分类结果为空,使用原始目录项")
|
|
|
- classified_items = [
|
|
|
- item for item in toc_info['toc_items']
|
|
|
- if item.get('level') == target_level
|
|
|
- ]
|
|
|
- # 为每个目录项添加默认分类信息
|
|
|
- for item in classified_items:
|
|
|
- item['category'] = '未分类'
|
|
|
- item['category_code'] = 'other'
|
|
|
- else:
|
|
|
- logger.info(f"分类完成,共分类 {len(classified_items)} 个目录项")
|
|
|
-
|
|
|
- # 步骤3: 提取文档全文
|
|
|
- logger.info("步骤3: 提取文档全文")
|
|
|
- # 将同步CPU/IO密集操作放入线程池,避免阻塞事件循环
|
|
|
- pages_content = await asyncio.to_thread(
|
|
|
- self.docx_fulltext_extractor.extract_full_text, source
|
|
|
- )
|
|
|
-
|
|
|
- if not pages_content:
|
|
|
- logger.warning("无法提取文档全文,使用基础处理模式")
|
|
|
- return await self._fallback_docx_processing(file_content)
|
|
|
-
|
|
|
- total_chars = sum(len(page.get('text', '')) for page in pages_content)
|
|
|
- logger.info(f"提取完成,共 {len(pages_content)} 页,{total_chars} 个字符")
|
|
|
+ chunk_classifier = self._get_chunk_classifier()
|
|
|
+ chunks = await chunk_classifier.classify_chunks_secondary_async(chunks)
|
|
|
+ logger.info("二级分类完成")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"二级分类失败: {str(e)},跳过二级分类", exc_info=True)
|
|
|
+ return chunks
|
|
|
|
|
|
- # 步骤4: 按分类标题智能切分文本
|
|
|
- logger.info("步骤4: 按分类标题智能切分文本")
|
|
|
- max_chunk_size = int(self.config.get("text_splitting.max_chunk_size", 3000))
|
|
|
- min_chunk_size = int(self.config.get("text_splitting.min_chunk_size", 50))
|
|
|
-
|
|
|
- chunks = self.docx_text_splitter.split_by_hierarchy(
|
|
|
- classified_items,
|
|
|
- pages_content,
|
|
|
- toc_info,
|
|
|
- target_level=target_level,
|
|
|
- max_chunk_size=max_chunk_size,
|
|
|
- min_chunk_size=min_chunk_size
|
|
|
- )
|
|
|
+ async def _classify_chunks_tertiary(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|
|
+ """对chunks进行三级分类"""
|
|
|
+ logger.info(f"{StageName.TERTIARY_CLASSIFICATION.value}: 对内容块进行三级分类")
|
|
|
+ try:
|
|
|
+ chunk_classifier = self._get_chunk_classifier()
|
|
|
+ chunks = await chunk_classifier.classify_chunks_tertiary_async(chunks)
|
|
|
+ logger.info("三级分类完成")
|
|
|
+ except Exception as e:
|
|
|
+ logger.warning(f"三级分类失败: {str(e)},跳过三级分类", exc_info=True)
|
|
|
+ return chunks
|
|
|
+
|
|
|
+ def _build_parse_result(
|
|
|
+ self,
|
|
|
+ file_type: str,
|
|
|
+ chunks: List[Dict[str, Any]],
|
|
|
+ pages_content: List[Dict[str, Any]],
|
|
|
+ toc_info: Dict[str, Any],
|
|
|
+ classified_items: List[Dict[str, Any]],
|
|
|
+ target_level: int,
|
|
|
+ total_chars: int
|
|
|
+ ) -> Dict[str, Any]:
|
|
|
+ """
|
|
|
+ 构建解析结果(移除splits冗余)
|
|
|
|
|
|
- if not chunks:
|
|
|
- logger.warning("未能生成任何文本块,使用基础处理模式")
|
|
|
- return await self._fallback_docx_processing(file_content)
|
|
|
+ 改进: 不再生成splits字段,统一使用chunks
|
|
|
+ """
|
|
|
+ result = {
|
|
|
+ 'document_type': file_type,
|
|
|
+ 'total_pages': len(pages_content),
|
|
|
+ 'total_chunks': len(chunks),
|
|
|
+ 'chunks': [
|
|
|
+ {
|
|
|
+ 'page': chunk.get('element_tag', {}).get('page', 0),
|
|
|
+ 'content': chunk.get('review_chunk_content', ''),
|
|
|
+ 'metadata': {
|
|
|
+ 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
+ 'section_label': chunk.get('section_label', ''),
|
|
|
+ 'project_plan_type': chunk.get('project_plan_type', ''),
|
|
|
+ 'chapter_classification': chunk.get('chapter_classification', ''),
|
|
|
+ 'secondary_category_cn': chunk.get('secondary_category_cn', ''),
|
|
|
+ 'secondary_category_code': chunk.get('secondary_category_code', ''),
|
|
|
+ 'tertiary_category_cn': chunk.get('tertiary_category_cn', ''),
|
|
|
+ 'tertiary_category_code': chunk.get('tertiary_category_code', ''),
|
|
|
+ 'element_tag': chunk.get('element_tag', {})
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for chunk in chunks
|
|
|
+ ],
|
|
|
+ 'toc_info': toc_info,
|
|
|
+ 'classification': {
|
|
|
+ 'items': classified_items,
|
|
|
+ 'target_level': target_level
|
|
|
+ } if classified_items else None,
|
|
|
+ 'metadata': {
|
|
|
+ 'total_pages': len(pages_content),
|
|
|
+ 'total_chars': total_chars
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- logger.info(f"切分完成,共生成 {len(chunks)} 个文本块")
|
|
|
+ # DOCX额外保留full_text字段
|
|
|
+ if file_type == 'docx':
|
|
|
+ result['full_text'] = ''.join([page.get('text', '') for page in pages_content])
|
|
|
|
|
|
- # 步骤5: 对chunks进行二级分类
|
|
|
- logger.info("步骤5: 对内容块进行二级分类")
|
|
|
- try:
|
|
|
- chunks = await self.chunk_classifier.classify_chunks_secondary_async(chunks)
|
|
|
- logger.info("二级分类完成")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"二级分类失败: {str(e)},跳过二级分类")
|
|
|
+ return result
|
|
|
|
|
|
- # 步骤6: 对chunks进行三级分类
|
|
|
- logger.info("步骤6: 对内容块进行三级分类")
|
|
|
- try:
|
|
|
- chunks = await self.chunk_classifier.classify_chunks_tertiary_async(chunks)
|
|
|
- logger.info("三级分类完成")
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"三级分类失败: {str(e)},跳过三级分类")
|
|
|
+ async def _fallback_processing(self, file_content: bytes, file_type: str) -> Dict[str, Any]:
|
|
|
+ """
|
|
|
+ 统一的基础处理模式(当智能处理失败时使用)
|
|
|
|
|
|
- # 适配返回格式
|
|
|
- return {
|
|
|
- 'document_type': 'docx',
|
|
|
- 'total_pages': len(pages_content),
|
|
|
- 'total_chunks': len(chunks),
|
|
|
- 'chunks': [
|
|
|
- {
|
|
|
- 'page': chunk.get('element_tag', {}).get('page', 0),
|
|
|
- 'content': chunk.get('review_chunk_content', ''),
|
|
|
- 'metadata': {
|
|
|
- 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
- 'section_label': chunk.get('section_label', ''),
|
|
|
- 'project_plan_type': chunk.get('project_plan_type', ''),
|
|
|
- 'chapter_classification': chunk.get('chapter_classification', ''),
|
|
|
- 'secondary_category_cn': chunk.get('secondary_category_cn', ''),
|
|
|
- 'secondary_category_code': chunk.get('secondary_category_code', ''),
|
|
|
- 'tertiary_category_cn': chunk.get('tertiary_category_cn', ''),
|
|
|
- 'tertiary_category_code': chunk.get('tertiary_category_code', ''),
|
|
|
- 'element_tag': chunk.get('element_tag', {})
|
|
|
- }
|
|
|
- }
|
|
|
- for chunk in chunks
|
|
|
- ],
|
|
|
- 'splits': [
|
|
|
- {
|
|
|
- 'content': chunk.get('review_chunk_content', ''),
|
|
|
- 'metadata': {
|
|
|
- 'chunk_id': chunk.get('chunk_id', ''),
|
|
|
- 'section_label': chunk.get('section_label', ''),
|
|
|
- 'page': chunk.get('element_tag', {}).get('page', 0)
|
|
|
- }
|
|
|
- }
|
|
|
- for chunk in chunks
|
|
|
- ],
|
|
|
- 'full_text': ''.join([page.get('text', '') for page in pages_content]),
|
|
|
- 'toc_info': toc_info,
|
|
|
- 'classification': {
|
|
|
- 'items': classified_items,
|
|
|
- 'target_level': target_level
|
|
|
- } if classified_items else None,
|
|
|
- 'metadata': {
|
|
|
- 'total_pages': len(pages_content),
|
|
|
- 'total_chars': total_chars
|
|
|
- }
|
|
|
- }
|
|
|
+ Args:
|
|
|
+ file_content: 文件内容
|
|
|
+ file_type: 文件类型(pdf/docx)
|
|
|
|
|
|
- except Exception as e:
|
|
|
- logger.error(f"DOCX解析失败: {str(e)}")
|
|
|
- # 如果智能处理失败,尝试基础处理
|
|
|
- try:
|
|
|
- logger.info("尝试使用基础处理模式")
|
|
|
- return await self._fallback_docx_processing(file_content)
|
|
|
- except Exception as fallback_error:
|
|
|
- logger.error(f"基础处理模式也失败: {str(fallback_error)}")
|
|
|
- raise
|
|
|
+ Returns:
|
|
|
+ Dict: 基础处理结果
|
|
|
+ """
|
|
|
+ if file_type == 'pdf':
|
|
|
+ return await self._fallback_pdf_processing(file_content)
|
|
|
+ else:
|
|
|
+ return await self._fallback_docx_processing(file_content)
|
|
|
|
|
|
async def _fallback_pdf_processing(self, file_content: bytes) -> Dict[str, Any]:
|
|
|
"""PDF基础处理模式(当智能处理失败时使用)"""
|
|
|
- temp_file_path = None
|
|
|
try:
|
|
|
from langchain_community.document_loaders import PyPDFLoader
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
|
|
-
|
|
|
+
|
|
|
logger.info("使用基础PDF处理模式")
|
|
|
-
|
|
|
+
|
|
|
# PyPDFLoader需要文件路径,创建临时文件
|
|
|
- with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
|
|
|
+ with tempfile.NamedTemporaryFile(delete=True, suffix='.pdf') as temp_file:
|
|
|
temp_file.write(file_content)
|
|
|
+ temp_file.flush()
|
|
|
temp_file_path = temp_file.name
|
|
|
-
|
|
|
- loader = PyPDFLoader(temp_file_path)
|
|
|
- documents = loader.load()
|
|
|
-
|
|
|
- # 文本分块
|
|
|
- text_splitter = RecursiveCharacterTextSplitter(
|
|
|
- chunk_size=1000,
|
|
|
- chunk_overlap=200,
|
|
|
- separators=["\n\n", "\n", " ", ""]
|
|
|
- )
|
|
|
- splits = text_splitter.split_documents(documents)
|
|
|
|
|
|
- # 过滤空内容切块
|
|
|
- valid_splits = []
|
|
|
- for split in splits:
|
|
|
- content = split.page_content.strip()
|
|
|
- if content:
|
|
|
- split.page_content = content
|
|
|
- valid_splits.append(split)
|
|
|
+ loader = PyPDFLoader(temp_file_path)
|
|
|
+ documents = loader.load()
|
|
|
|
|
|
- logger.info(f"基础处理完成,有效分块数量: {len(valid_splits)}")
|
|
|
+ # 文本分块
|
|
|
+ text_splitter = RecursiveCharacterTextSplitter(
|
|
|
+ chunk_size=1000,
|
|
|
+ chunk_overlap=200,
|
|
|
+ separators=["\n\n", "\n", " ", ""]
|
|
|
+ )
|
|
|
+ splits = text_splitter.split_documents(documents)
|
|
|
+
|
|
|
+ # 过滤空内容切块
|
|
|
+ valid_splits = []
|
|
|
+ for split in splits:
|
|
|
+ content = split.page_content.strip()
|
|
|
+ if content:
|
|
|
+ split.page_content = content
|
|
|
+ valid_splits.append(split)
|
|
|
+
|
|
|
+ logger.info(f"基础处理完成,有效分块数量: {len(valid_splits)}")
|
|
|
+
|
|
|
+ # 不再生成splits冗余字段
|
|
|
+ return {
|
|
|
+ 'document_type': 'pdf',
|
|
|
+ 'total_pages': len(documents),
|
|
|
+ 'total_chunks': len(valid_splits),
|
|
|
+ 'chunks': [
|
|
|
+ {
|
|
|
+ 'chunk_id': f'chunk_{i+1}',
|
|
|
+ 'page': split.metadata.get('page', 0),
|
|
|
+ 'content': split.page_content,
|
|
|
+ 'metadata': split.metadata
|
|
|
+ }
|
|
|
+ for i, split in enumerate(valid_splits)
|
|
|
+ ]
|
|
|
+ }
|
|
|
|
|
|
- return {
|
|
|
- 'document_type': 'pdf',
|
|
|
- 'total_pages': len(documents),
|
|
|
- 'total_chunks': len(valid_splits),
|
|
|
- 'chunks': [
|
|
|
- {
|
|
|
- 'page': doc.metadata.get('page', 0),
|
|
|
- 'content': doc.page_content,
|
|
|
- 'metadata': doc.metadata
|
|
|
- }
|
|
|
- for doc in documents
|
|
|
- ],
|
|
|
- 'splits': [
|
|
|
- {
|
|
|
- 'content': split.page_content,
|
|
|
- 'metadata': split.metadata
|
|
|
- }
|
|
|
- for split in valid_splits
|
|
|
- ]
|
|
|
- }
|
|
|
except Exception as e:
|
|
|
- logger.error(f"基础PDF处理失败: {str(e)}")
|
|
|
+ logger.error(f"基础PDF处理失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
- finally:
|
|
|
- # 清理临时文件
|
|
|
- if temp_file_path and os.path.exists(temp_file_path):
|
|
|
- try:
|
|
|
- os.unlink(temp_file_path)
|
|
|
- except Exception as e:
|
|
|
- logger.warning(f"清理临时文件失败: {str(e)}")
|
|
|
|
|
|
async def _fallback_docx_processing(self, file_content: bytes) -> Dict[str, Any]:
|
|
|
"""DOCX基础处理模式(当智能处理失败时使用)"""
|
|
|
try:
|
|
|
from docx import Document
|
|
|
from io import BytesIO
|
|
|
-
|
|
|
+
|
|
|
logger.info("使用基础DOCX处理模式(内存模式)")
|
|
|
doc = Document(BytesIO(file_content))
|
|
|
full_text = '\n'.join([paragraph.text for paragraph in doc.paragraphs])
|
|
|
@@ -498,7 +435,7 @@ class DocumentProcessor:
|
|
|
}
|
|
|
}
|
|
|
except Exception as e:
|
|
|
- logger.error(f"基础DOCX处理失败: {str(e)}")
|
|
|
+ logger.error(f"基础DOCX处理失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
def structure_content(self, raw_content: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
@@ -582,7 +519,7 @@ class DocumentProcessor:
|
|
|
return result
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"内容结构化失败: {str(e)}")
|
|
|
+ logger.error(f"内容结构化失败: {str(e)}", exc_info=True)
|
|
|
raise
|
|
|
|
|
|
def _create_outline_from_toc(self, toc_info: Dict[str, Any], classification: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
|
@@ -643,7 +580,7 @@ class DocumentProcessor:
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
- logger.error(f"大纲结构化处理失败: {str(e)}")
|
|
|
+ logger.error(f"大纲结构化处理失败: {str(e)}", exc_info=True)
|
|
|
return {
|
|
|
'chapters': [],
|
|
|
'total_chapters': 0
|