common_handle.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. # coding=utf-8
  2. """
  3. @project: MaxKB
  4. @Author:虎
  5. @file: tools.py
  6. @date:2024/9/11 16:41
  7. @desc:
  8. """
  9. import io
  10. import traceback
  11. from functools import reduce
  12. from io import BytesIO
  13. from xml.etree.ElementTree import fromstring
  14. from zipfile import ZipFile
  15. import uuid_utils.compat as uuid
  16. from PIL import Image as PILImage
  17. from openpyxl.drawing.image import Image as openpyxl_Image
  18. from openpyxl.packaging.relationship import get_rels_path, get_dependents
  19. from openpyxl.xml.constants import SHEET_DRAWING_NS, REL_NS, SHEET_MAIN_NS
  20. from common.utils.logger import maxkb_logger
  21. from knowledge.models import File
  22. from PIL import ImageFile
# Module-level Pillow configuration; applies process-wide once this module is imported.
# Ask Pillow to load image files whose data is truncated instead of failing on them.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# NOTE(review): setting the limit to None presumably switches off Pillow's
# pixel-count (decompression-bomb) check - confirm this is acceptable for
# images coming from user-uploaded workbooks.
PILImage.MAX_IMAGE_PIXELS = None
  25. def parse_element(element) -> {}:
  26. data = {}
  27. xdr_namespace = "{%s}" % SHEET_DRAWING_NS
  28. targets = level_order_traversal(element, xdr_namespace + "nvPicPr")
  29. for target in targets:
  30. cNvPr = embed = ""
  31. for child in target:
  32. if child.tag == xdr_namespace + "nvPicPr":
  33. cNvPr = child[0].attrib["name"]
  34. elif child.tag == xdr_namespace + "blipFill":
  35. _rel_embed = "{%s}embed" % REL_NS
  36. embed = child[0].attrib[_rel_embed]
  37. if cNvPr:
  38. data[cNvPr] = embed
  39. return data
  40. def parse_element_sheet_xml(element) -> []:
  41. data = []
  42. xdr_namespace = "{%s}" % SHEET_MAIN_NS
  43. targets = level_order_traversal(element, xdr_namespace + "f")
  44. for target in targets:
  45. for child in target:
  46. if child.tag == xdr_namespace + "f":
  47. data.append(child.text)
  48. return data
  49. def level_order_traversal(root, flag: str) -> []:
  50. queue = [root]
  51. targets = []
  52. while queue:
  53. node = queue.pop(0)
  54. children = [child.tag for child in node]
  55. if flag in children:
  56. targets.append(node)
  57. continue
  58. for child in node:
  59. queue.append(child)
  60. return targets
  61. def handle_images(deps, archive: ZipFile) -> []:
  62. images = []
  63. if not PILImage: # Pillow not installed, drop images
  64. return images
  65. for dep in deps:
  66. try:
  67. image_io = archive.read(dep.target)
  68. image = openpyxl_Image(BytesIO(image_io))
  69. except Exception as e:
  70. maxkb_logger.error(f"Error reading image {dep.target}: {e}, {traceback.format_exc()}")
  71. continue
  72. image.embed = dep.id # 文件rId
  73. image.target = dep.target # 文件地址
  74. images.append(image)
  75. return images
  76. def xlsx_embed_cells_images(buffer) -> {}:
  77. archive = ZipFile(buffer)
  78. # 解析cellImage.xml文件
  79. deps = get_dependents(archive, get_rels_path("xl/cellimages.xml"))
  80. image_rel = handle_images(deps=deps, archive=archive)
  81. # 工作表及其中图片ID
  82. sheet_list = {}
  83. for item in archive.namelist():
  84. if not item.startswith('xl/worksheets/sheet'):
  85. continue
  86. key = item.split('/')[-1].split('.')[0].split('sheet')[-1]
  87. sheet_list[key] = parse_element_sheet_xml(fromstring(archive.read(item)))
  88. cell_images_xml = parse_element(fromstring(archive.read("xl/cellimages.xml")))
  89. cell_images_rel = {}
  90. for image in image_rel:
  91. cell_images_rel[image.embed] = image
  92. for cnv, embed in cell_images_xml.items():
  93. cell_images_xml[cnv] = cell_images_rel.get(embed)
  94. result = {}
  95. for key, img in cell_images_xml.items():
  96. all_cells = [
  97. cell
  98. for _sheet_id, sheet in sheet_list.items()
  99. if sheet is not None
  100. for cell in sheet or []
  101. ]
  102. image_excel_id_list = [
  103. cell for cell in all_cells
  104. if isinstance(cell, str) and key in cell
  105. ]
  106. # print(key, img)
  107. if img is None:
  108. continue
  109. if len(image_excel_id_list) > 0:
  110. image_excel_id = image_excel_id_list[-1]
  111. f = archive.open(img.target)
  112. img_byte = io.BytesIO()
  113. im = PILImage.open(f).convert('RGB')
  114. im.save(img_byte, format='JPEG')
  115. image = File(id=uuid.uuid7(), file_name=img.path, meta={'debug': False, 'content': img_byte.getvalue()})
  116. result['=' + image_excel_id] = image
  117. archive.close()
  118. return result