| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- import os
- import re
- import sys
- import urllib.parse
- import requests
- from bs4 import BeautifulSoup
- import json
- import asyncio
- from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
# On Windows only, install the Proactor event loop policy at import time so
# that every loop later created by asyncio.run() uses it.
# NOTE(review): presumably needed so the browser-based crawler can spawn
# subprocesses on Windows — confirm against the crawl4ai requirements.
if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
def build_headers():
    """Return the HTTP request headers used for Baidu requests.

    Headers are taken from the ``"headers"`` object of a ``config.json`` file
    located next to this module when that file exists, parses as JSON and
    holds a non-empty mapping under that key.  In every other case (file
    missing, unreadable, malformed, or ``"headers"`` absent/empty/not an
    object) a built-in default header set is returned, with a ``Cookie``
    entry added when the ``BAIDU_COOKIE`` environment variable is non-blank.

    Returns:
        dict: header name -> header value.
    """
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    if os.path.exists(config_path):
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
            print(f"Warning: Failed to load config.json: {e}", file=sys.stderr)
        else:
            if not isinstance(config, dict):
                print(
                    "Warning: Failed to load config.json: top-level value is not an object",
                    file=sys.stderr,
                )
            else:
                headers = config.get("headers", {})
                if isinstance(headers, dict):
                    if headers:
                        return headers
                else:
                    # A non-mapping value would be handed to requests as-is
                    # and fail there; fall back to the defaults instead.
                    print(
                        "Warning: Failed to load config.json: 'headers' is not an object",
                        file=sys.stderr,
                    )

    # Fallback to default if config loading fails or yields no usable headers
    cookie = os.environ.get("BAIDU_COOKIE", "").strip()
    headers = {
        "Accept": "text/html",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.5845.97 Safari/537.36",
    }
    if cookie:
        headers["Cookie"] = cookie
    return headers
def clean_text(s):
    """Collapse every run of whitespace in *s* to one space and trim the ends.

    A falsy *s* (``None``, ``""``) is treated as the empty string.
    """
    raw = s if s else ""
    collapsed = re.sub(r"\s+", " ", raw)
    return collapsed.strip()
def absolutize_url(u):
    """Turn a scheme-relative or root-relative URL into an absolute one.

    * falsy input            -> ``""``
    * ``//host/path``        -> prefixed with ``https:``
    * ``/path``              -> resolved against ``https://www.baidu.com``
    * anything else          -> returned unchanged
    """
    if u:
        if u.startswith("//"):
            return "https:" + u
        if u.startswith("/"):
            base = "https://www.baidu.com"
            return urllib.parse.urljoin(base, u)
        return u
    return ""
def extract_result_item(container):
    """Build one result record from a search-result node.

    *container* is a BeautifulSoup element; only its ``find``, ``find_all``,
    ``select_one``, ``get`` and ``get_text`` style lookups are used.

    Returns a dict with the keys ``title``, ``abstract``, ``source``,
    ``date``, ``cover`` and ``link``; any field that cannot be located is
    the empty string.
    """
    # Matches: 2023年10月1日, 2023-10-01, 3小时前, 5分钟前, 2天前
    date_regex = re.compile(r"(\d{4}年\d{1,2}月\d{1,2}日|\d{4}-\d{1,2}-\d{1,2}|\d+(小时|分钟|天)前)")

    def class_has(*fragments):
        # Predicate for the "class" attribute: true for a string value that
        # contains at least one of the given fragments.
        def predicate(value):
            return isinstance(value, str) and any(part in value for part in fragments)
        return predicate

    def date_in(node):
        # First date-like substring in the node's text, or "" when none.
        found = date_regex.search(clean_text(node.get_text()))
        return found.group(0).strip() if found else ""

    # --- title / link: prefer the anchor inside <h3>, else the first anchor
    # that carries an href.
    title = link = ""
    heading = container.find("h3")
    anchor = heading.find("a") if heading else None
    if anchor:
        title = clean_text(anchor.get_text())
        link = absolutize_url(anchor.get("href", ""))
    if not title:
        anchor = container.find("a")
        if anchor and anchor.get("href"):
            title = clean_text(anchor.get_text())
            link = absolutize_url(anchor.get("href", ""))

    # --- abstract: try progressively looser selectors, ending with any <p>.
    summary_node = (
        container.select_one("div.c-abstract")
        or container.find("div", attrs={"class": class_has("abstract")})
        or container.find("div", attrs={"class": class_has("content", "summary")})
        or container.find("p")
    )
    abstract = clean_text(summary_node.get_text()) if summary_node else ""

    # --- cover image: lazy-load attribute first, then the plain src.
    image = container.find("img")
    cover = absolutize_url(image.get("data-src") or image.get("src") or "") if image else ""

    # --- source: span inside div.f13, else a gray-colored span, else the
    # host name of the link.
    meta_row = container.select_one("div.f13")
    source_node = meta_row.find("span") if meta_row else None
    if not source_node:
        source_node = container.find("span", attrs={"class": class_has("c-color-gray")})
    source = clean_text(source_node.get_text()) if source_node else ""
    if link and not source:
        try:
            source = urllib.parse.urlparse(link).netloc
        except Exception:
            source = ""

    # --- date, strategy 1: spans whose class matches one of the known
    # time-related class fragments; the first span with a date wins.
    date = ""
    time_spans = container.find_all(
        "span",
        attrs={"class": class_has(
            "TimeFactor",
            "c-color-gray2",
            "c-gray",
            "cos-color-text-minor",
            "source-time",
        )},
    )
    for span in time_spans:
        date = date_in(span)
        if date:
            break

    # --- date, strategy 2: the ".c-span-last" wrapper.
    if not date:
        trailing = container.select_one(".c-span-last")
        if trailing:
            date = date_in(trailing)

    return {
        "title": title,
        "abstract": abstract,
        "source": source,
        "date": date,
        "cover": cover,
        "link": link,
    }
def parse_results(html, limit=10):
    """Parse a Baidu result page into a list of result dicts.

    Args:
        html: page markup.
        limit: maximum number of items to return.  A value of zero or less
            yields an empty list (previously one item was still returned,
            because the limit was only checked after appending).

    Returns:
        list of dicts as produced by ``extract_result_item``, without
        untitled entries and without duplicates (same title and abstract),
        in document order.
    """
    if limit <= 0:
        return []

    try:
        soup = BeautifulSoup(html, "lxml")
    except Exception:
        # The lxml backend may be unavailable; use the built-in parser.
        soup = BeautifulSoup(html, "html.parser")

    items = []
    seen = set()

    def add(item):
        # Record the item unless it is untitled or a duplicate.
        # Returns True once the limit has been reached.
        if not item["title"]:
            return False
        key = (item["title"], item["abstract"])
        if key in seen:
            return False
        seen.add(key)
        items.append(item)
        return len(items) >= limit

    def result_container(anchor):
        # Nearest <div> ancestor (at most 10 levels up) whose class list
        # mentions "result"; None when there is no such ancestor.
        node = anchor
        for _ in range(10):
            node = node.find_parent("div")
            if not node:
                return None
            if "result" in " ".join(node.get("class", [])):
                return node
        return None

    # Primary strategy: start from every title anchor in the main column.
    for a in soup.select("#content_left h3.t a, #content_left h3 a"):
        container = result_container(a) or a.find_parent("div")
        if add(extract_result_item(container or a)):
            break

    # Fallback strategy: scan anything that looks like a result container.
    if not items:
        for c in soup.select("div.result, div[class*=result]"):
            if add(extract_result_item(c)):
                break

    return items
async def _fetch_with_crawl4ai(url):
    """Load *url* through Crawl4AI's headless browser and return its HTML.

    JavaScript is enabled, the cache is bypassed, and the crawl waits for
    the ``#content_left`` element (the main content container) before the
    page is captured.
    """
    print(f"Fallback to Crawl4AI for: {url}")

    browser_options = BrowserConfig(headless=True, verbose=False, java_script_enabled=True)
    # Wait for the main content container
    crawl_options = CrawlerRunConfig(cache_mode=CacheMode.BYPASS, wait_for="#content_left")

    async with AsyncWebCrawler(config=browser_options) as crawler:
        page = await crawler.arun(url=url, config=crawl_options)
        return page.html
def fetch_html(wd, pn):
    """Fetch the Baidu search result page for a keyword and offset.

    A plain ``requests`` GET is tried first.  Crawl4AI is used instead when
    the GET fails, or when the response looks like a redirect stub or a
    security-verification page.

    The Crawl4AI call for a detected security page is deliberately made
    outside the ``try`` block: previously a failure there was caught by the
    generic handler, mis-reported as "Requests failed", and triggered a
    second, redundant browser launch.

    Args:
        wd: search keyword (sent as the ``wd`` query parameter).
        pn: result offset (sent as the ``pn`` query parameter).

    Returns:
        str: page HTML.

    Raises:
        Exception: the original ``requests`` error when both the GET and the
            Crawl4AI fallback fail; the Crawl4AI error when the GET succeeded
            but returned a blocked page and the fallback fails.
    """
    params = {"wd": wd, "pn": str(pn)}
    headers = build_headers()
    url_https = "https://www.baidu.com/s"

    # Construct full URL for Crawl4AI if needed
    full_url = f"{url_https}?{urllib.parse.urlencode(params)}"

    try:
        r = requests.get(url_https, params=params, headers=headers, timeout=15)
        r.raise_for_status()
        # Force UTF-8 encoding as Baidu sometimes returns ISO-8859-1 header
        r.encoding = "utf-8"
        text = r.text
    except Exception as e:
        # Best-effort boundary: any failure of the plain request falls back
        # to the browser-based fetch.
        print(f"Requests failed ({e}), trying Crawl4AI...")
        try:
            return asyncio.run(_fetch_with_crawl4ai(full_url))
        except Exception as ai_e:
            print(f"Crawl4AI failed: {ai_e}", file=sys.stderr)
            # Surface the original request error, as callers saw before.
            raise e

    # Check for security verification or simple redirect
    blocked_markers = (
        "location.replace",
        "http-equiv=\"refresh\"",
        "百度安全验证",
        "wappass.baidu.com",
    )
    if any(marker in text for marker in blocked_markers):
        print("Detected redirect/security check, trying Crawl4AI...")
        return asyncio.run(_fetch_with_crawl4ai(full_url))

    return text
def compute_pn(page):
    """Convert a 1-based page number into the ``pn`` result offset.

    Each page advances the offset by 10; page numbers below 1 map to 0.
    """
    return 0 if page < 1 else (page - 1) * 10
def run_spider(keyword, page=1, limit=10):
    """
    Run one Baidu search and return the parsed results.

    :param keyword: Search keyword
    :param page: Page number (1-based)
    :param limit: Maximum number of results to return
    :return: List of result dicts; an empty list if any step raises
    """
    try:
        offset = compute_pn(page)
        markup = fetch_html(keyword, offset)
        return parse_results(markup, limit=limit)
    except Exception as e:
        # Top-level boundary: report the failure and degrade to "no results".
        print(f"Error running spider: {e}", file=sys.stderr)
        return []
|