xls_parse_table_handle.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. # coding=utf-8
  2. import logging
  3. import traceback
  4. import xlrd
  5. from common.handle.base_parse_table_handle import BaseParseTableHandle
  6. from common.utils.logger import maxkb_logger
  7. class XlsParseTableHandle(BaseParseTableHandle):
  8. def support(self, file, get_buffer):
  9. file_name: str = file.name.lower()
  10. buffer = get_buffer(file)
  11. if file_name.endswith(".xls") and xlrd.inspect_format(content=buffer):
  12. return True
  13. return False
  14. def handle(self, file, get_buffer, save_image):
  15. buffer = get_buffer(file)
  16. try:
  17. wb = xlrd.open_workbook(file_contents=buffer, formatting_info=True)
  18. result = []
  19. sheets = wb.sheets()
  20. for sheet in sheets:
  21. # 获取合并单元格的范围信息
  22. merged_cells = sheet.merged_cells
  23. data = []
  24. paragraphs = []
  25. # 获取第一行作为标题行
  26. headers = [sheet.cell_value(0, col_idx) for col_idx in range(sheet.ncols)]
  27. # 从第二行开始遍历每一行(跳过标题行)
  28. for row_idx in range(1, sheet.nrows):
  29. row_data = {}
  30. for col_idx in range(sheet.ncols):
  31. cell_value = sheet.cell_value(row_idx, col_idx)
  32. # 检查是否为空单元格,如果为空检查是否在合并区域中
  33. if cell_value == "":
  34. # 检查当前单元格是否在合并区域
  35. for (rlo, rhi, clo, chi) in merged_cells:
  36. if rlo <= row_idx < rhi and clo <= col_idx < chi:
  37. # 使用合并区域的左上角单元格的值
  38. cell_value = sheet.cell_value(rlo, clo)
  39. break
  40. # 将标题作为键,单元格的值作为值存入字典
  41. row_data[headers[col_idx]] = cell_value
  42. data.append(row_data)
  43. for row in data:
  44. row_output = "; ".join([f"{key}: {value}" for key, value in row.items()])
  45. # print(row_output)
  46. paragraphs.append({'title': '', 'content': row_output})
  47. result.append({'name': sheet.name, 'paragraphs': paragraphs})
  48. except BaseException as e:
  49. maxkb_logger.error(f"Error processing XLS file {file.name}: {e}, {traceback.format_exc()}")
  50. return [{'name': file.name, 'paragraphs': []}]
  51. return result
  52. def get_content(self, file, save_image):
  53. # 打开 .xls 文件
  54. try:
  55. workbook = xlrd.open_workbook(file_contents=file.read(), formatting_info=True)
  56. sheets = workbook.sheets()
  57. md_tables = ''
  58. for sheet in sheets:
  59. # 过滤空白的sheet
  60. if sheet.nrows == 0 or sheet.ncols == 0:
  61. continue
  62. # 获取表头和内容
  63. headers = sheet.row_values(0)
  64. data = [sheet.row_values(row_idx) for row_idx in range(1, sheet.nrows)]
  65. # 构建 Markdown 表格
  66. md_table = '| ' + ' | '.join(headers) + ' |\n'
  67. md_table += '| ' + ' | '.join(['---'] * len(headers)) + ' |\n'
  68. for row in data:
  69. # 将每个单元格中的内容替换换行符为 <br> 以保留原始格式
  70. md_table += '| ' + ' | '.join(
  71. [str(cell)
  72. .replace('\r\n', '<br>')
  73. .replace('\n', '<br>')
  74. if cell else '' for cell in row]) + ' |\n'
  75. md_tables += md_table + '\n\n'
  76. return md_tables
  77. except Exception as e:
  78. maxkb_logger.error(f'excel split handle error: {e}')
  79. return f'error: {e}'