"""Baidu web-search scraper.

Fetches a Baidu result page (plain HTTP first, a Crawl4AI-driven browser as a
fallback) and extracts title / abstract / source / date / cover / link for
each organic result.
"""

import asyncio
import json
import os
import re
import sys
import urllib.parse

import requests
from bs4 import BeautifulSoup
from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode, CrawlerRunConfig

# Set event loop policy for Windows
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


# Date formats recognised in result metadata:
# 2023年10月1日, 2023-10-01, 3小时前, 5分钟前, 2天前
_DATE_RE = re.compile(
    r"(\d{4}年\d{1,2}月\d{1,2}日|\d{4}-\d{1,2}-\d{1,2}|\d+(小时|分钟|天)前)"
)

# Substrings of span class names that may hold a date.
# .newTimeFactor_... is common in new PMD, .c-color-gray2 is legacy.
_DATE_CLASS_HINTS = (
    "TimeFactor",
    "c-color-gray2",
    "c-gray",
    "cos-color-text-minor",
    "source-time",
)

# Substrings whose presence in the response body means we received a
# redirect stub or a security-verification page instead of real results.
_BLOCK_MARKERS = (
    "location.replace",
    "http-equiv=\"refresh\"",
    "百度安全验证",
    "wappass.baidu.com",
)


def build_headers():
    """Return the HTTP headers used for the plain ``requests`` fetch.

    A non-empty ``headers`` mapping in ``config.json`` (located next to this
    file) wins. Otherwise a built-in default set is returned, with a
    ``Cookie`` header added when the ``BAIDU_COOKIE`` environment variable is
    set to a non-blank value.
    """
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    if os.path.exists(config_path):
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
            headers = config.get("headers", {})
            # Only a non-empty mapping is usable as request headers; any
            # other value falls through to the defaults below.
            if headers and isinstance(headers, dict):
                return headers
        except Exception as e:
            # Best effort: a broken config must not prevent searching.
            print(f"Warning: Failed to load config.json: {e}", file=sys.stderr)

    # Fallback to default if config loading fails
    cookie = os.environ.get("BAIDU_COOKIE", "").strip()
    headers = {
        "Accept": "text/html",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5845.97 Safari/537.36",
    }
    if cookie:
        headers["Cookie"] = cookie
    return headers


def clean_text(s):
    """Collapse every whitespace run in *s* to one space and strip the ends.

    ``None`` (or any falsy value) is treated as the empty string.
    """
    s = re.sub(r"\s+", " ", s or "")
    return s.strip()


def absolutize_url(u):
    """Turn a protocol-relative or site-relative URL into an absolute one.

    ``//host/path`` gets an ``https:`` scheme, ``/path`` is resolved against
    ``https://www.baidu.com``, anything else is returned unchanged. A falsy
    input yields ``""``.
    """
    if not u:
        return ""
    if u.startswith("//"):
        return "https:" + u
    if u.startswith("/"):
        return urllib.parse.urljoin("https://www.baidu.com", u)
    return u


def _class_contains(*needles):
    """Build a BeautifulSoup ``class`` filter matching any of *needles*."""
    def predicate(value):
        return isinstance(value, str) and any(n in value for n in needles)
    return predicate


def _match_date(text):
    """Return the first date-like substring of *text*, or ``""`` if none."""
    match = _DATE_RE.search(text)
    return match.group(0).strip() if match else ""


def _extract_date(container):
    """Find a publication date inside one result container, or ``""``."""
    # Strategy 1: Look for specific class patterns commonly used by Baidu
    candidates = container.find_all(
        "span", attrs={"class": _class_contains(*_DATE_CLASS_HINTS)}
    )
    for candidate in candidates:
        found = _match_date(clean_text(candidate.get_text()))
        if found:
            return found

    # Strategy 2: Fallback for Aladdin cards -- check the source-date wrapper
    # often found in news results
    source_date = container.select_one(".c-span-last")
    if source_date:
        return _match_date(clean_text(source_date.get_text()))
    return ""


def extract_result_item(container):
    """Extract one search result from its BeautifulSoup container element.

    :param container: Tag wrapping a single result (or, as a last resort, the
        title anchor itself)
    :return: dict with the string keys ``title``, ``abstract``, ``source``,
        ``date``, ``cover`` and ``link``; fields that cannot be found are ``""``
    """
    title = ""
    link = ""
    cover = ""
    source = ""

    # Preferred title/link: the anchor inside the result's <h3>.
    h3 = container.find("h3")
    if h3:
        a = h3.find("a")
        if a:
            title = clean_text(a.get_text())
            link = absolutize_url(a.get("href", ""))
    # Otherwise fall back to the first anchor that actually has an href.
    if not title:
        a = container.find("a")
        if a and a.get("href"):
            title = clean_text(a.get_text())
            link = absolutize_url(a.get("href", ""))

    # Abstract: try progressively looser class matches, then the first <p>.
    abstract = ""
    abs_div = container.select_one("div.c-abstract")
    if not abs_div:
        abs_div = container.find("div", attrs={"class": _class_contains("abstract")})
    if not abs_div:
        abs_div = container.find(
            "div", attrs={"class": _class_contains("content", "summary")}
        )
    if abs_div:
        abstract = clean_text(abs_div.get_text())
    else:
        p = container.find("p")
        if p:
            abstract = clean_text(p.get_text())

    # Cover image: data-src is tried before src.
    img = container.find("img")
    if img:
        cover = absolutize_url(img.get("data-src") or img.get("src") or "")

    # Source label: first span in div.f13, else a span with a gray-text class.
    src_span = None
    f13 = container.select_one("div.f13")
    if f13:
        src_span = f13.find("span")
    if not src_span:
        src_span = container.find("span", attrs={"class": _class_contains("c-color-gray")})
    if src_span:
        source = clean_text(src_span.get_text())
    # No label on the page: fall back to the link's host name.
    if not source and link:
        try:
            source = urllib.parse.urlparse(link).netloc
        except ValueError:
            # urlparse rejects some malformed hosts (e.g. bad IPv6 literals).
            source = ""

    # Try to extract date
    date = _extract_date(container)

    return {
        "title": title,
        "abstract": abstract,
        "source": source,
        "date": date,
        "cover": cover,
        "link": link,
    }


def _find_result_container(anchor):
    """Locate the result ``<div>`` that encloses a title anchor.

    Climbs at most 10 ``<div>`` ancestors looking for one whose class list
    contains ``result``; if none is found, returns the nearest ``<div>``
    ancestor (which may be ``None``).
    """
    node = anchor
    for _ in range(10):
        node = node.find_parent("div")
        if node is None:
            break
        if "result" in " ".join(node.get("class", [])):
            return node
    return anchor.find_parent("div")


def parse_results(html, limit=10):
    """Parse a Baidu result page into a list of result dicts.

    :param html: Page markup
    :param limit: Stop once this many results have been collected
    :return: List of dicts as produced by :func:`extract_result_item`, without
        untitled entries and without duplicate (title, abstract) pairs
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except Exception:
        # Fall back to the built-in parser if the lxml tree cannot be built.
        soup = BeautifulSoup(html, "html.parser")

    items = []
    seen = set()

    def add(item):
        """Keep *item* unless untitled or duplicate; True once limit is hit."""
        if not item["title"]:
            return False
        key = (item["title"], item["abstract"])
        if key in seen:
            return False
        seen.add(key)
        items.append(item)
        return len(items) >= limit

    # Primary pass: start from title anchors and climb to their container.
    for a in soup.select("#content_left h3.t a, #content_left h3 a"):
        container = _find_result_container(a)
        if add(extract_result_item(container if container is not None else a)):
            break

    # Secondary pass, only when nothing was found: scan result-like divs.
    if not items:
        for c in soup.select("div.result, div[class*=result]"):
            if add(extract_result_item(c)):
                break
    return items


async def _fetch_with_crawl4ai(url):
    """
    Fetch page content using Crawl4AI (Playwright) to handle JS and basic anti-bot checks

    :param url: Full URL to load
    :return: The ``html`` attribute of the crawl result
    """
    print(f"Fallback to Crawl4AI for: {url}")

    browser_config = BrowserConfig(
        headless=True,
        verbose=False,
        java_script_enabled=True,
    )

    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        # Wait for the main content container
        wait_for="#content_left"
    )

    async with AsyncWebCrawler(config=browser_config) as crawler:
        result = await crawler.arun(
            url=url,
            config=run_config
        )
        return result.html


def fetch_html(wd, pn):
    """Download the Baidu result page for a query.

    A plain ``requests`` GET is tried first. If it fails, or if the body looks
    like a redirect stub / security-verification page, the page is fetched
    once through Crawl4AI instead.

    :param wd: Search keyword
    :param pn: Result offset (0, 10, 20, ...)
    :return: Page HTML
    :raises Exception: The original ``requests`` error when both stages fail,
        or the Crawl4AI error when the browser fallback for a blocked page
        fails
    """
    params = {"wd": wd, "pn": str(pn)}
    headers = build_headers()
    url_https = "https://www.baidu.com/s"

    # Construct full URL for Crawl4AI if needed
    full_url = f"{url_https}?{urllib.parse.urlencode(params)}"

    # Stage 1: plain HTTP. Only the requests call lives in this ``try`` so a
    # later browser failure is never misreported as a requests failure and
    # never triggers a second browser launch.
    try:
        r = requests.get(url_https, params=params, headers=headers, timeout=15)
        r.raise_for_status()
        # Force UTF-8 encoding as Baidu sometimes returns ISO-8859-1 header
        r.encoding = "utf-8"
        text = r.text
    except Exception as e:
        print(f"Requests failed ({e}), trying Crawl4AI...")
        try:
            return asyncio.run(_fetch_with_crawl4ai(full_url))
        except Exception as ai_e:
            print(f"Crawl4AI failed: {ai_e}", file=sys.stderr)
            # Surface the root cause (the HTTP failure), as callers expect.
            raise e

    # Check for security verification or simple redirect
    if not any(marker in text for marker in _BLOCK_MARKERS):
        return text

    # Stage 2: the plain response was a block page; use the browser.
    print("Detected redirect/security check, trying Crawl4AI...")
    try:
        return asyncio.run(_fetch_with_crawl4ai(full_url))
    except Exception as ai_e:
        print(f"Crawl4AI failed: {ai_e}", file=sys.stderr)
        raise


def compute_pn(page):
    """Convert a 1-based page number into the ``pn`` offset (10 per page).

    Pages below 1 map to offset 0.
    """
    if page < 1:
        return 0
    return (page - 1) * 10


def run_spider(keyword, page=1, limit=10):
    """
    Executes the Baidu spider

    Any error is reported on stderr and turned into an empty result list.

    :param keyword: Search keyword
    :param page: Page number
    :param limit: Result limit
    :return: List of results
    """
    try:
        pn = compute_pn(page)
        html = fetch_html(keyword, pn)
        items = parse_results(html, limit=limit)
        return items
    except Exception as e:
        print(f"Error running spider: {e}", file=sys.stderr)
        return []