fork.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import copy
  2. import re
  3. import traceback
  4. from functools import reduce
  5. from typing import List, Set
  6. from urllib.parse import urljoin, urlparse, ParseResult, urlsplit, urlunparse
  7. import html2text as ht
  8. import requests
  9. from bs4 import BeautifulSoup
  10. from common.utils.logger import maxkb_logger
# Runs once when this module is loaded and turns off urllib3's warnings process-wide.
# Fork.fork() below sends its requests with verify=False; presumably that is the warning
# this call is meant to keep out of the logs (TODO confirm no other warning is relied on).
requests.packages.urllib3.disable_warnings()
  12. class ChildLink:
  13. def __init__(self, url, tag):
  14. self.url = url
  15. self.tag = copy.deepcopy(tag)
  16. class ForkManage:
  17. def __init__(self, base_url: str, selector_list: List[str]):
  18. self.base_url = base_url
  19. self.selector_list = selector_list
  20. def fork(self, level: int, exclude_link_url: Set[str], fork_handler):
  21. self.fork_child(ChildLink(self.base_url, None), self.selector_list, level, exclude_link_url, fork_handler)
  22. @staticmethod
  23. def fork_child(child_link: ChildLink, selector_list: List[str], level: int, exclude_link_url: Set[str],
  24. fork_handler):
  25. if level < 0:
  26. return
  27. else:
  28. child_link.url = remove_fragment(child_link.url)
  29. child_url = child_link.url[:-1] if child_link.url.endswith('/') else child_link.url
  30. if not exclude_link_url.__contains__(child_url):
  31. exclude_link_url.add(child_url)
  32. response = Fork(child_link.url, selector_list).fork()
  33. fork_handler(child_link, response)
  34. for child_link in response.child_link_list:
  35. child_url = child_link.url[:-1] if child_link.url.endswith('/') else child_link.url
  36. if not exclude_link_url.__contains__(child_url):
  37. ForkManage.fork_child(child_link, selector_list, level - 1, exclude_link_url, fork_handler)
  38. def remove_fragment(url: str) -> str:
  39. parsed_url = urlparse(url)
  40. modified_url = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc, path=parsed_url.path,
  41. params=parsed_url.params, query=parsed_url.query, fragment=None)
  42. return urlunparse(modified_url)
  43. class Fork:
  44. class Response:
  45. def __init__(self, content: str, child_link_list: List[ChildLink], status, message: str):
  46. self.content = content
  47. self.child_link_list = child_link_list
  48. self.status = status
  49. self.message = message
  50. @staticmethod
  51. def success(html_content: str, child_link_list: List[ChildLink]):
  52. return Fork.Response(html_content, child_link_list, 200, '')
  53. @staticmethod
  54. def error(message: str):
  55. return Fork.Response('', [], 500, message)
  56. def __init__(self, base_fork_url: str, selector_list: List[str]):
  57. base_fork_url = remove_fragment(base_fork_url)
  58. parsed = urlparse(base_fork_url)
  59. path = parsed.path.rstrip('/')
  60. self.base_fork_url = urlunparse((
  61. parsed.scheme,
  62. parsed.netloc,
  63. path,
  64. None,
  65. None,
  66. None # fragment
  67. ))
  68. parsed = urlsplit(base_fork_url)
  69. query = parsed.query
  70. if query is not None and len(query) > 0:
  71. self.base_fork_url = self.base_fork_url + '?' + query
  72. self.selector_list = [selector for selector in selector_list if selector is not None and len(selector) > 0]
  73. self.urlparse = urlparse(self.base_fork_url)
  74. self.base_url = ParseResult(scheme=self.urlparse.scheme, netloc=self.urlparse.netloc, path='', params='',
  75. query='',
  76. fragment='').geturl()
  77. def get_child_link_list(self, bf: BeautifulSoup):
  78. pattern = "^((?!(http:|https:|tel:/|#|mailto:|javascript:))|" + self.base_fork_url + "|/).*"
  79. link_list = bf.find_all(name='a', href=re.compile(pattern))
  80. result = [ChildLink(link.get('href'), link) if link.get('href').startswith(self.base_url) else ChildLink(
  81. self.base_url + link.get('href'), link) for link in link_list]
  82. result = [row for row in result if row.url.startswith(self.base_fork_url)]
  83. return result
  84. def get_content_html(self, bf: BeautifulSoup):
  85. if self.selector_list is None or len(self.selector_list) == 0:
  86. return str(bf)
  87. params = reduce(lambda x, y: {**x, **y},
  88. [{'class_': selector.replace('.', '')} if selector.startswith('.') else
  89. {'id': selector.replace("#", "")} if selector.startswith("#") else {'name': selector} for
  90. selector in
  91. self.selector_list], {})
  92. f = bf.find_all(**params)
  93. return "\n".join([str(row) for row in f])
  94. @staticmethod
  95. def reset_url(tag, field, base_fork_url):
  96. field_value: str = tag[field]
  97. if field_value.startswith("/"):
  98. result = urlparse(base_fork_url)
  99. result_url = ParseResult(scheme=result.scheme, netloc=result.netloc, path=field_value, params='', query='',
  100. fragment='').geturl()
  101. else:
  102. result_url = urljoin(
  103. base_fork_url + '/' + (field_value if field_value.endswith('/') else field_value + '/'),
  104. ".")
  105. result_url = result_url[:-1] if result_url.endswith('/') else result_url
  106. tag[field] = result_url
  107. def reset_beautiful_soup(self, bf: BeautifulSoup):
  108. reset_config_list = [
  109. {
  110. 'field': 'href',
  111. },
  112. {
  113. 'field': 'src',
  114. }
  115. ]
  116. for reset_config in reset_config_list:
  117. field = reset_config.get('field')
  118. tag_list = bf.find_all(**{field: re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')})
  119. for tag in tag_list:
  120. self.reset_url(tag, field, self.base_fork_url)
  121. return bf
  122. @staticmethod
  123. def get_beautiful_soup(response):
  124. encoding = response.encoding if response.encoding is not None and response.encoding != 'ISO-8859-1' else response.apparent_encoding
  125. html_content = response.content.decode(encoding)
  126. beautiful_soup = BeautifulSoup(html_content, "html.parser")
  127. meta_list = beautiful_soup.find_all('meta')
  128. charset_list = Fork.get_charset_list(meta_list)
  129. if len(charset_list) > 0:
  130. charset = charset_list[0]
  131. if charset != encoding:
  132. try:
  133. html_content = response.content.decode(charset, errors='replace')
  134. except Exception as e:
  135. maxkb_logger.error(f'{e}: {traceback.format_exc()}')
  136. return BeautifulSoup(html_content, "html.parser")
  137. return beautiful_soup
  138. @staticmethod
  139. def get_charset_list(meta_list):
  140. charset_list = []
  141. for meta in meta_list:
  142. if meta.attrs is not None:
  143. if 'charset' in meta.attrs:
  144. charset_list.append(meta.attrs.get('charset'))
  145. elif meta.attrs.get('http-equiv', '').lower() == 'content-type' and 'content' in meta.attrs:
  146. match = re.search(r'charset=([^\s;]+)', meta.attrs['content'], re.I)
  147. if match:
  148. charset_list.append(match.group(1))
  149. return charset_list
  150. def fork(self):
  151. try:
  152. headers = {
  153. 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36'
  154. }
  155. maxkb_logger.info(f'fork:{self.base_fork_url}')
  156. response = requests.get(self.base_fork_url, verify=False, headers=headers)
  157. if response.status_code != 200:
  158. maxkb_logger.error(f"url: {self.base_fork_url} code:{response.status_code}")
  159. return Fork.Response.error(f"url: {self.base_fork_url} code:{response.status_code}")
  160. bf = self.get_beautiful_soup(response)
  161. except Exception as e:
  162. maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}')
  163. return Fork.Response.error(str(e))
  164. bf = self.reset_beautiful_soup(bf)
  165. link_list = self.get_child_link_list(bf)
  166. content = self.get_content_html(bf)
  167. r = ht.html2text(content)
  168. return Fork.Response.success(r, link_list)
  169. def handler(base_url, response: Fork.Response):
  170. maxkb_logger.info(base_url.url, base_url.tag.text if base_url.tag else None, response.content)
  171. # ForkManage('https://bbs.fit2cloud.com/c/de/6', ['.md-content']).fork(3, set(), handler)