| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- # coding=utf-8
- """
- @project: maxkb
- @Author:虎
- @file: text_split_handle.py
- @date:2024/3/27 18:19
- @desc:
- """
- import os
- import re
- import tempfile
- import time
- import traceback
- from typing import List
- import fitz
- from django.utils.translation import gettext_lazy as _
- from common.handle.base_split_handle import BaseSplitHandle
- from common.utils.logger import maxkb_logger
- from common.utils.split_model import SplitModel, smart_split_paragraph
- default_pattern_list = [re.compile('(?<=^)# .*|(?<=\\n)# .*'),
- re.compile('(?<=\\n)(?<!#)## (?!#).*|(?<=^)(?<!#)## (?!#).*'),
- re.compile("(?<=\\n)(?<!#)### (?!#).*|(?<=^)(?<!#)### (?!#).*"),
- re.compile("(?<=\\n)(?<!#)#### (?!#).*|(?<=^)(?<!#)#### (?!#).*"),
- re.compile("(?<=\\n)(?<!#)##### (?!#).*|(?<=^)(?<!#)##### (?!#).*"),
- re.compile("(?<=\\n)(?<!#)###### (?!#).*|(?<=^)(?<!#)###### (?!#).*"),
- re.compile("(?<!\n)\n\n+")]
- def check_links_in_pdf(doc):
- for page_number in range(len(doc)):
- page = doc[page_number]
- links = page.get_links()
- if links:
- for link in links:
- if link['kind'] == 1:
- return True
- return False
- class PdfSplitHandle(BaseSplitHandle):
- def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
- with tempfile.NamedTemporaryFile(delete=False) as temp_file:
- # 将上传的文件保存到临时文件中
- for chunk in file.chunks():
- temp_file.write(chunk)
- # 获取临时文件的路径
- temp_file_path = temp_file.name
- pdf_document = fitz.open(temp_file_path)
- try:
- if type(limit) is str:
- limit = int(limit)
- if type(with_filter) is str:
- with_filter = with_filter.lower() == 'true'
- # 处理有目录的pdf
- result = self.handle_toc(pdf_document, limit)
- if result is not None:
- return {'name': file.name, 'content': result}
- # 没目录但是有链接的pdf
- result = self.handle_links(pdf_document, pattern_list, with_filter, limit)
- if result is not None and len(result) > 0:
- return {'name': file.name, 'content': result}
- # 没有目录的pdf
- content = self.handle_pdf_content(file, pdf_document)
- if pattern_list is not None and len(pattern_list) > 0:
- split_model = SplitModel(pattern_list, with_filter, limit)
- else:
- split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit)
- except BaseException as e:
- maxkb_logger.error(f"File: {file.name}, error: {e}, {traceback.format_exc()}")
- return {
- 'name': file.name,
- 'content': []
- }
- finally:
- pdf_document.close()
- # 处理完后可以删除临时文件
- os.remove(temp_file_path)
- return {
- 'name': file.name,
- 'content': split_model.parse(content)
- }
- @staticmethod
- def handle_pdf_content(file, pdf_document):
- # 第一步:收集所有字体大小
- font_sizes = []
- for page_num in range(len(pdf_document)):
- page = pdf_document.load_page(page_num)
- blocks = page.get_text("dict")["blocks"]
- for block in blocks:
- if block["type"] == 0:
- for line in block["lines"]:
- for span in line["spans"]:
- if span["size"] > 0:
- font_sizes.append(span["size"])
- # 计算正文字体大小(众数)
- if not font_sizes:
- body_font_size = 12
- else:
- from collections import Counter
- body_font_size = Counter(font_sizes).most_common(1)[0][0]
- # 第二步:提取内容
- content = ""
- for page_num in range(len(pdf_document)):
- start_time = time.time()
- page = pdf_document.load_page(page_num)
- blocks = page.get_text("dict")["blocks"]
- for block in blocks:
- if block["type"] == 0: # 文本块
- for line in block["lines"]:
- if not line["spans"]:
- continue
- text = "".join([span["text"] for span in line["spans"]])
- font_size = line["spans"][0]["size"]
- # 根据与正文字体的差值判断
- size_diff = font_size - body_font_size
- if size_diff > 2: # 明显大于正文
- content += f"## {text}\n\n"
- elif size_diff > 0.5: # 略大于正文
- content += f"### {text}\n\n"
- else: # 正文
- content += f"{text}\n"
- elif block["type"] == 1: # 图片块
- content += f"\n\n"
- content = content.replace('\0', '')
- elapsed_time = time.time() - start_time
- maxkb_logger.debug(
- f"File: {file.name}, Page: {page_num + 1}, Time: {elapsed_time:.3f}s")
- return content
- @staticmethod
- def handle_toc(doc, limit):
- # 找到目录
- toc = doc.get_toc()
- if toc is None or len(toc) == 0:
- return None
- # 创建存储章节内容的数组
- chapters = []
- # 遍历目录并按章节提取文本
- for i, entry in enumerate(toc):
- level, title, start_page = entry
- start_page -= 1 # PyMuPDF 页码从 0 开始,书签页码从 1 开始
- chapter_title = title
- # 确定结束页码,如果是最后一个章节则到文档末尾
- if i + 1 < len(toc):
- end_page = toc[i + 1][2] - 1
- else:
- end_page = doc.page_count - 1
- # 去掉标题中的符号
- title = PdfSplitHandle.handle_chapter_title(title)
- # 提取该章节的文本内容
- chapter_text = ""
- for page_num in range(start_page, end_page + 1):
- page = doc.load_page(page_num) # 加载页面
- text = page.get_text("text")
- text = re.sub(r'(?<!。)\n+', '', text)
- text = re.sub(r'(?<!.)\n+', '', text)
- # print(f'title: {title}')
- idx = text.find(title)
- if idx > -1:
- text = text[idx + len(title):]
- if i + 1 < len(toc):
- l, next_title, next_start_page = toc[i + 1]
- next_title = PdfSplitHandle.handle_chapter_title(next_title)
- # print(f'next_title: {next_title}')
- idx = text.find(next_title)
- if idx > -1:
- text = text[:idx]
- chapter_text += text # 提取文本
- # Null characters are not allowed.
- chapter_text = chapter_text.replace('\0', '')
- # 限制标题长度
- real_chapter_title = chapter_title[:256]
- # 限制章节内容长度
- if 0 < limit < len(chapter_text):
- split_text = smart_split_paragraph(chapter_text, limit)
- for text in split_text:
- chapters.append({"title": real_chapter_title, "content": text})
- else:
- chapters.append(
- {"title": real_chapter_title, "content": chapter_text if chapter_text else real_chapter_title})
- # 保存章节内容和章节标题
- return chapters
- @staticmethod
- def handle_links(doc, pattern_list, with_filter, limit):
- # 检查文档是否包含内部链接
- if not check_links_in_pdf(doc):
- return
- # 创建存储章节内容的数组
- chapters = []
- toc_start_page = -1
- page_content = ""
- handle_pre_toc = True
- # 遍历 PDF 的每一页,查找带有目录链接的页
- for page_num in range(doc.page_count):
- page = doc.load_page(page_num)
- links = page.get_links()
- # 如果目录开始页码未设置,则设置为当前页码
- if len(links) > 0:
- toc_start_page = page_num
- if toc_start_page < 0:
- page_content += page.get_text('text')
- # 检查该页是否包含内部链接(即指向文档内部的页面)
- for num in range(len(links)):
- link = links[num]
- if link['kind'] == 1: # 'kind' 为 1 表示内部链接
- # 获取链接目标的页面
- dest_page = link['page']
- rect = link['from'] # 获取链接的矩形区域
- # 如果目录开始页码包括前言部分,则不处理前言部分
- if dest_page < toc_start_page:
- handle_pre_toc = False
- # 提取链接区域的文本作为标题
- link_title = page.get_text("text", clip=rect).strip().split("\n")[0].replace('.', '').strip()
- # print(f'link_title: {link_title}')
- # 提取目标页面内容作为章节开始
- start_page = dest_page
- end_page = dest_page
- # 下一个link
- next_link = links[num + 1] if num + 1 < len(links) else None
- next_link_title = None
- if next_link is not None and next_link['kind'] == 1:
- rect = next_link['from']
- next_link_title = page.get_text("text", clip=rect).strip() \
- .split("\n")[0].replace('.', '').strip()
- # print(f'next_link_title: {next_link_title}')
- end_page = next_link['page']
- # 提取章节内容
- chapter_text = ""
- for p_num in range(start_page, end_page + 1):
- p = doc.load_page(p_num)
- text = p.get_text("text")
- text = re.sub(r'(?<!。)\n+', '', text)
- text = re.sub(r'(?<!.)\n+', '', text)
- # print(f'\n{text}\n')
- idx = text.find(link_title)
- if idx > -1:
- text = text[idx + len(link_title):]
- if next_link_title is not None:
- idx = text.find(next_link_title)
- if idx > -1:
- text = text[:idx]
- chapter_text += text
- # Null characters are not allowed.
- chapter_text = chapter_text.replace('\0', '')
- # 限制章节内容长度
- if 0 < limit < len(chapter_text):
- split_text = smart_split_paragraph(chapter_text, limit)
- for text in split_text:
- chapters.append({"title": link_title, "content": text})
- else:
- # 保存章节信息
- chapters.append({"title": link_title, "content": chapter_text})
- # 目录中没有前言部分,手动处理
- if handle_pre_toc:
- pre_toc = []
- lines = page_content.strip().split('\n')
- try:
- for line in lines:
- if re.match(r'^前\s*言', line):
- pre_toc.append({'title': line, 'content': ''})
- else:
- pre_toc[-1]['content'] += line
- for i in range(len(pre_toc)):
- pre_toc[i]['content'] = re.sub(r'(?<!。)\n+', '', pre_toc[i]['content'])
- pre_toc[i]['content'] = re.sub(r'(?<!.)\n+', '', pre_toc[i]['content'])
- except BaseException as e:
- maxkb_logger.error(_('This document has no preface and is treated as ordinary text: {e}').format(e=e))
- if pattern_list is not None and len(pattern_list) > 0:
- split_model = SplitModel(pattern_list, with_filter, limit)
- else:
- split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit)
- # 插入目录前的部分
- page_content = re.sub(r'(?<!。)\n+', '', page_content)
- page_content = re.sub(r'(?<!.)\n+', '', page_content)
- page_content = page_content.strip()
- pre_toc = split_model.parse(page_content)
- chapters = pre_toc + chapters
- return chapters
- @staticmethod
- def handle_chapter_title(title):
- title = re.sub(r'[一二三四五六七八九十\s*]、\s*', '', title)
- title = re.sub(r'第[一二三四五六七八九十]章\s*', '', title)
- return title
- def support(self, file, get_buffer):
- file_name: str = file.name.lower()
- if file_name.endswith(".pdf") or file_name.endswith(".PDF"):
- return True
- return False
- def get_content(self, file, save_image):
- with tempfile.NamedTemporaryFile(delete=False) as temp_file:
- # 将上传的文件保存到临时文件中
- temp_file.write(file.read())
- # 获取临时文件的路径
- temp_file_path = temp_file.name
- pdf_document = fitz.open(temp_file_path)
- try:
- return self.handle_pdf_content(file, pdf_document)
- except BaseException as e:
- traceback.print_exception(e)
- return f'{e}'
|