text_split_handle.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # coding=utf-8
  2. """
  3. @project: maxkb
  4. @Author:虎
  5. @file: text_split_handle.py
  6. @date:2024/3/27 18:19
  7. @desc: Split handler for Markdown / plain-text files and other uploads detected as encoded text.
  8. """
  9. import re
  10. import traceback
  11. from typing import List
  12. from charset_normalizer import detect
  13. from common.handle.base_split_handle import BaseSplitHandle
  14. from common.utils.logger import maxkb_logger
  15. from common.utils.split_model import SplitModel
  16. default_pattern_list = [
  17. re.compile('(?<=^)# (?!-\\*- coding:).*|(?<=\\n)# (?!-\\*- coding:).*'),
  18. re.compile('(?<=\\n)(?<!#)## (?!#).*|(?<=^)(?<!#)## (?!#).*'),
  19. re.compile("(?<=\\n)(?<!#)### (?!#).*|(?<=^)(?<!#)### (?!#).*"),
  20. re.compile("(?<=\\n)(?<!#)#### (?!#).*|(?<=^)(?<!#)#### (?!#).*"),
  21. re.compile("(?<=\\n)(?<!#)##### (?!#).*|(?<=^)(?<!#)##### (?!#).*"),
  22. re.compile("(?<=\\n)(?<!#)###### (?!#).*|(?<=^)(?<!#)###### (?!#).*")
  23. ]
  24. end = [".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpeg", ".mpg", ".3gp", ".ts", ".rmvb",
  25. ".mp3", ".wav", ".flac", ".aac", ".ogg", ".m4a", ".wma", ".opus", ".alac", ".aiff", ".amr",
  26. ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", ".heif", ".raw", ".ico", ".svg", ".pdf"]
  27. class TextSplitHandle(BaseSplitHandle):
  28. def support(self, file, get_buffer):
  29. file_name: str = file.name.lower()
  30. if file_name.endswith(".md") or file_name.endswith('.txt') or file_name.endswith('.TXT') or file_name.endswith(
  31. '.MD'):
  32. return True
  33. lower_name = file_name.lower()
  34. if any([True for item in end if lower_name.endswith(item)]):
  35. return False
  36. buffer = get_buffer(file)
  37. result = detect(buffer)
  38. if result['encoding'] is not None and result['confidence'] is not None and result['encoding'] != 'ascii' and \
  39. result['confidence'] > 0.5:
  40. return True
  41. return False
  42. def handle(self, file, pattern_list: List, with_filter: bool, limit: int, get_buffer, save_image):
  43. buffer = get_buffer(file)
  44. if type(limit) is str:
  45. limit = int(limit)
  46. if type(with_filter) is str:
  47. with_filter = with_filter.lower() == 'true'
  48. if pattern_list is not None and len(pattern_list) > 0:
  49. split_model = SplitModel(pattern_list, with_filter, limit)
  50. else:
  51. split_model = SplitModel(default_pattern_list, with_filter=with_filter, limit=limit)
  52. try:
  53. content = buffer.decode(detect(buffer)['encoding'])
  54. except BaseException as e:
  55. maxkb_logger.error(f"Error processing TEXT file {file.name}: {e}, {traceback.format_exc()}")
  56. return {'name': file.name, 'content': []}
  57. return {'name': file.name, 'content': split_model.parse(content)}
  58. def get_content(self, file, save_image):
  59. buffer = file.read()
  60. try:
  61. return buffer.decode(detect(buffer)['encoding'])
  62. except BaseException as e:
  63. traceback.print_exception(e)
  64. return f'{e}'