import asyncio
import json
import re
import sys
import time
import urllib.parse

import requests
import urllib3
from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode

# Disable SSL warnings (requests below are issued with verify=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# Headers sent when the source configuration does not provide any.
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5845.97 Safari/537.36"
}

# Sentinel distinguishing "no default supplied" from an explicit ``None``.
_MISSING = object()


def clean_text(s):
    """Collapse every run of whitespace in *s* to one space and trim the ends.

    ``None`` (or any other falsy value) is treated as the empty string.
    """
    s = re.sub(r"\s+", " ", s or "")
    return s.strip()


def absolutize_url(base_url, u):
    """Resolve a possibly relative URL *u* against *base_url*.

    Returns ``""`` for an empty *u*. Protocol-relative URLs (``//host/x``)
    are given the ``https:`` scheme, root-relative URLs are joined to the
    scheme and host of *base_url*, other non-``http`` values are joined to
    *base_url* itself, and anything starting with ``http`` is returned as is.
    """
    if not u:
        return ""
    # Attribute values pulled from HTML may carry stray surrounding whitespace.
    u = u.strip()
    if not u:
        return ""
    if u.startswith("//"):
        return "https:" + u
    if u.startswith("/"):
        parsed = urllib.parse.urlparse(base_url)
        return urllib.parse.urljoin(f"{parsed.scheme}://{parsed.netloc}", u)
    if not u.startswith("http"):
        return urllib.parse.urljoin(base_url, u)
    return u


def _config_value(config, key, default=_MISSING):
    """Read *key* from *config*, which may be a dict or an attribute object.

    Without *default*, a missing key raises ``KeyError`` (dict) or
    ``AttributeError`` (object); with *default*, that value is returned.
    """
    if isinstance(config, dict):
        if default is _MISSING:
            return config[key]
        return config.get(key, default)
    if default is _MISSING:
        return getattr(config, key)
    return getattr(config, key, default)


def _load_json(value, fallback):
    """Return *value* decoded into a fresh object, or *fallback* if it is falsy.

    Strings/bytes are parsed with ``json.loads``; any other truthy value is
    assumed to be an already-decoded mapping and is shallow-copied so the
    caller's object is never mutated.
    """
    if not value:
        return fallback
    if isinstance(value, (str, bytes, bytearray)):
        return json.loads(value)
    return dict(value)


class GenericSpiderEngine:
    """Configurable search-result scraper driven by a source configuration.

    The configuration supplies the request (``url``, ``method``, ``headers``,
    ``params``, ``search_param_key``), optional pagination settings
    (``has_pagination``, ``pagination_param``, ``pagination_step``,
    ``pagination_start``) and CSS ``selectors`` used to extract result items.
    """

    def __init__(self, source_config):
        """
        :param source_config: SpiderSource model instance or dictionary
        """
        self.config = source_config

    async def _crawl_with_ai(self, url, wait_for=None):
        """Fetch *url* with a headless Crawl4AI browser and return its HTML.

        :param url: page to load.
        :param wait_for: optional wait condition handed to the crawler.
        :raises Exception: whatever the crawler raises, except a failed wait
            condition, which triggers one retry without the condition.
        """
        browser_config = BrowserConfig(
            headless=True,
            verbose=False,
            java_script_enabled=True,
        )
        run_config = CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS
        )
        if wait_for:
            run_config.wait_for = wait_for

        async with AsyncWebCrawler(config=browser_config) as crawler:
            try:
                result = await crawler.arun(url=url, config=run_config)
                return result.html
            except Exception as e:
                # Anything other than a failed wait condition is not recoverable here.
                if not (wait_for and "Wait condition failed" in str(e)):
                    raise
                print(f"Warning: Wait for selector '{wait_for}' timed out. Retrying with 5s delay...")
                run_config.wait_for = None
                # Use js_code to wait for 5 seconds as a fallback
                run_config.js_code = "await new Promise(r => setTimeout(r, 5000));"
                result = await crawler.arun(url=url, config=run_config)
                return result.html

    @staticmethod
    def _decode_response(response):
        """Decode a response body: UTF-8 first, then GB18030, then the
        encoding guessed by requests (``apparent_encoding``)."""
        # Robust decoding strategy for Chinese websites (Baidu, Gov sites, etc.)
        try:
            return response.content.decode('utf-8')
        except UnicodeDecodeError:
            pass
        try:
            return response.content.decode('gb18030')
        except UnicodeDecodeError:
            response.encoding = response.apparent_encoding
            return response.text

    def _wait_selector(self):
        """Return the ``list`` selector from the config for use as the
        crawler's wait condition, or ``None`` if it cannot be determined."""
        wait_for = None
        try:
            raw_selectors = _config_value(self.config, 'selectors', None)
            if raw_selectors:
                wait_for = _load_json(raw_selectors, {}).get('list')
        except Exception as ex:
            print(f"Error parsing selectors for wait_for: {ex}")
        return wait_for

    def _fetch_html(self, url, method, headers, params, full_url):
        """Download one page and return ``(html, final_url)``.

        A plain ``requests`` call is tried first; on any failure the page is
        fetched again through the browser-based crawler using *full_url*.
        Returns ``None`` when both attempts fail.
        """
        try:
            if method.upper() == 'GET':
                response = requests.get(url, headers=headers, params=params, timeout=15, verify=False)
            else:
                response = requests.post(url, headers=headers, data=params, timeout=15, verify=False)
            response.raise_for_status()
            return self._decode_response(response), response.url
        except Exception as e:
            print(f"Standard fetch failed ({e}). Activating intelligent crawler for {full_url}")

        try:
            html_content = asyncio.run(self._crawl_with_ai(full_url, wait_for=self._wait_selector()))
        except Exception as ai_e:
            print(f"Crawl4AI also failed: {ai_e}")
            return None
        # Crawl4AI result url might be different but we start with full_url
        return html_content, full_url

    def run(self, keyword, limit=10, pages=1):
        """Search the configured source for *keyword* and return parsed items.

        :param keyword: search term, sent under the configured
            ``search_param_key`` (``'q'`` when unset).
        :param limit: maximum number of items taken from each page.
        :param pages: number of pages to request; only honoured when the
            config enables pagination, otherwise a single page is fetched.
        :return: list of item dicts as produced by :meth:`parse`.
        """
        cfg = self.config
        url = _config_value(cfg, 'url')
        method = _config_value(cfg, 'method', None) or 'GET'
        headers = _load_json(_config_value(cfg, 'headers', None), dict(DEFAULT_HEADERS))
        base_params = _load_json(_config_value(cfg, 'params', None), {})
        search_key = _config_value(cfg, 'search_param_key', None) or 'q'

        # Add keyword to params
        base_params[search_key] = keyword

        # Pagination Config
        has_pagination = _config_value(cfg, 'has_pagination', False)
        pagination_param = _config_value(cfg, 'pagination_param', 'pn')
        pagination_step = _config_value(cfg, 'pagination_step', 10)
        pagination_start = _config_value(cfg, 'pagination_start', 0)
        # A field that exists but holds None must not break the offset arithmetic.
        if pagination_step is None:
            pagination_step = 10
        if pagination_start is None:
            pagination_start = 0

        pages_to_crawl = pages if has_pagination else 1
        all_results = []

        for page_idx in range(pages_to_crawl):
            current_params = base_params.copy()
            if has_pagination and pagination_param:
                current_pagination_val = pagination_start + (page_idx * pagination_step)
                current_params[pagination_param] = current_pagination_val
                print(f"Crawling page {page_idx+1}/{pages_to_crawl} with {pagination_param}={current_pagination_val}")

            # Construct full URL for logging and fallback
            full_url = url
            if current_params:
                query_string = urllib.parse.urlencode(current_params)
                separator = '&' if '?' in full_url else '?'
                full_url += separator + query_string

            fetched = self._fetch_html(url, method, headers, current_params, full_url)
            if fetched is None:
                continue  # Skip this page
            html_content, current_url = fetched
            if not html_content:
                continue

            all_results.extend(self.parse(html_content, current_url, limit))

            # Brief pause between consecutive page requests.
            if page_idx < pages_to_crawl - 1:
                time.sleep(1)

        return all_results

    @staticmethod
    def _extract_field(container, sel_config):
        """Extract one field from *container*.

        *sel_config* can be ``"selector"`` or
        ``{"selector": "...", "attr": "..."}``. With no selector the container
        itself is used. Returns the attribute value when ``attr`` is given,
        otherwise the element's whitespace-normalised text; ``""`` when the
        config is empty or nothing matches.
        """
        if not sel_config:
            return ""
        selector = sel_config
        attr = None
        if isinstance(sel_config, dict):
            selector = sel_config.get('selector')
            attr = sel_config.get('attr')

        element = container.select_one(selector) if selector else container
        if not element:
            return ""
        if attr:
            return element.get(attr, "")
        return clean_text(element.get_text())

    def parse(self, html, current_url, limit):
        """Extract up to *limit* items from *html* using the configured selectors.

        :param html: page markup.
        :param current_url: URL the page was loaded from; used to make the
            ``link`` field absolute.
        :param limit: maximum number of items to return.
        :return: list of dicts with keys ``title``, ``link``, ``abstract``,
            ``source``, ``cover`` and ``date``; entries without a title are
            dropped. Empty when no selectors or no ``list`` selector exist.
        """
        raw_selectors = _config_value(self.config, 'selectors', None)
        if not raw_selectors:
            return []
        selectors = _load_json(raw_selectors, {})
        list_selector = selectors.get('list')
        if not list_selector:
            return []

        soup = BeautifulSoup(html, 'lxml')
        extract = self._extract_field
        items = []

        for container in soup.select(list_selector):
            if len(items) >= limit:
                break
            item = {
                'title': extract(container, selectors.get('title')),
                'link': absolutize_url(current_url, extract(container, selectors.get('link'))),
                'abstract': extract(container, selectors.get('abstract')),
                'source': extract(container, selectors.get('source')),
                'cover': extract(container, selectors.get('cover')),
                'date': extract(container, selectors.get('date')),
            }
            if item['title']:
                items.append(item)

        return items


def run_generic_spider(source_model, keyword, limit=10, pages=1):
    """Convenience wrapper: build a :class:`GenericSpiderEngine` for
    *source_model* and run it for *keyword*."""
    engine = GenericSpiderEngine(source_model)
    return engine.run(keyword, limit, pages)