xlsx_parse_qa_handle.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: xlsx_parse_qa_handle.py
  6. @date:2024/5/21 14:59
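  7. @desc: Parse .xlsx workbooks into Q&A paragraphs (title, content, problem_list), one result per worksheet.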
  8. """
  9. import io
  10. import traceback
  11. import openpyxl
  12. from common.handle.base_parse_qa_handle import BaseParseQAHandle, get_title_row_index_dict, get_row_value
  13. from common.handle.impl.common_handle import xlsx_embed_cells_images
  14. from common.utils.logger import maxkb_logger
  15. def handle_sheet(file_name, sheet, image_dict):
  16. rows = sheet.rows
  17. try:
  18. title_row_list = next(rows)
  19. title_row_list = [row.value for row in title_row_list]
  20. except Exception as e:
  21. return {'name': file_name, 'paragraphs': []}
  22. if len(title_row_list) == 0:
  23. return {'name': file_name, 'paragraphs': []}
  24. title_row_index_dict = get_title_row_index_dict(title_row_list)
  25. paragraph_list = []
  26. for row in rows:
  27. content = get_row_value(row, title_row_index_dict, 'content')
  28. if content is None or content.value is None:
  29. continue
  30. problem = get_row_value(row, title_row_index_dict, 'problem_list')
  31. problem = str(problem.value) if problem is not None and problem.value is not None else ''
  32. problem_list = [{'content': p[0:255]} for p in problem.split('\n') if len(p.strip()) > 0]
  33. title = get_row_value(row, title_row_index_dict, 'title')
  34. title = str(title.value) if title is not None and title.value is not None else ''
  35. content = str(content.value)
  36. image = image_dict.get(content, None)
  37. if image is not None:
  38. content = f'![](./oss/file/{image.id})'
  39. paragraph_list.append({'title': title[0:255],
  40. 'content': content[0:102400],
  41. 'problem_list': problem_list})
  42. return {'name': file_name, 'paragraphs': paragraph_list}
  43. class XlsxParseQAHandle(BaseParseQAHandle):
  44. def support(self, file, get_buffer):
  45. file_name: str = file.name.lower()
  46. if file_name.endswith(".xlsx"):
  47. return True
  48. return False
  49. def handle(self, file, get_buffer, save_image):
  50. buffer = get_buffer(file)
  51. try:
  52. workbook = openpyxl.load_workbook(io.BytesIO(buffer))
  53. try:
  54. image_dict: dict = xlsx_embed_cells_images(io.BytesIO(buffer))
  55. save_image([item for item in image_dict.values()])
  56. except Exception as e:
  57. image_dict = {}
  58. worksheets = workbook.worksheets
  59. worksheets_size = len(worksheets)
  60. return [row for row in
  61. [handle_sheet(file.name,
  62. sheet,
  63. image_dict) if worksheets_size == 1 and sheet.title == 'Sheet1' else handle_sheet(
  64. sheet.title, sheet, image_dict) for sheet
  65. in worksheets] if row is not None]
  66. except Exception as e:
  67. maxkb_logger.error(f"Error processing XLSX file {file.name}: {e}, {traceback.format_exc()}")
  68. return [{'name': file.name, 'paragraphs': []}]