#!/usr/bin/env python3 """ Aliyun Model Price Scraper - Final Improved Version - 使用 Playwright 渲染页面并抓取"模型价格"区域内的价格信息 - 支持单个模型页面 URL,或从文件读取多个 URL 改进要点: 1. 能够生成阶梯计费结构:{input: {tier1: {...}, tier2: {...}}, output: {...}} 2. 优惠标记正确处理:label只保留基础部分,优惠信息放入note字段 3. 强化过滤:完全排除工具调用价格(包括"千次调用"单位) 依赖: pip install playwright beautifulsoup4 lxml python -m playwright install 用法示例: python scrape_aliyun_models.py --url "https://bailian.console.aliyun.com/.../qwen3-max" python scrape_aliyun_models.py --file urls.txt 输出: JSON 到 stdout """ import argparse import json import re import time import os from typing import List, Dict, Optional from bs4 import BeautifulSoup, FeatureNotFound from bs4.element import Tag from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError TOOL_CALL_RE = re.compile( r"调用|工具|接口|api调用|api|次调用|千次调用|/千次|每千次|搜索策略|代码解释|文生图|数据增强|模型推理", re.I, ) def _is_tool_call_item(label: str, raw: str, unit: str) -> bool: label_l = label.lower() raw_l = raw.lower() unit_l = unit.lower() if TOOL_CALL_RE.search(label_l) or TOOL_CALL_RE.search(raw_l) or TOOL_CALL_RE.search(unit_l): return True if "千次" in unit_l or "/千" in unit_l or "次调用" in unit_l: return True return False def _find_nearest_tier_label(lines: List[str], idx: int) -> Optional[str]: tier_re = re.compile(r"(输入|输出).*(<=|>=|<|>|\b\d+\s*k|\d+\s*万|\d+\s*千|\d+\s*tokens?)", re.I) for step in range(1, 6): for pos in (idx - step, idx + step): if pos < 0 or pos >= len(lines): continue candidate = lines[pos] if not candidate or re.search(r"([0-9]+(?:\.[0-9]+)?)\s*元", candidate, re.I): continue if tier_re.search(candidate): return candidate.strip() return None def _open_tier_dropdown(page) -> bool: try: try: selector = page.locator(".efm_ant-select-selector, .ant-select-selector").filter(has_text=re.compile(r"输入.*\d+\s*[kK]")) if selector.count() > 0: selector.first.click(timeout=3000) time.sleep(0.5) return True except Exception as e: pass ok = page.evaluate( """ () => { const isVisible = (el) => { if (!el) return false; const rect = el.getBoundingClientRect(); const style = window.getComputedStyle(el); return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden'; }; const norm = (s) => (s || '').replace(/\s+/g, ' ').trim(); const tierRe = /输入.*\d+\s*[kK]/i; let clickEl = null; const selectors = Array.from(document.querySelectorAll( ".efm_ant-select-selector, .ant-select-selector" )); for (const el of selectors) { const txt = norm(el.innerText || el.textContent); if (tierRe.test(txt) && isVisible(el)) { clickEl = el; break; } } if (!clickEl) { const containers = Array.from(document.querySelectorAll( ".efm_ant-select, .ant-select" )); for (const el of containers) { const txt = norm(el.innerText || el.textContent); if (tierRe.test(txt) && isVisible(el)) { clickEl = el.querySelector(".efm_ant-select-selector, .ant-select-selector") || el; break; } } } if (!isVisible(clickEl)) return false; clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true })); clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true })); clickEl.click(); return true; } """ ) time.sleep(0.5) return bool(ok) except Exception: return False def _normalize_tier_option(opt: str) -> str: if not opt: return "unknown" s = opt.replace('\u00a0', ' ') m = re.search(r"(\d+\s*k\s*<\s*输入\s*<=\s*\d+\s*k)", s, re.I) if not m: m = re.search(r"(输入\s*<=\s*\d+\s*k)", s, re.I) if not m: m = re.search(r"(\d+\s*k\s*<\s*输入)", s, re.I) if m: key = m.group(1) key = re.sub(r"\s+", "", key) key = key.replace("输入", "input").replace("输出", "output") return key if "输入" in s or "输出" in s: nums = re.findall(r"\d+\s*k", s, re.I) if nums: joined = "-".join([n.replace(' ', '') for n in nums]) if "输入" in s: return f"input_{joined}" return f"output_{joined}" short = re.sub(r"\s+", " ", s).strip() return short[:60] def _get_tier_options(page) -> List[str]: if not _open_tier_dropdown(page): return [] try: page.wait_for_selector( ".efm_ant-select-dropdown, .ant-select-dropdown", state="visible", timeout=3000 ) except Exception: pass options = [] try: options = page.evaluate( """ () => { const isVisible = (el) => { const r = el.getBoundingClientRect(); const s = window.getComputedStyle(el); return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden'; }; const dropdown = Array.from(document.querySelectorAll( '.efm_ant-select-dropdown, .ant-select-dropdown' )).find(el => isVisible(el)); if (!dropdown) return []; const leaves = Array.from(dropdown.querySelectorAll('*')) .filter(el => isVisible(el) && el.children.length === 0); const texts = leaves .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim()) .filter(t => t.length > 0 && t.length < 60); return Array.from(new Set(texts)); } """ ) options = [t for t in options if re.search(r"输入", t) and re.search(r"\d+\s*[kK]", t)] except Exception: options = [] if not options: try: options = page.evaluate( """ () => { const isVisible = (el) => { const r = el.getBoundingClientRect(); const s = window.getComputedStyle(el); return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden'; }; const texts = Array.from(document.querySelectorAll('*')) .filter(el => isVisible(el) && el.children.length === 0) .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim()) .filter(t => t.length < 60 && /输入/.test(t) && /\\d+\\s*[kK]/.test(t) && /<=| bool: if not _open_tier_dropdown(page): return False try: page.wait_for_selector( ".efm_ant-select-dropdown, .ant-select-dropdown", state="visible", timeout=2000, ) except Exception: return False try: try: option_loc = page.get_by_text(option_text, exact=True).first option_loc.click(timeout=3000, force=False) time.sleep(0.6) return True except Exception: pass clicked = page.evaluate( """ (opt) => { const isVisible = (el) => { if (!el) return false; const rect = el.getBoundingClientRect(); const style = window.getComputedStyle(el); return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden'; }; const norm = (s) => (s || '').replace(/\s+/g, ' ').trim(); const nodes = Array.from(document.querySelectorAll( ".efm_ant-select-item-option-content, [role='option'], .efm_ant-select-item, .ant-select-item" )); const target = nodes.find((n) => norm(n.textContent) === opt && isVisible(n)); if (!target) return false; const clickEl = target.closest(".efm_ant-select-item, [role='option']") || target; clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true })); clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true })); clickEl.click(); return true; } """, option_text, ) if clicked: time.sleep(0.6) return True return False except Exception: return False def _ensure_tiered_pricing(page) -> None: try: toggle = page.locator("text=阶梯计费").first if toggle.count() > 0: toggle.click() time.sleep(0.3) except Exception: pass def parse_prices_from_text(text: str) -> List[Dict]: lines = [ln.strip() for ln in text.splitlines()] lines = [ln for ln in lines if ln] items = [] price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I) for idx, ln in enumerate(lines): matches = price_re.findall(ln) if not matches: continue label = None first_m = price_re.search(ln) if first_m: before = ln[: first_m.start()].strip() if before: label = before if not label: for j in range(idx - 1, -1, -1): if lines[j] and not price_re.search(lines[j]): label = lines[j] break if not label: label = f"price_{len(items) + 1}" if label == "原价": if items and matches: try: items[-1]["price_original"] = float(matches[0]) except Exception: items[-1]["price_original"] = matches[0] items[-1].setdefault("note", "") if items[-1]["note"]: items[-1]["note"] += "; 原价显示" else: items[-1]["note"] = "原价显示" continue raw = ln if re.fullmatch(r"输入|输出", label.strip()): tier_label = _find_nearest_tier_label(lines, idx) if tier_label: label = tier_label entry: Dict = {"label": label.strip(), "raw": raw} try: nums = [float(x) for x in matches] if len(nums) == 1: entry["price"] = nums[0] else: fnums = sorted(nums) entry["price_current"] = fnums[0] entry["price_original"] = fnums[-1] except Exception: try: entry["price"] = float(matches[0]) except Exception: entry["price"] = matches[0] unit = None if re.search(r"每千|每 1k|/千|/每千|tokens", raw, re.I): unit = "元/每千tokens" unit_m = re.search(r"元\s*/?\s*每[^\n,,;]*", raw) if unit_m: unit = unit_m.group(0) if unit: entry["unit"] = unit note = [] if re.search(r"限时|折", raw): note.append("限时优惠") if re.search(r"原价", raw): note.append("原价显示") if note: entry["note"] = "; ".join(note) entry["currency"] = "CNY" items.append(entry) return items def extract_price_block_html(html: str) -> str: try: soup = BeautifulSoup(html, "lxml") except FeatureNotFound: soup = BeautifulSoup(html, "html.parser") # 跳过 script/style 标签内的文本节点 node = None for n in soup.find_all(string=re.compile(r"模型价格")): if n.parent and n.parent.name in ("script", "style"): continue node = n break if not node: return soup.get_text(separator="\n") ancestor = node.parent for _ in range(6): txt = ancestor.get_text(separator="\n") if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower(): return txt if ancestor.parent: ancestor = ancestor.parent else: break return ancestor.get_text(separator="\n") def extract_price_items_from_html(html: str) -> List[Dict]: try: soup = BeautifulSoup(html, "lxml") except FeatureNotFound: soup = BeautifulSoup(html, "html.parser") node = None for n in soup.find_all(string=re.compile(r"模型价格")): if n.parent and n.parent.name in ("script", "style"): continue node = n break if not node: return [] ancestor = node.parent container = ancestor for _ in range(6): txt = ancestor.get_text(separator="\n") if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower(): container = ancestor break if ancestor.parent: ancestor = ancestor.parent else: container = ancestor break price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I) items: List[Dict] = [] container_text = container.get_text(separator="\n") items = parse_prices_from_text(container_text) def _postprocess_items(raw_items: List[Dict]) -> List[Dict]: filtered: List[Dict] = [] for it in raw_items: raw = it.get("raw", "") label = it.get("label", "") unit = it.get("unit", "") if _is_tool_call_item(label, raw, unit): continue if "原价" in label and filtered: if "price" in it: filtered[-1]["price_original"] = it["price"] elif "price_current" in it and "price_original" in it: filtered[-1]["price_original"] = it["price_original"] filtered[-1].setdefault("note", "") if filtered[-1]["note"]: filtered[-1]["note"] += "; 原价显示" else: filtered[-1]["note"] = "原价显示" continue notes = [] discount_match = re.search(r"(限时)?([0-9.]+)\s*折", raw) if discount_match: discount = discount_match.group(2) notes.append(f"限时{discount}折") else: if re.search(r"限时|免费", raw) or re.search(r"限时|免费", label): if re.search(r"免费", raw): notes.append("限时免费") else: notes.append("限时优惠") if re.search(r"原价", raw): notes.append("原价显示") if notes: it["note"] = "; ".join(notes) if "unit" not in it: if re.search(r"每千|tokens|/千|/每千", raw, re.I): it["unit"] = "元/每千tokens" else: um = re.search(r"元\s*/?\s*每[^\n,,;]*", raw) if um: it["unit"] = um.group(0) cleaned_label = re.sub(r"限时[0-9.]*折|限时|免费|原价|\s*元.*", "", label).strip() cleaned_label = re.sub(r"\s+", " ", cleaned_label).strip() if not cleaned_label: cleaned_label = "price" it["label"] = cleaned_label it["currency"] = "CNY" filtered.append(it) return filtered filtered = _postprocess_items(items) structured: List[Dict] = [] grouped: Dict[str, Dict[str, Dict]] = {} for it in filtered: lbl = it.get("label", "") raw = it.get("raw", "") combined = lbl + " " + raw should_group = False group = None if re.search(r"输入", lbl): should_group = True group = "input" elif re.search(r"输出", lbl): should_group = True group = "output" if "tier" in it: tier_raw = it.get("tier") or "" tier_key = _normalize_tier_option(tier_raw) if not group: if "input" in tier_key.lower(): group = "input" elif "output" in tier_key.lower(): group = "output" else: group = "input" tier_data = {k: v for k, v in it.items() if k not in ("label", "tier")} grouped.setdefault(group, {})[tier_key] = tier_data elif should_group and group: key = lbl if group == "input": key = re.sub(r"^输入", "input", key) elif group == "output": key = re.sub(r"^输出", "output", key) tier_data = {k: v for k, v in it.items() if k not in ("label",)} grouped.setdefault(group, {})[key] = tier_data else: structured.append(it) for g, mapping in grouped.items(): structured.append({"label": g, "tiers": mapping}) items = structured if not items: try: price_nodes = [] for el in soup.find_all(class_=re.compile(r"price", re.I)): text = el.get_text(" ", strip=True) if not re.search(r"[0-9]+(\.[0-9]+)?", text): continue price_nodes.append((el, text)) seen = set() for el, text in price_nodes: if text in seen: continue seen.add(text) unit_el = el.find_next(class_=re.compile(r"unit", re.I)) unit_text = unit_el.get_text(" ", strip=True) if unit_el else None label = None p = el for _ in range(4): sib_label = None parent = p.parent if parent: sib_label = parent.find(class_=re.compile(r"label", re.I)) if sib_label and sib_label.get_text(strip=True): label = sib_label.get_text(" ", strip=True) break if parent is None: break p = parent if not label: prev = el.previous_sibling steps = 0 while prev and steps < 6: candidate = None if isinstance(prev, str) and prev.strip(): candidate = prev.strip() else: try: candidate = prev.get_text(" ", strip=True) except Exception: candidate = None if candidate and not re.search(r"[0-9]", candidate): label = candidate break prev = prev.previous_sibling steps += 1 entry = {"label": label or "price", "raw": text, "currency": "CNY"} try: entry["price"] = float(re.search(r"([0-9]+(?:\.[0-9]+)?)", text).group(1)) except Exception: entry["price"] = text if unit_text: entry["unit"] = unit_text items.append(entry) except Exception: pass if items: items = _postprocess_items(items) return items def extract_price_items_global(html: str) -> List[Dict]: try: soup = BeautifulSoup(html, "lxml") except FeatureNotFound: soup = BeautifulSoup(html, "html.parser") node = None for n in soup.find_all(string=re.compile(r"模型价格")): if n.parent and n.parent.name in ("script", "style"): continue node = n break if not node: return [] ancestor = node.parent for _ in range(6): txt = ancestor.get_text(separator="\n") if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower(): return parse_prices_from_text(txt) if ancestor.parent: ancestor = ancestor.parent else: break return parse_prices_from_text(ancestor.get_text(separator="\n")) def scrape_model_price(url: str, headless: bool = True, timeout: int = 20000, executable_path: Optional[str] = None) -> Dict: result = {"url": url, "error": None, "items": []} with sync_playwright() as p: launch_kwargs = {"headless": headless} if executable_path: launch_kwargs["executable_path"] = executable_path extra_args_env = os.environ.get("PLAYWRIGHT_EXTRA_ARGS", "") extra_args = [a.strip() for a in extra_args_env.split(",") if a.strip()] if extra_args: launch_kwargs["args"] = extra_args browser = p.chromium.launch(**launch_kwargs) context = browser.new_context() page = context.new_page() network_hits = [] console_logs = [] def _on_console(msg): try: console_logs.append({"type": msg.type, "text": msg.text}) except Exception: pass def _on_response(resp): try: url_r = resp.url ct = resp.headers.get("content-type", "") if "application/json" in ct or ct.startswith("text") or "json" in url_r.lower() or "price" in url_r.lower(): try: body = resp.text() except Exception: body = None snippet = None if body: if "元" in body or "price" in body.lower() or "tokens" in body.lower() or "price" in url_r.lower(): snippet = body[:2000] if snippet: network_hits.append({"url": url_r, "content_type": ct, "snippet": snippet}) except Exception: pass page.on("console", _on_console) page.on("response", _on_response) try: page.goto(url, wait_until="networkidle", timeout=timeout) except PlaywrightTimeoutError: try: page.goto(url, wait_until="load", timeout=timeout) except Exception as e: result["error"] = f"导航失败: {e}" browser.close() return result try: page.wait_for_selector("text=模型价格", timeout=8000) except PlaywrightTimeoutError: pass time.sleep(1.2) html = page.content() items = [] try: items = extract_price_items_from_html(html) except Exception: items = [] tiered_items: List[Dict] = [] try: _ensure_tiered_pricing(page) tier_options = _get_tier_options(page) for opt in tier_options: if not _select_tier_option(page, opt): continue html = page.content() try: tier_items = extract_price_items_from_html(html) except Exception: tier_items = [] for it in tier_items: it["tier"] = opt tiered_items.extend(tier_items) except Exception: tiered_items = [] if tiered_items: items = tiered_items if not items: try: page.wait_for_selector("text=/[0-9]+(\\.[0-9]+)?\\s*元/", timeout=8000) except PlaywrightTimeoutError: pass try: page.evaluate("window.scrollTo(0, document.body.scrollHeight)") time.sleep(1.0) html = page.content() items = extract_price_items_from_html(html) except Exception: items = [] if not items: text_block = extract_price_block_html(html) if not text_block: result["error"] = "未找到包含 '模型价格' 的区域,可能需要登录或页面结构不同。" browser.close() return result items = parse_prices_from_text(text_block) def _build_price_map(parsed_items: List[Dict]) -> Dict: price_map: Dict = {} for it in parsed_items: if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict): for tier_key, tier_val in it["tiers"].items(): k = _normalize_tier_option(tier_key) price_map.setdefault(k, {}) sub_label = tier_val.get("label") or tier_val.get("raw") or k price_map[k][sub_label] = {k2: v for k2, v in tier_val.items() if k2 not in ("tier", "tiers", "label")} continue if it.get("tier"): tk = _normalize_tier_option(it.get("tier")) price_map.setdefault(tk, {}) sub_label = it.get("label") or it.get("raw") or tk price_map[tk][sub_label] = {k: v for k, v in it.items() if k not in ("tier", "label")} continue lbl = it.get("label") or it.get("raw") or "price" if lbl in price_map and not isinstance(price_map[lbl], list): price_map[lbl] = [price_map[lbl]] if isinstance(price_map.get(lbl), list): price_map[lbl].append({k: v for k, v in it.items() if k != "label"}) else: price_map[lbl] = {k: v for k, v in it.items() if k != "label"} return price_map price_map = _build_price_map(items) result = {"url": url, "error": result.get("error"), "prices": price_map} browser.close() return result def main(): ap = argparse.ArgumentParser(description="爬取阿里云模型市场页面的模型价格(基于 Playwright)") group = ap.add_mutually_exclusive_group(required=True) group.add_argument("--url", help="单个模型页面 URL") group.add_argument("--file", help="包含多个 URL(每行一个)的文件路径") ap.add_argument("--headful", action="store_true", help="以有头模式打开浏览器(方便调试)") ap.add_argument("--timeout", type=int, default=20000, help="导航超时(毫秒),默认20000") ap.add_argument("--browser-path", help="浏览器可执行文件完整路径") args = ap.parse_args() urls: List[str] = [] if args.url: urls = [args.url] else: with open(args.file, "r", encoding="utf-8") as f: urls = [ln.strip() for ln in f if ln.strip()] exec_path = None if args.browser_path: exec_path = args.browser_path else: exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE") headless = not args.headful if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false": headless = False results = [] for u in urls: print(f"抓取: {u}") res = scrape_model_price(u, headless=headless, timeout=args.timeout, executable_path=exec_path) results.append(res) print(json.dumps(results, ensure_ascii=False, indent=2)) if __name__ == "__main__": main()