| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- import requests
- from bs4 import BeautifulSoup
- import json
- import urllib.parse
- import re
- import asyncio
- import sys
- from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
- import urllib3
# HTTP requests in this module are sent with verify=False, which makes urllib3
# emit an InsecureRequestWarning per request; silence that warning globally.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# On Windows, switch asyncio to the proactor event loop policy before any loop
# is created (presumably so the browser-based crawler can spawn subprocesses
# -- TODO confirm).
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
def clean_text(s):
    """Collapse every run of whitespace in *s* to one space and trim the ends.

    :param s: text to normalise; any falsy value (``None``, ``""``) is
        treated as empty text
    :return: the normalised string, or ``""`` for falsy input
    """
    if not s:
        return ""
    collapsed = re.sub(r"\s+", " ", s)
    return collapsed.strip()
def absolutize_url(base_url, u):
    """Resolve a link found on a page into an absolute URL.

    :param base_url: URL of the page the link was found on
    :param u: raw link value (may be empty, ``None``, protocol-relative,
        root-relative, path-relative or already absolute)
    :return: the absolute URL, or ``""`` when *u* is empty or blank
    """
    if not u:
        return ""

    # href values scraped from HTML frequently carry stray whitespace.
    u = u.strip()
    if not u:
        return ""

    # Protocol-relative link: keep the historical behaviour of forcing https.
    if u.startswith("//"):
        return "https:" + u

    # Already absolute. Match the full "http://" / "https://" prefix so that a
    # relative path which merely begins with "http" (e.g. "httpdocs/a.html")
    # is still resolved against the base instead of being returned verbatim.
    if u.lower().startswith(("http://", "https://")):
        return u

    # urljoin handles both root-relative ("/x") and path-relative ("x/y")
    # references, and leaves other schemes (mailto:, javascript:) untouched.
    return urllib.parse.urljoin(base_url, u)
class GenericSpiderEngine:
    """Config-driven search spider.

    Builds a search request from a source configuration, downloads one or more
    result pages (plain HTTP first, a Crawl4AI headless browser as fallback)
    and extracts result items with the CSS selectors stored in the config.

    Config fields read by this class: ``url``, ``method``, ``headers``,
    ``params``, ``search_param_key``, ``selectors``, ``has_pagination``,
    ``pagination_param``, ``pagination_step`` and ``pagination_start``.
    ``headers``, ``params`` and ``selectors`` may be JSON strings or
    already-decoded objects.
    """

    # User-Agent sent when the source config does not define its own headers.
    _DEFAULT_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/116.0.5845.97 Safari/537.36"
    )

    # Keys of every extracted item, in output order.
    _ITEM_FIELDS = ('title', 'link', 'abstract', 'source', 'cover', 'date')

    def __init__(self, source_config):
        """
        :param source_config: SpiderSource model instance or dictionary
        """
        self.config = source_config

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------
    def _cfg(self, name, default=None):
        """Read config field *name* from either a dict or an object config."""
        config = self.config
        if isinstance(config, dict):
            return config.get(name, default)
        return getattr(config, name, default)

    @staticmethod
    def _load_json(raw, default):
        """Decode a JSON config field; return *default* when it is empty.

        Values that are already decoded (e.g. a dict) are returned unchanged.
        """
        if not raw:
            return default
        if isinstance(raw, (str, bytes, bytearray)):
            return json.loads(raw)
        return raw

    def _wait_for_selector(self):
        """Return the list selector to wait for in the browser, or ``None``.

        Best effort: a malformed ``selectors`` field is reported and ignored
        so the browser fallback can still run without a wait condition.
        """
        try:
            selectors = self._load_json(self._cfg('selectors'), None)
            if selectors:
                return selectors.get('list')
        except Exception as ex:
            print(f"Error parsing selectors for wait_for: {ex}")
        return None

    # ------------------------------------------------------------------
    # Fetching
    # ------------------------------------------------------------------
    async def _crawl_with_ai(self, url, wait_for=None):
        """Render *url* in a headless browser via Crawl4AI and return its HTML.

        :param url: fully built URL to load
        :param wait_for: optional selector the crawler should wait for
        :return: ``result.html`` of the crawl
        :raises Exception: any crawler error other than a failed wait
            condition is propagated unchanged
        """
        browser_config = BrowserConfig(
            headless=True,
            verbose=False,
            java_script_enabled=True,
        )
        run_config = CrawlerRunConfig(cache_mode=CacheMode.BYPASS)
        if wait_for:
            run_config.wait_for = wait_for

        async with AsyncWebCrawler(config=browser_config) as crawler:
            try:
                # NOTE(review): arun may report some failures through the
                # result object rather than by raising -- confirm.
                result = await crawler.arun(url=url, config=run_config)
                return result.html
            except Exception as e:
                if not (wait_for and "Wait condition failed" in str(e)):
                    raise

                # The selector never appeared: retry without the wait
                # condition, giving the page a fixed 5 s to settle instead.
                print(f"Warning: Wait for selector '{wait_for}' timed out. Retrying with 5s delay...")
                run_config.wait_for = None
                run_config.js_code = "await new Promise(r => setTimeout(r, 5000));"

                result = await crawler.arun(url=url, config=run_config)
                return result.html

    @staticmethod
    def _build_full_url(url, params):
        """Append *params* to *url* as a query string (used for logs/fallback)."""
        if not params:
            return url
        separator = '&' if '?' in url else '?'
        return url + separator + urllib.parse.urlencode(params)

    @staticmethod
    def _decode_body(response):
        """Decode a response body: UTF-8, then GB18030, then detected charset.

        Robust decoding strategy for Chinese websites (Baidu, Gov sites, etc.).
        """
        for encoding in ('utf-8', 'gb18030'):
            try:
                return response.content.decode(encoding)
            except UnicodeDecodeError:
                continue
        response.encoding = response.apparent_encoding
        return response.text

    def _fetch_page(self, url, method, headers, params, full_url):
        """Download one result page.

        Tries a plain HTTP request first; on any failure falls back to the
        headless-browser crawler.

        :return: ``(html, final_url)``; *html* is empty when both strategies
            failed
        """
        try:
            # NOTE(security): TLS certificate verification is disabled here.
            if method == 'GET':
                response = requests.get(url, headers=headers, params=params, timeout=15, verify=False)
            else:
                response = requests.post(url, headers=headers, data=params, timeout=15, verify=False)
            response.raise_for_status()
            return self._decode_body(response), response.url
        except Exception as e:
            print(f"Standard fetch failed ({e}). Activating intelligent crawler for {full_url}")

        try:
            wait_for = self._wait_for_selector()
            html = asyncio.run(self._crawl_with_ai(full_url, wait_for=wait_for))
            # Crawl4AI result url might be different but we start with full_url
            return html, full_url
        except Exception as ai_e:
            print(f"Crawl4AI also failed: {ai_e}")
            return "", full_url

    def run(self, keyword, limit=10, pages=1):
        """Search the configured source for *keyword* and collect the results.

        :param keyword: search term, sent under the configured search key
            (``'q'`` when none is configured)
        :param limit: maximum number of items taken from EACH page
        :param pages: number of pages to crawl; ignored (one page) unless the
            source has pagination enabled
        :return: list of item dicts from all successfully fetched pages
        :raises ValueError: if the source config has no ``url``
        """
        import time  # function-scope import, as in the original, now out of the loop

        url = self._cfg('url')
        if not url:
            raise ValueError("source config has no 'url'")

        method = (self._cfg('method') or 'GET').upper()
        headers = self._load_json(
            self._cfg('headers'),
            {"User-Agent": self._DEFAULT_USER_AGENT},
        )
        # Copy so an already-decoded params dict from the config is not mutated.
        base_params = dict(self._load_json(self._cfg('params'), {}))
        search_key = self._cfg('search_param_key') or 'q'

        # Add keyword to params
        base_params[search_key] = keyword

        # Pagination config; a field stored as None falls back to its default
        # instead of breaking the offset arithmetic below.
        has_pagination = self._cfg('has_pagination', False)
        pagination_param = self._cfg('pagination_param', 'pn')
        pagination_step = self._cfg('pagination_step', 10)
        if pagination_step is None:
            pagination_step = 10
        pagination_start = self._cfg('pagination_start', 0)
        if pagination_start is None:
            pagination_start = 0

        pages_to_crawl = pages if has_pagination else 1
        all_results = []

        for page_idx in range(pages_to_crawl):
            current_pagination_val = pagination_start + (page_idx * pagination_step)

            current_params = dict(base_params)
            if has_pagination and pagination_param:
                current_params[pagination_param] = current_pagination_val

            print(f"Crawling page {page_idx+1}/{pages_to_crawl} with {pagination_param}={current_pagination_val}")

            full_url = self._build_full_url(url, current_params)
            html_content, current_url = self._fetch_page(
                url, method, headers, current_params, full_url
            )
            if not html_content:
                continue  # Skip this page

            all_results.extend(self.parse(html_content, current_url, limit))

            # Be polite: pause between pages, but not after the last one.
            if page_idx < pages_to_crawl - 1:
                time.sleep(1)

        return all_results

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _extract(container, sel_config):
        """Extract one field from *container*.

        :param container: element matched by the list selector
        :param sel_config: either a CSS selector string, or a dict
            ``{"selector": "...", "attr": "..."}``; an empty selector in the
            dict form means "the container itself"
        :return: the attribute value when ``attr`` is given, otherwise the
            element's whitespace-normalised text; ``""`` when nothing matches
        """
        if not sel_config:
            return ""

        if isinstance(sel_config, dict):
            selector = sel_config.get('selector')
            attr = sel_config.get('attr')
        else:
            selector, attr = sel_config, None

        element = container.select_one(selector) if selector else container
        if element is None:
            return ""

        if attr:
            return element.get(attr, "")
        return clean_text(element.get_text())

    def parse(self, html, current_url, limit):
        """Extract up to *limit* items from one page of HTML.

        :param html: page markup
        :param current_url: URL the page was loaded from; used to make item
            links absolute
        :param limit: maximum number of items to return
        :return: list of dicts with the keys ``title``, ``link``, ``abstract``,
            ``source``, ``cover`` and ``date``; containers without a title are
            skipped. Empty when no selectors / no ``list`` selector are
            configured.
        """
        selectors = self._load_json(self._cfg('selectors'), None)
        if not selectors:
            return []

        list_selector = selectors.get('list')
        if not list_selector:
            return []

        soup = BeautifulSoup(html, 'lxml')
        items = []

        for container in soup.select(list_selector):
            if len(items) >= limit:
                break

            item = {
                field: self._extract(container, selectors.get(field))
                for field in self._ITEM_FIELDS
            }
            item['link'] = absolutize_url(current_url, item['link'])

            if item['title']:
                items.append(item)

        return items
def run_generic_spider(source_model, keyword, limit=10, pages=1):
    """Run a one-off search against *source_model*.

    Convenience wrapper that builds a ``GenericSpiderEngine`` for the given
    source and returns whatever its ``run`` method returns.

    :param source_model: source configuration handed to the engine
    :param keyword: search term
    :param limit: forwarded to ``GenericSpiderEngine.run``
    :param pages: forwarded to ``GenericSpiderEngine.run``
    :return: the engine's result list
    """
    return GenericSpiderEngine(source_model).run(keyword, limit, pages)
|