zip_parse_qa_handle.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: zip_parse_qa_handle.py
  6. @date:2024/3/27 18:19
  7. @desc:
  8. """
  9. import io
  10. import os
  11. import re
  12. import zipfile
  13. from typing import List
  14. from urllib.parse import urljoin
  15. import uuid_utils.compat as uuid
  16. from django.utils.translation import gettext_lazy as _
  17. from common.handle.base_parse_qa_handle import BaseParseQAHandle
  18. from common.handle.impl.qa.csv_parse_qa_handle import CsvParseQAHandle
  19. from common.handle.impl.qa.xls_parse_qa_handle import XlsParseQAHandle
  20. from common.handle.impl.qa.xlsx_parse_qa_handle import XlsxParseQAHandle
  21. from common.utils.common import parse_md_image
  22. from knowledge.models import File
  23. class FileBufferHandle:
  24. buffer = None
  25. def get_buffer(self, file):
  26. if self.buffer is None:
  27. self.buffer = file.read()
  28. return self.buffer
  29. split_handles = [
  30. XlsParseQAHandle(),
  31. XlsxParseQAHandle(),
  32. CsvParseQAHandle()
  33. ]
  34. def file_to_paragraph(file, save_inner_image):
  35. """
  36. 文件转换为段落列表
  37. @param file: 文件
  38. @return: {
  39. name:文件名
  40. paragraphs:段落列表
  41. }
  42. """
  43. get_buffer = FileBufferHandle().get_buffer
  44. for split_handle in split_handles:
  45. if split_handle.support(file, get_buffer):
  46. return split_handle.handle(file, get_buffer, save_inner_image)
  47. raise Exception(_("Unsupported file format"))
  48. def is_valid_uuid(uuid_str: str):
  49. """
  50. 校验字符串是否是uuid
  51. @param uuid_str: 需要校验的字符串
  52. @return: bool
  53. """
  54. try:
  55. uuid.UUID(uuid_str)
  56. except ValueError:
  57. return False
  58. return True
  59. def get_image_list(result_list: list, zip_files: List[str]):
  60. """
  61. 获取图片文件列表
  62. @param result_list:
  63. @param zip_files:
  64. @return:
  65. """
  66. image_file_list = []
  67. for result in result_list:
  68. for p in result.get('paragraphs', []):
  69. content: str = p.get('content', '')
  70. image_list = parse_md_image(content)
  71. for image in image_list:
  72. search = re.search("\(.*\)", image)
  73. if search:
  74. new_image_id = str(uuid.uuid7())
  75. source_image_path = search.group().replace('(', '').replace(')', '')
  76. image_path = urljoin(result.get('name'), '.' + source_image_path if source_image_path.startswith(
  77. '/') else source_image_path)
  78. if not zip_files.__contains__(image_path):
  79. continue
  80. if image_path.startswith('oss/file/') or image_path.startswith('oss/image/'):
  81. image_id = image_path.replace('oss/file/', '')
  82. if is_valid_uuid(image_id):
  83. image_file_list.append({'source_file': image_path,
  84. 'image_id': image_id})
  85. else:
  86. image_file_list.append({'source_file': image_path,
  87. 'image_id': new_image_id})
  88. content = content.replace(source_image_path, f'./oss/file/{new_image_id}')
  89. p['content'] = content
  90. else:
  91. image_file_list.append({'source_file': image_path,
  92. 'image_id': new_image_id})
  93. content = content.replace(source_image_path, f'./oss/file/{new_image_id}')
  94. p['content'] = content
  95. return image_file_list
  96. def filter_image_file(result_list: list, image_list):
  97. image_source_file_list = [image.get('source_file') for image in image_list]
  98. return [r for r in result_list if not image_source_file_list.__contains__(r.get('name', ''))]
  99. class ZipParseQAHandle(BaseParseQAHandle):
  100. def handle(self, file, get_buffer, save_image):
  101. buffer = get_buffer(file)
  102. bytes_io = io.BytesIO(buffer)
  103. result = []
  104. # 打开zip文件
  105. with zipfile.ZipFile(bytes_io, 'r') as zip_ref:
  106. # 获取压缩包中的文件名列表
  107. files = zip_ref.namelist()
  108. # 读取压缩包中的文件内容
  109. for file in files:
  110. # 跳过 macOS 特有的元数据目录和文件
  111. if file.endswith('/') or file.startswith('__MACOSX'):
  112. continue
  113. with zip_ref.open(file) as f:
  114. # 对文件内容进行处理
  115. try:
  116. value = file_to_paragraph(f, save_image)
  117. if isinstance(value, list):
  118. result = [*result, *value]
  119. else:
  120. result.append(value)
  121. except Exception:
  122. pass
  123. image_list = get_image_list(result, files)
  124. result = filter_image_file(result, image_list)
  125. image_mode_list = []
  126. for image in image_list:
  127. with zip_ref.open(image.get('source_file')) as f:
  128. i = File(
  129. id=image.get('image_id'),
  130. file_name=os.path.basename(image.get('source_file')),
  131. meta={'debug': False, 'content': f.read()}
  132. )
  133. image_mode_list.append(i)
  134. save_image(image_mode_list)
  135. return result
  136. def support(self, file, get_buffer):
  137. file_name: str = file.name.lower()
  138. if file_name.endswith(".zip") or file_name.endswith(".ZIP"):
  139. return True
  140. return False