| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- # coding=utf-8
- """
- @project: maxkb
- @Author:虎
- @file: md_parse_qa_handle.py
- @date:2024/5/21 14:59
- @desc:
- """
- import re
- import traceback
- from typing import Any
- from charset_normalizer import detect
- from common.handle.base_parse_qa_handle import BaseParseQAHandle, get_title_row_index_dict, get_row_value
- from common.utils.logger import maxkb_logger
- class MarkdownParseQAHandle(BaseParseQAHandle):
- def support(self, file, get_buffer):
- file_name: str = file.name.lower()
- if file_name.endswith(".md") or file_name.endswith(".markdown"):
- return True
- return False
- def parse_markdown_table(self, content):
- """解析 Markdown 表格,返回表格数据列表"""
- tables = []
- lines = content.split('\n')
- i = 0
- while i < len(lines):
- line = lines[i].strip()
- # 检测表格开始(包含 | 符号)
- if '|' in line and line.startswith('|'):
- table_data = []
- # 读取表头
- header = [cell.strip() for cell in line.split('|')[1:-1]]
- table_data.append(header)
- i += 1
- # 跳过分隔行 (例如: | --- | --- |)
- if i < len(lines) and re.match(r'\s*\|[\s\-:]+\|\s*', lines[i]):
- i += 1
- # 读取数据行
- while i < len(lines):
- line = lines[i].strip()
- if not line.startswith('|'):
- break
- row = [self._unescape_cell_content(cell) for cell in line.split('|')[1:-1]]
- if len(row) > 0:
- table_data.append(row)
- i += 1
- if len(table_data) > 1: # 至少有表头和一行数据
- tables.append(table_data)
- else:
- i += 1
- return tables
- def _unescape_cell_content(self, cell) -> Any:
- text = cell.strip().replace('|', '|')
- text = text.replace('|<br>|', '|\n|')
- return text
- def handle(self, file, get_buffer, save_image):
- buffer = get_buffer(file)
- try:
- # 检测编码并读取文件内容
- encoding = detect(buffer)['encoding']
- content = buffer.decode(encoding if encoding else 'utf-8')
- # 按 sheet 分割内容
- sheet_sections = self.split_by_sheets(content)
- result = []
- for sheet_name, sheet_content in sheet_sections:
- # 解析该 sheet 的表格
- tables = self.parse_markdown_table(sheet_content)
- paragraph_list = []
- # 处理每个表格
- for table in tables:
- if len(table) < 2:
- continue
- title_row_list = table[0]
- title_row_index_dict = get_title_row_index_dict(title_row_list)
- # 处理表格的每一行数据
- for row in table[1:]:
- content_text = get_row_value(row, title_row_index_dict, 'content')
- if content_text is None:
- continue
- problem = get_row_value(row, title_row_index_dict, 'problem_list')
- problem = str(problem) if problem is not None else ''
- problem_list = [{'content': p[0:255]} for p in problem.split('\n') if len(p.strip()) > 0]
- title = get_row_value(row, title_row_index_dict, 'title')
- title = str(title) if title is not None else ''
- paragraph_list.append({
- 'title': title[0:255],
- 'content': content_text[0:102400],
- 'problem_list': problem_list
- })
- result.append({'name': sheet_name, 'paragraphs': paragraph_list})
- return result if result else [{'name': file.name, 'paragraphs': []}]
- except Exception as e:
- maxkb_logger.error(f"Error processing Markdown file {file.name}: {e}, {traceback.format_exc()}")
- return [{'name': file.name, 'paragraphs': []}]
- def split_by_sheets(self, content):
- """按二级标题(##)分割 sheet"""
- lines = content.split('\n')
- sheets = []
- current_sheet_name = None
- current_content = []
- for line in lines:
- # 检测二级标题作为 sheet 名称
- if line.strip().startswith('## '):
- if current_sheet_name is not None:
- sheets.append((current_sheet_name, '\n'.join(current_content)))
- current_sheet_name = line.strip()[3:].strip()
- current_content = []
- else:
- current_content.append(line)
- # 添加最后一个 sheet
- if current_sheet_name is not None:
- sheets.append((current_sheet_name, '\n'.join(current_content)))
- # 如果没有找到 sheet 标题,返回整个内容
- if not sheets:
- sheets.append(('default', content))
- return sheets
|