md_parse_qa_handle.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: md_parse_qa_handle.py
  6. @date:2024/5/21 14:59
  7. @desc:
  8. """
  9. import re
  10. import traceback
  11. from typing import Any
  12. from charset_normalizer import detect
  13. from common.handle.base_parse_qa_handle import BaseParseQAHandle, get_title_row_index_dict, get_row_value
  14. from common.utils.logger import maxkb_logger
  15. class MarkdownParseQAHandle(BaseParseQAHandle):
  16. def support(self, file, get_buffer):
  17. file_name: str = file.name.lower()
  18. if file_name.endswith(".md") or file_name.endswith(".markdown"):
  19. return True
  20. return False
  21. def parse_markdown_table(self, content):
  22. """解析 Markdown 表格,返回表格数据列表"""
  23. tables = []
  24. lines = content.split('\n')
  25. i = 0
  26. while i < len(lines):
  27. line = lines[i].strip()
  28. # 检测表格开始(包含 | 符号)
  29. if '|' in line and line.startswith('|'):
  30. table_data = []
  31. # 读取表头
  32. header = [cell.strip() for cell in line.split('|')[1:-1]]
  33. table_data.append(header)
  34. i += 1
  35. # 跳过分隔行 (例如: | --- | --- |)
  36. if i < len(lines) and re.match(r'\s*\|[\s\-:]+\|\s*', lines[i]):
  37. i += 1
  38. # 读取数据行
  39. while i < len(lines):
  40. line = lines[i].strip()
  41. if not line.startswith('|'):
  42. break
  43. row = [self._unescape_cell_content(cell) for cell in line.split('|')[1:-1]]
  44. if len(row) > 0:
  45. table_data.append(row)
  46. i += 1
  47. if len(table_data) > 1: # 至少有表头和一行数据
  48. tables.append(table_data)
  49. else:
  50. i += 1
  51. return tables
  52. def _unescape_cell_content(self, cell) -> Any:
  53. text = cell.strip().replace('&#124;', '|')
  54. text = text.replace('|<br>|', '|\n|')
  55. return text
  56. def handle(self, file, get_buffer, save_image):
  57. buffer = get_buffer(file)
  58. try:
  59. # 检测编码并读取文件内容
  60. encoding = detect(buffer)['encoding']
  61. content = buffer.decode(encoding if encoding else 'utf-8')
  62. # 按 sheet 分割内容
  63. sheet_sections = self.split_by_sheets(content)
  64. result = []
  65. for sheet_name, sheet_content in sheet_sections:
  66. # 解析该 sheet 的表格
  67. tables = self.parse_markdown_table(sheet_content)
  68. paragraph_list = []
  69. # 处理每个表格
  70. for table in tables:
  71. if len(table) < 2:
  72. continue
  73. title_row_list = table[0]
  74. title_row_index_dict = get_title_row_index_dict(title_row_list)
  75. # 处理表格的每一行数据
  76. for row in table[1:]:
  77. content_text = get_row_value(row, title_row_index_dict, 'content')
  78. if content_text is None:
  79. continue
  80. problem = get_row_value(row, title_row_index_dict, 'problem_list')
  81. problem = str(problem) if problem is not None else ''
  82. problem_list = [{'content': p[0:255]} for p in problem.split('\n') if len(p.strip()) > 0]
  83. title = get_row_value(row, title_row_index_dict, 'title')
  84. title = str(title) if title is not None else ''
  85. paragraph_list.append({
  86. 'title': title[0:255],
  87. 'content': content_text[0:102400],
  88. 'problem_list': problem_list
  89. })
  90. result.append({'name': sheet_name, 'paragraphs': paragraph_list})
  91. return result if result else [{'name': file.name, 'paragraphs': []}]
  92. except Exception as e:
  93. maxkb_logger.error(f"Error processing Markdown file {file.name}: {e}, {traceback.format_exc()}")
  94. return [{'name': file.name, 'paragraphs': []}]
  95. def split_by_sheets(self, content):
  96. """按二级标题(##)分割 sheet"""
  97. lines = content.split('\n')
  98. sheets = []
  99. current_sheet_name = None
  100. current_content = []
  101. for line in lines:
  102. # 检测二级标题作为 sheet 名称
  103. if line.strip().startswith('## '):
  104. if current_sheet_name is not None:
  105. sheets.append((current_sheet_name, '\n'.join(current_content)))
  106. current_sheet_name = line.strip()[3:].strip()
  107. current_content = []
  108. else:
  109. current_content.append(line)
  110. # 添加最后一个 sheet
  111. if current_sheet_name is not None:
  112. sheets.append((current_sheet_name, '\n'.join(current_content)))
  113. # 如果没有找到 sheet 标题,返回整个内容
  114. if not sheets:
  115. sheets.append(('default', content))
  116. return sheets