| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- import copy
- import re
- import traceback
- from functools import reduce
- from typing import List, Set
- from urllib.parse import urljoin, urlparse, ParseResult, urlsplit, urlunparse
- import html2text as ht
- import requests
- from bs4 import BeautifulSoup
- from common.utils.logger import maxkb_logger
- requests.packages.urllib3.disable_warnings()
class ChildLink:
    """A URL to crawl, paired with a private copy of the tag it was found in."""

    def __init__(self, url, tag):
        # Deep-copy the tag so the stored object is independent of whatever
        # the caller keeps mutating afterwards; ``None`` is copied as ``None``.
        detached_tag = copy.deepcopy(tag)
        self.url = url
        self.tag = detached_tag
class ForkManage:
    """Recursive, depth-limited crawler rooted at ``base_url``.

    Every visited page is fetched through ``Fork`` and handed to a
    caller-supplied callback together with the link that led to it.
    """

    def __init__(self, base_url: str, selector_list: List[str]):
        """Store the crawl root and the selectors forwarded to every ``Fork``."""
        self.base_url = base_url
        self.selector_list = selector_list

    def fork(self, level: int, exclude_link_url: Set[str], fork_handler):
        """Start crawling from ``base_url``.

        :param level: how many link hops below the root may still be fetched;
            a negative value fetches nothing.
        :param exclude_link_url: set of already-visited URLs (stored without a
            single trailing slash); it is mutated as pages are visited.
        :param fork_handler: callable invoked as
            ``fork_handler(child_link, response)`` for every fetched page,
            including pages whose fetch produced an error response.
        """
        self.fork_child(ChildLink(self.base_url, None), self.selector_list, level, exclude_link_url, fork_handler)

    @staticmethod
    def _visit_key(url: str) -> str:
        """Return ``url`` without one trailing slash, the form used for de-duplication."""
        return url[:-1] if url.endswith('/') else url

    @staticmethod
    def fork_child(child_link: "ChildLink", selector_list: List[str], level: int, exclude_link_url: Set[str],
                   fork_handler):
        """Fetch ``child_link`` (unless already visited) and recurse into its children.

        The link's ``url`` attribute is rewritten in place without its
        fragment before the page is fetched.
        """
        if level < 0:
            return
        child_link.url = remove_fragment(child_link.url)
        visit_key = ForkManage._visit_key(child_link.url)
        if visit_key in exclude_link_url:
            return
        exclude_link_url.add(visit_key)
        response = Fork(child_link.url, selector_list).fork()
        fork_handler(child_link, response)
        if level == 0:
            # Children would be called with level -1 and return immediately.
            return
        # A distinct loop name keeps the ``child_link`` parameter intact.
        for next_link in response.child_link_list:
            # Cheap pre-check; the recursive call re-checks after stripping
            # the fragment from the child's URL.
            if ForkManage._visit_key(next_link.url) not in exclude_link_url:
                ForkManage.fork_child(next_link, selector_list, level - 1, exclude_link_url, fork_handler)
def remove_fragment(url: str) -> str:
    """Return ``url`` re-serialised without its ``#fragment`` part.

    The URL is parsed and rebuilt, so scheme, netloc, path, params and query
    are carried over while the fragment is left empty.
    """
    components = urlparse(url)
    without_fragment = components._replace(fragment='')
    return urlunparse(without_fragment)
class Fork:
    """Fetch one page, extract the selected content as text and collect its child links."""

    # Seconds to wait for the server. Without a timeout ``requests.get`` can
    # block indefinitely on an unresponsive host.
    request_timeout = 30

    class Response:
        """Outcome of a single page fetch."""

        def __init__(self, content: str, child_link_list: List["ChildLink"], status, message: str):
            self.content = content
            self.child_link_list = child_link_list
            self.status = status
            self.message = message

        @staticmethod
        def success(html_content: str, child_link_list: List["ChildLink"]):
            """Build a status-200 response with an empty message."""
            return Fork.Response(html_content, child_link_list, 200, '')

        @staticmethod
        def error(message: str):
            """Build a status-500 response with no content and no child links."""
            return Fork.Response('', [], 500, message)

    def __init__(self, base_fork_url: str, selector_list: List[str]):
        """Normalise the page URL and keep the non-empty selectors.

        ``base_fork_url`` is stored without fragment, ``;params`` and trailing
        slashes (the query string is preserved); ``base_url`` is the
        ``scheme://netloc`` prefix of that URL.
        """
        parsed = urlparse(remove_fragment(base_fork_url))
        self.base_fork_url = urlunparse((
            parsed.scheme,
            parsed.netloc,
            parsed.path.rstrip('/'),
            '',  # params
            parsed.query,
            '',  # fragment
        ))
        self.selector_list = [selector for selector in selector_list if selector]
        self.urlparse = urlparse(self.base_fork_url)
        self.base_url = ParseResult(scheme=self.urlparse.scheme, netloc=self.urlparse.netloc, path='', params='',
                                    query='', fragment='').geturl()

    def get_child_link_list(self, bf: "BeautifulSoup"):
        """Return a ``ChildLink`` for every ``<a>`` whose URL lies under ``base_fork_url``."""
        # The URL is escaped because it is user data, not a regex: characters
        # such as '?', '.', '(' would otherwise change the pattern's meaning
        # or make it fail to compile.
        pattern = ("^((?!(http:|https:|tel:/|#|mailto:|javascript:))|"
                   + re.escape(self.base_fork_url) + "|/).*")
        link_list = bf.find_all(name='a', href=re.compile(pattern))
        result = []
        for link in link_list:
            href = link.get('href')
            url = href if href.startswith(self.base_url) else self.base_url + href
            # Filter before constructing ChildLink so discarded links are not copied.
            if url.startswith(self.base_fork_url):
                result.append(ChildLink(url, link))
        return result

    def get_content_html(self, bf: "BeautifulSoup"):
        """Return the HTML of the elements matching the selectors, or the whole document.

        A selector starting with ``.`` becomes a ``class_`` filter, one
        starting with ``#`` an ``id`` filter, anything else a tag ``name``.
        """
        if not self.selector_list:
            return str(bf)
        # NOTE: selectors of the same kind overwrite one another (only the last
        # class, the last id and the last tag name survive) and the surviving
        # filters are passed together to a single ``find_all`` call.
        params = {}
        for selector in self.selector_list:
            if selector.startswith('.'):
                params['class_'] = selector.replace('.', '')
            elif selector.startswith("#"):
                params['id'] = selector.replace("#", "")
            else:
                params['name'] = selector
        return "\n".join(str(row) for row in bf.find_all(**params))

    @staticmethod
    def reset_url(tag, field, base_fork_url):
        """Rewrite ``tag[field]`` in place as an absolute URL without a trailing slash.

        Values starting with ``/`` are resolved against the scheme and host of
        ``base_fork_url``; anything else is appended to ``base_fork_url`` and
        normalised with ``urljoin`` (which also collapses ``..`` segments).
        """
        field_value: str = tag[field]
        if field_value.startswith("/"):
            base = urlparse(base_fork_url)
            result_url = ParseResult(scheme=base.scheme, netloc=base.netloc, path=field_value, params='', query='',
                                     fragment='').geturl()
        else:
            as_directory = field_value if field_value.endswith('/') else field_value + '/'
            result_url = urljoin(base_fork_url + '/' + as_directory, ".")
        if result_url.endswith('/'):
            result_url = result_url[:-1]
        tag[field] = result_url

    def reset_beautiful_soup(self, bf: "BeautifulSoup"):
        """Make every non-absolute ``href``/``src`` in ``bf`` absolute; returns ``bf``."""
        relative_reference = re.compile('^(?!(http:|https:|tel:/|#|mailto:|javascript:)).*')
        for field in ('href', 'src'):
            for tag in bf.find_all(**{field: relative_reference}):
                self.reset_url(tag, field, self.base_fork_url)
        return bf

    @staticmethod
    def get_beautiful_soup(response):
        """Decode ``response`` and parse it, honouring a ``<meta>`` charset if it differs.

        The body is first decoded with the response's declared encoding (or
        the detected one when none is declared or it is ``ISO-8859-1``). If
        the document declares a different charset in a ``<meta>`` tag, the raw
        bytes are decoded again with that charset and re-parsed.
        """
        encoding = response.encoding
        if encoding is None or encoding == 'ISO-8859-1':
            encoding = response.apparent_encoding
        # Detection can yield nothing (e.g. an empty body); fall back to UTF-8.
        encoding = encoding or 'utf-8'
        # Invalid bytes must not abort the fetch here, otherwise the <meta>
        # charset below would never get a chance to correct a wrong header.
        html_content = response.content.decode(encoding, errors='replace')
        beautiful_soup = BeautifulSoup(html_content, "html.parser")
        charset_list = Fork.get_charset_list(beautiful_soup.find_all('meta'))
        if charset_list:
            charset = charset_list[0]
            # Codec names are case-insensitive; avoid a pointless re-parse.
            if charset.lower() != encoding.lower():
                try:
                    html_content = response.content.decode(charset, errors='replace')
                except (LookupError, TypeError, ValueError) as e:
                    # Unknown or malformed charset name: keep the first decoding.
                    maxkb_logger.error(f'{e}: {traceback.format_exc()}')
                return BeautifulSoup(html_content, "html.parser")
        return beautiful_soup

    @staticmethod
    def get_charset_list(meta_list):
        """Collect the charsets declared by ``<meta>`` tags, in document order.

        Both ``<meta charset=...>`` and
        ``<meta http-equiv="Content-Type" content="...; charset=...">`` are
        recognised. Empty or non-string values are skipped and surrounding
        quotes are removed.
        """
        charset_list = []
        for meta in meta_list:
            attrs = meta.attrs
            if attrs is None:
                continue
            if 'charset' in attrs:
                declared = attrs.get('charset')
            elif attrs.get('http-equiv', '').lower() == 'content-type' and 'content' in attrs:
                match = re.search(r'charset=([^\s;]+)', attrs['content'], re.I)
                declared = match.group(1) if match else None
            else:
                declared = None
            if isinstance(declared, str):
                declared = declared.strip().strip('"\'')
                if declared:
                    charset_list.append(declared)
        return charset_list

    def fork(self):
        """Download the page and return a ``Fork.Response``.

        On success the response carries the selected content converted by
        ``html2text`` plus the child links. A non-200 status or any exception
        raised while fetching/decoding yields an error response instead.
        """
        try:
            headers = {
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36'
            }
            maxkb_logger.info(f'fork:{self.base_fork_url}')
            # SECURITY NOTE: verify=False disables TLS certificate validation.
            # It is kept for backward compatibility with existing callers.
            response = requests.get(self.base_fork_url, verify=False, headers=headers,
                                    timeout=self.request_timeout)
            if response.status_code != 200:
                maxkb_logger.error(f"url: {self.base_fork_url} code:{response.status_code}")
                return Fork.Response.error(f"url: {self.base_fork_url} code:{response.status_code}")
            bf = self.get_beautiful_soup(response)
        except Exception as e:
            maxkb_logger.error(f'{str(e)}:{traceback.format_exc()}')
            return Fork.Response.error(str(e))
        bf = self.reset_beautiful_soup(bf)
        link_list = self.get_child_link_list(bf)
        content = self.get_content_html(bf)
        r = ht.html2text(content)
        return Fork.Response.success(r, link_list)
def handler(base_url, response: "Fork.Response"):
    """Example crawl callback: log a link's URL, its anchor text and the page content.

    :param base_url: the link object that was fetched; its ``url`` and ``tag``
        attributes are read (``tag`` may be falsy, in which case ``None`` is
        logged for the anchor text).
    :param response: the fetch result whose ``content`` is logged.
    """
    anchor_text = base_url.tag.text if base_url.tag else None
    # Build one message string: passing the values as extra positional
    # arguments to a message without placeholders cannot be formatted.
    maxkb_logger.info(f'{base_url.url} {anchor_text} {response.content}')
- # ForkManage('https://bbs.fit2cloud.com/c/de/6', ['.md-content']).fork(3, set(), handler)
|