# xlsx_parse_table_handle.py
  1. # coding=utf-8
  2. import io
  3. import logging
  4. import traceback
  5. from openpyxl import load_workbook
  6. from common.handle.base_parse_table_handle import BaseParseTableHandle
  7. from common.handle.impl.common_handle import xlsx_embed_cells_images
  8. from common.utils.logger import maxkb_logger
  9. class XlsxParseTableHandle(BaseParseTableHandle):
  10. def support(self, file, get_buffer):
  11. file_name: str = file.name.lower()
  12. if file_name.endswith('.xlsx'):
  13. return True
  14. return False
  15. def fill_merged_cells(self, sheet, image_dict):
  16. data = []
  17. # 获取第一行作为标题行
  18. headers = []
  19. for idx, cell in enumerate(sheet[1]):
  20. if cell.value is None:
  21. headers.append(' ' * (idx + 1))
  22. else:
  23. headers.append(cell.value)
  24. # 从第二行开始遍历每一行
  25. for row in sheet.iter_rows(min_row=2, values_only=False):
  26. row_data = {}
  27. for col_idx, cell in enumerate(row):
  28. cell_value = cell.value
  29. # 如果单元格为空,并且该单元格在合并单元格内,获取合并单元格的值
  30. if cell_value is None:
  31. for merged_range in sheet.merged_cells.ranges:
  32. if cell.coordinate in merged_range:
  33. cell_value = sheet[merged_range.min_row][merged_range.min_col - 1].value
  34. break
  35. if cell_value is None:
  36. cell_value = ''
  37. image = image_dict.get(cell_value, None)
  38. if image is not None:
  39. cell_value = f'![](./oss/file/{image.id})'
  40. # 使用标题作为键,单元格的值作为值存入字典
  41. row_data[headers[col_idx]] = cell_value
  42. data.append(row_data)
  43. return data
  44. def handle(self, file, get_buffer, save_image):
  45. buffer = get_buffer(file)
  46. try:
  47. wb = load_workbook(io.BytesIO(buffer))
  48. try:
  49. image_dict: dict = xlsx_embed_cells_images(io.BytesIO(buffer))
  50. save_image([item for item in image_dict.values()])
  51. except Exception as e:
  52. image_dict = {}
  53. result = []
  54. for sheetname in wb.sheetnames:
  55. paragraphs = []
  56. ws = wb[sheetname]
  57. data = self.fill_merged_cells(ws, image_dict)
  58. for row in data:
  59. row_output = "; ".join([f"{key}: {value}" for key, value in row.items()])
  60. # print(row_output)
  61. paragraphs.append({'title': '', 'content': row_output})
  62. result.append({'name': sheetname, 'paragraphs': paragraphs})
  63. except BaseException as e:
  64. maxkb_logger.error(f"Error processing XLSX file {file.name}: {e}, {traceback.format_exc()}")
  65. return [{'name': file.name, 'paragraphs': []}]
  66. return result
  67. def get_content(self, file, save_image):
  68. try:
  69. # 加载 Excel 文件
  70. workbook = load_workbook(file)
  71. try:
  72. image_dict: dict = xlsx_embed_cells_images(file)
  73. if len(image_dict) > 0:
  74. save_image(image_dict.values())
  75. except Exception as e:
  76. maxkb_logger.error(f'Exception: {e}')
  77. image_dict = {}
  78. md_tables = ''
  79. # 遍历所有工作表
  80. for sheetname in workbook.sheetnames:
  81. sheet = workbook[sheetname]
  82. rows = self.fill_merged_cells(sheet, image_dict)
  83. if len(rows) == 0:
  84. continue
  85. # 添加 sheet 名称作为标题
  86. md_tables += f'## {sheetname}\n\n'
  87. # 提取表头和内容
  88. headers = [f"{key}" for key, value in rows[0].items()]
  89. # 构建 Markdown 表格
  90. md_table = '| ' + ' | '.join(headers) + ' |\n'
  91. md_table += '| ' + ' | '.join(['---'] * len(headers)) + ' |\n'
  92. for row in rows:
  93. r = [f'{value}' for key, value in row.items()]
  94. md_table += '| ' + ' | '.join(
  95. [str(cell).replace('\n', '<br>') if cell is not None else '' for cell in r]) + ' |\n'
  96. md_tables += md_table + '\n\n'
  97. return md_tables
  98. except Exception as e:
  99. maxkb_logger.error(f'excel split handle error: {e}')
  100. return f'error: {e}'