xlsx_split_handle.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: xlsx_split_handle.py
  6. @date:2024/5/21 14:59
  7. @desc:
  8. """
  9. import io
  10. import traceback
  11. from typing import List
  12. import openpyxl
  13. from openpyxl import load_workbook
  14. from common.handle.base_split_handle import BaseSplitHandle
  15. from common.handle.impl.common_handle import xlsx_embed_cells_images
  16. from common.utils.logger import maxkb_logger
# Separator string: a newline, a backtick-quoted run of dashes, then a newline.
# NOTE(review): not referenced by the code visible in this file; presumably
# consumed by other modules - confirm before removing.
splitter = '\n`-----------------------------------`\n'
  18. def post_cell(image_dict, cell_value):
  19. image = image_dict.get(cell_value, None)
  20. if image is not None:
  21. return f'![](./oss/file/{image.id})'
  22. return cell_value.replace('\n', '<br>').replace('|', '&#124;')
  23. def row_to_md(row, image_dict):
  24. return '| ' + ' | '.join(
  25. [post_cell(image_dict, str(cell.value if cell.value is not None else '')) if cell is not None else '' for cell
  26. in row]) + ' |\n'
  27. def handle_sheet(file_name, sheet, image_dict, limit: int):
  28. rows = sheet.rows
  29. paragraphs = []
  30. result = {'name': file_name, 'content': paragraphs}
  31. try:
  32. title_row_list = next(rows)
  33. title_md_content = row_to_md(title_row_list, image_dict)
  34. title_md_content += '| ' + ' | '.join(
  35. ['---' if cell is not None else '' for cell in title_row_list]) + ' |\n'
  36. except Exception as e:
  37. return result
  38. if len(title_row_list) == 0:
  39. return result
  40. result_item_content = ''
  41. for row in rows:
  42. next_md_content = row_to_md(row, image_dict)
  43. next_md_content_len = len(next_md_content)
  44. result_item_content_len = len(result_item_content)
  45. if len(result_item_content) == 0:
  46. result_item_content += title_md_content
  47. result_item_content += next_md_content
  48. else:
  49. if result_item_content_len + next_md_content_len < limit:
  50. result_item_content += next_md_content
  51. else:
  52. paragraphs.append({'content': result_item_content, 'title': ''})
  53. result_item_content = title_md_content + next_md_content
  54. if len(result_item_content) > 0:
  55. paragraphs.append({'content': result_item_content, 'title': ''})
  56. return result
  57. class XlsxSplitHandle(BaseSplitHandle):
  58. def fill_merged_cells(self, sheet, image_dict):
  59. data = []
  60. # 获取第一行作为标题行
  61. headers = []
  62. for idx, cell in enumerate(sheet[1]):
  63. if cell.value is None:
  64. headers.append(' ' * (idx + 1))
  65. else:
  66. headers.append(cell.value)
  67. # 从第二行开始遍历每一行
  68. for row in sheet.iter_rows(min_row=2, values_only=False):
  69. row_data = {}
  70. for col_idx, cell in enumerate(row):
  71. cell_value = cell.value
  72. # 如果单元格为空,并且该单元格在合并单元格内,获取合并单元格的值
  73. if cell_value is None:
  74. for merged_range in sheet.merged_cells.ranges:
  75. if cell.coordinate in merged_range:
  76. cell_value = sheet[merged_range.min_row][merged_range.min_col - 1].value
  77. break
  78. image = image_dict.get(cell_value, None)
  79. if image is not None:
  80. cell_value = f'![](./oss/file/{image.id})'
  81. # 使用标题作为键,单元格的值作为值存入字典
  82. row_data[headers[col_idx]] = cell_value
  83. data.append(row_data)
  84. return data
  85. def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
  86. buffer = get_buffer(file)
  87. try:
  88. if type(limit) is str:
  89. limit = int(limit)
  90. workbook = openpyxl.load_workbook(io.BytesIO(buffer))
  91. try:
  92. image_dict: dict = xlsx_embed_cells_images(io.BytesIO(buffer))
  93. save_image([item for item in image_dict.values()])
  94. except Exception as e:
  95. image_dict = {}
  96. worksheets = workbook.worksheets
  97. worksheets_size = len(worksheets)
  98. return [row for row in
  99. [handle_sheet(file.name,
  100. sheet,
  101. image_dict,
  102. limit) if worksheets_size == 1 and sheet.title == 'Sheet1' else handle_sheet(
  103. sheet.title, sheet, image_dict, limit) for sheet
  104. in worksheets] if row is not None]
  105. except Exception as e:
  106. maxkb_logger.error(f"Error processing XLSX file {file.name}: {e}, {traceback.format_exc()}")
  107. return [{'name': file.name, 'content': []}]
  108. def get_content(self, file, save_image):
  109. try:
  110. # 加载 Excel 文件
  111. workbook = load_workbook(file)
  112. try:
  113. image_dict: dict = xlsx_embed_cells_images(file)
  114. if len(image_dict) > 0:
  115. save_image(image_dict.values())
  116. except Exception as e:
  117. maxkb_logger.error(f'Exception: {e}')
  118. image_dict = {}
  119. md_tables = ''
  120. # 遍历所有工作表
  121. for sheetname in workbook.sheetnames:
  122. sheet = workbook[sheetname]
  123. rows = self.fill_merged_cells(sheet, image_dict)
  124. if len(rows) == 0:
  125. continue
  126. # 添加 sheet 名称作为标题
  127. md_tables += f'## {sheetname}\n\n'
  128. # 提取表头和内容
  129. headers = [f"{key}" for key, value in rows[0].items()]
  130. # 构建 Markdown 表格
  131. md_table = '| ' + ' | '.join(headers) + ' |\n'
  132. md_table += '| ' + ' | '.join(['---'] * len(headers)) + ' |\n'
  133. for row in rows:
  134. r = [self._escape_cell_content(value) for key, value in row.items()]
  135. md_table += '| ' + ' | '.join(r) + ' |\n'
  136. md_tables += md_table + '\n\n'
  137. return md_tables
  138. except Exception as e:
  139. maxkb_logger.error(f'excel split handle error: {e}')
  140. return f'error: {e}'
  141. def _escape_cell_content(self, cell_value):
  142. """转义单元格内容,避免破坏 Markdown 表格结构"""
  143. if cell_value is None:
  144. return ''
  145. cell_str = str(cell_value)
  146. # 替换换行符为 <br>
  147. cell_str = cell_str.replace('\n', '<br>')
  148. # 转义管道符 | 为 HTML 实体
  149. cell_str = cell_str.replace('|', '&#124;')
  150. # 如果内容包含反引号,需要转义
  151. if '`' in cell_str:
  152. cell_str = cell_str.replace('`', '&#96;')
  153. return cell_str
  154. def support(self, file, get_buffer):
  155. file_name: str = file.name.lower()
  156. if file_name.endswith(".xlsx"):
  157. return True
  158. return False