| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- # coding=utf-8
- """
- @project: MaxKB
- @Author:虎
- @file: tools.py
- @date:2024/9/11 16:41
- @desc:
- """
- import io
- import traceback
- from functools import reduce
- from io import BytesIO
- from xml.etree.ElementTree import fromstring
- from zipfile import ZipFile
- import uuid_utils.compat as uuid
- from PIL import Image as PILImage
- from openpyxl.drawing.image import Image as openpyxl_Image
- from openpyxl.packaging.relationship import get_rels_path, get_dependents
- from openpyxl.xml.constants import SHEET_DRAWING_NS, REL_NS, SHEET_MAIN_NS
- from common.utils.logger import maxkb_logger
- from knowledge.models import File
- from PIL import ImageFile
# Module-wide Pillow settings, applied once at import time.
# Per Pillow's documentation this flag controls whether truncated image files
# are loaded rather than rejected.
ImageFile.LOAD_TRUNCATED_IMAGES = True
# Clears Pillow's pixel-count limit.
# NOTE(review): presumably this disables the decompression-bomb check, so very
# large images are decoded without a warning or error -- confirm this is
# acceptable for untrusted uploads.
PILImage.MAX_IMAGE_PIXELS = None
def parse_element(element) -> dict:
    """Map each embedded picture's name to its relationship (embed) id.

    Locates every node under ``element`` that has a direct ``nvPicPr`` child
    (spreadsheet drawing namespace) and reads, from that node's children:

    * the ``name`` attribute of the first child of ``nvPicPr``;
    * the ``embed`` attribute (relationship namespace) of the first child of
      ``blipFill``.

    :param element: parsed XML element to search.
    :return: ``{picture name: embed id}``. The embed id is ``""`` when the
        node has no ``blipFill`` child; nodes whose name is empty are skipped.
    :raises KeyError: if a ``name`` or ``embed`` attribute is missing.
    :raises IndexError: if ``nvPicPr`` or ``blipFill`` has no children.
    """
    # Qualified names are loop-invariant, so build them once up front.
    nv_pic_pr_tag = "{%s}nvPicPr" % SHEET_DRAWING_NS
    blip_fill_tag = "{%s}blipFill" % SHEET_DRAWING_NS
    embed_attr = "{%s}embed" % REL_NS

    data = {}
    for target in level_order_traversal(element, nv_pic_pr_tag):
        name = embed = ""
        for child in target:
            if child.tag == nv_pic_pr_tag:
                name = child[0].attrib["name"]
            elif child.tag == blip_fill_tag:
                embed = child[0].attrib[embed_attr]
        if name:
            data[name] = embed
    return data
def parse_element_sheet_xml(element) -> list:
    """Collect the text of every formula (``f``) element in a worksheet tree.

    :param element: parsed XML element of a worksheet part.
    :return: the ``text`` of each ``f`` element (spreadsheet main namespace)
        that is a direct child of a node returned by
        ``level_order_traversal``. An entry is ``None`` when the ``f``
        element carries no text.
    """
    formula_tag = "{%s}f" % SHEET_MAIN_NS

    formulas = []
    for target in level_order_traversal(element, formula_tag):
        # A matching node may also hold non-formula children; keep only ``f``.
        formulas.extend(child.text for child in target if child.tag == formula_tag)
    return formulas
def level_order_traversal(root, flag: str) -> list:
    """Breadth-first search for nodes that have a direct child tagged ``flag``.

    A matching node is collected and its subtree is not searched any further;
    a non-matching node has each of its children examined on the next level.

    :param root: element to start from (iterating it yields its children).
    :param flag: fully qualified tag name to look for among direct children.
    :return: the matching nodes, in breadth-first (level) order.
    """
    targets = []
    level = [root]
    # Walk one level at a time; this visits nodes in the same order as a FIFO
    # queue but avoids the O(n) cost of ``list.pop(0)`` on every step.
    while level:
        next_level = []
        for node in level:
            if any(child.tag == flag for child in node):
                targets.append(node)
            else:
                next_level.extend(node)
        level = next_level
    return targets
def handle_images(deps, archive: ZipFile) -> list:
    """Load the images referenced by ``deps`` from ``archive``.

    :param deps: iterable of relationship objects exposing ``target`` (member
        path inside the archive) and ``id`` (relationship id).
    :param archive: open zip archive to read the image members from.
    :return: list of openpyxl images, each tagged with ``embed`` (the
        relationship id) and ``target`` (the member path). Members that
        cannot be read or parsed are logged and skipped.
    """
    images = []
    for dep in deps:
        try:
            image = openpyxl_Image(BytesIO(archive.read(dep.target)))
        except Exception as e:
            # Best effort: one unreadable image must not abort the whole import.
            maxkb_logger.error(f"Error reading image {dep.target}: {e}, {traceback.format_exc()}")
            continue
        image.embed = dep.id  # relationship id (rId) of the file
        image.target = dep.target  # path of the file inside the archive
        images.append(image)
    return images
def xlsx_embed_cells_images(buffer) -> dict:
    """Extract the images embedded in cells of an xlsx workbook.

    Reads ``xl/cellimages.xml`` and its relationships to map picture names to
    image files, collects the formulas of every ``xl/worksheets/sheet*`` part,
    and converts each picture whose name occurs in a formula to JPEG.

    :param buffer: path or file-like object accepted by ``ZipFile``.
    :return: dict mapping ``"=" + formula text`` to a ``File`` whose
        ``meta['content']`` holds the JPEG bytes. When several formulas
        mention the same picture name, the last one found is used.
    :raises KeyError: from ``ZipFile.read`` when ``xl/cellimages.xml`` is not
        present in the archive.
    """
    # The context manager closes the archive even if a step below raises.
    with ZipFile(buffer) as archive:
        # Images referenced by the relationships of cellimages.xml.
        deps = get_dependents(archive, get_rels_path("xl/cellimages.xml"))
        image_rel = handle_images(deps=deps, archive=archive)

        # Formulas per worksheet, keyed by the suffix after "sheet" in the name.
        sheet_list = {}
        for item in archive.namelist():
            if not item.startswith('xl/worksheets/sheet'):
                continue
            key = item.split('/')[-1].split('.')[0].split('sheet')[-1]
            sheet_list[key] = parse_element_sheet_xml(fromstring(archive.read(item)))

        # Flatten once instead of once per picture; entries that are not
        # strings (formula elements without text) can never match a name.
        formulas = []
        for sheet in sheet_list.values():
            formulas.extend(cell for cell in (sheet or []) if isinstance(cell, str))

        name_to_rid = parse_element(fromstring(archive.read("xl/cellimages.xml")))
        images_by_rid = {image.embed: image for image in image_rel}

        result = {}
        for name, rid in name_to_rid.items():
            img = images_by_rid.get(rid)
            if img is None:
                continue
            # Use the last formula that mentions this picture name.
            image_excel_id = next(
                (cell for cell in reversed(formulas) if name in cell), None
            )
            if image_excel_id is None:
                continue
            # Decode and re-encode inside the ``with`` so the member handle is
            # still open while Pillow reads it, and is closed afterwards.
            with archive.open(img.target) as f:
                img_byte = io.BytesIO()
                PILImage.open(f).convert('RGB').save(img_byte, format='JPEG')
            result['=' + image_excel_id] = File(
                id=uuid.uuid7(),
                file_name=img.path,
                meta={'debug': False, 'content': img_byte.getvalue()},
            )
    return result
|