|
|
@@ -0,0 +1,810 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+"""
|
|
|
+Aliyun Model Price Scraper - Final Improved Version
|
|
|
+- 使用 Playwright 渲染页面并抓取"模型价格"区域内的价格信息
|
|
|
+- 支持单个模型页面 URL,或从文件读取多个 URL
|
|
|
+
|
|
|
+改进要点:
|
|
|
+1. 能够生成阶梯计费结构:{input: {tier1: {...}, tier2: {...}}, output: {...}}
|
|
|
+2. 优惠标记正确处理:label只保留基础部分,优惠信息放入note字段
|
|
|
+3. 强化过滤:完全排除工具调用价格(包括"千次调用"单位)
|
|
|
+
|
|
|
+依赖:
|
|
|
+ pip install playwright beautifulsoup4 lxml
|
|
|
+ python -m playwright install
|
|
|
+
|
|
|
+用法示例:
|
|
|
+ python scrape_aliyun_models.py --url "https://bailian.console.aliyun.com/.../qwen3-max"
|
|
|
+ python scrape_aliyun_models.py --file urls.txt
|
|
|
+
|
|
|
+输出: JSON 到 stdout
|
|
|
+"""
|
|
|
+
|
|
|
import argparse
import json
import os
import re
import sys
import time
from typing import List, Dict, Optional

from bs4 import BeautifulSoup, FeatureNotFound
from bs4.element import Tag
from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
|
|
|
+
|
|
|
+
|
|
|
+TOOL_CALL_RE = re.compile(
|
|
|
+ r"调用|工具|接口|api调用|api|次调用|千次调用|/千次|每千次|搜索策略|代码解释|文生图|数据增强|模型推理",
|
|
|
+ re.I,
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+def _is_tool_call_item(label: str, raw: str, unit: str) -> bool:
|
|
|
+ label_l = label.lower()
|
|
|
+ raw_l = raw.lower()
|
|
|
+ unit_l = unit.lower()
|
|
|
+ if TOOL_CALL_RE.search(label_l) or TOOL_CALL_RE.search(raw_l) or TOOL_CALL_RE.search(unit_l):
|
|
|
+ return True
|
|
|
+ if "千次" in unit_l or "/千" in unit_l or "次调用" in unit_l:
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+
|
|
|
+def _find_nearest_tier_label(lines: List[str], idx: int) -> Optional[str]:
|
|
|
+ tier_re = re.compile(r"(输入|输出).*(<=|>=|<|>|\b\d+\s*k|\d+\s*万|\d+\s*千|\d+\s*tokens?)", re.I)
|
|
|
+ for step in range(1, 6):
|
|
|
+ for pos in (idx - step, idx + step):
|
|
|
+ if pos < 0 or pos >= len(lines):
|
|
|
+ continue
|
|
|
+ candidate = lines[pos]
|
|
|
+ if not candidate or re.search(r"([0-9]+(?:\.[0-9]+)?)\s*元", candidate, re.I):
|
|
|
+ continue
|
|
|
+ if tier_re.search(candidate):
|
|
|
+ return candidate.strip()
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
def _open_tier_dropdown(page) -> bool:
    """Open the tier-selection dropdown on the pricing widget.

    Strategy: first try a Playwright locator click on an antd select whose
    text looks like a tier label ("输入 ... <n>k"); if that fails, fall back
    to an in-page JS scan that dispatches synthetic mouse events (antd
    selects open on mousedown, not click).

    Returns True when a dropdown was (probably) opened, False otherwise.
    Never raises.
    """
    try:
        # Attempt 1: Playwright locator on the antd select control.
        try:
            selector = page.locator(".efm_ant-select-selector, .ant-select-selector").filter(has_text=re.compile(r"输入.*\d+\s*[kK]"))
            if selector.count() > 0:
                selector.first.click(timeout=3000)
                time.sleep(0.5)  # give the dropdown animation time to settle
                return True
        except Exception as e:  # NOTE(review): `e` is unused; bound but ignored
            pass

        # Attempt 2: find and click the select inside the page via JS,
        # dispatching mousedown/mouseup before click().
        ok = page.evaluate(
            """
            () => {
                const isVisible = (el) => {
                    if (!el) return false;
                    const rect = el.getBoundingClientRect();
                    const style = window.getComputedStyle(el);
                    return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
                };

                const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
                const tierRe = /输入.*\d+\s*[kK]/i;

                let clickEl = null;
                const selectors = Array.from(document.querySelectorAll(
                    ".efm_ant-select-selector, .ant-select-selector"
                ));
                for (const el of selectors) {
                    const txt = norm(el.innerText || el.textContent);
                    if (tierRe.test(txt) && isVisible(el)) {
                        clickEl = el;
                        break;
                    }
                }

                if (!clickEl) {
                    const containers = Array.from(document.querySelectorAll(
                        ".efm_ant-select, .ant-select"
                    ));
                    for (const el of containers) {
                        const txt = norm(el.innerText || el.textContent);
                        if (tierRe.test(txt) && isVisible(el)) {
                            clickEl = el.querySelector(".efm_ant-select-selector, .ant-select-selector") || el;
                            break;
                        }
                    }
                }

                if (!isVisible(clickEl)) return false;
                clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
                clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
                clickEl.click();
                return true;
            }
            """
        )
        time.sleep(0.5)
        return bool(ok)
    except Exception:
        return False
|
|
|
+
|
|
|
+
|
|
|
+def _normalize_tier_option(opt: str) -> str:
|
|
|
+ if not opt:
|
|
|
+ return "unknown"
|
|
|
+ s = opt.replace('\u00a0', ' ')
|
|
|
+ m = re.search(r"(\d+\s*k\s*<\s*输入\s*<=\s*\d+\s*k)", s, re.I)
|
|
|
+ if not m:
|
|
|
+ m = re.search(r"(输入\s*<=\s*\d+\s*k)", s, re.I)
|
|
|
+ if not m:
|
|
|
+ m = re.search(r"(\d+\s*k\s*<\s*输入)", s, re.I)
|
|
|
+ if m:
|
|
|
+ key = m.group(1)
|
|
|
+ key = re.sub(r"\s+", "", key)
|
|
|
+ key = key.replace("输入", "input").replace("输出", "output")
|
|
|
+ return key
|
|
|
+
|
|
|
+ if "输入" in s or "输出" in s:
|
|
|
+ nums = re.findall(r"\d+\s*k", s, re.I)
|
|
|
+ if nums:
|
|
|
+ joined = "-".join([n.replace(' ', '') for n in nums])
|
|
|
+ if "输入" in s:
|
|
|
+ return f"input_{joined}"
|
|
|
+ return f"output_{joined}"
|
|
|
+
|
|
|
+ short = re.sub(r"\s+", " ", s).strip()
|
|
|
+ return short[:60]
|
|
|
+
|
|
|
+
|
|
|
def _get_tier_options(page) -> List[str]:
    """Collect visible tier options (e.g. "输入<=32K") from the dropdown.

    Opens the dropdown, scrapes the visible leaf-node texts of the antd
    dropdown DOM, and — when that finds nothing — falls back to scanning
    the whole document for tier-looking texts.  The dropdown is closed
    with Escape before returning.  Returns a de-duplicated,
    order-preserving list; empty when the dropdown could not be opened.
    """
    if not _open_tier_dropdown(page):
        return []

    try:
        page.wait_for_selector(
            ".efm_ant-select-dropdown, .ant-select-dropdown",
            state="visible", timeout=3000
        )
    except Exception:
        # Best effort: the JS scan below copes with a missing dropdown.
        pass

    options = []
    try:
        # Primary scan: visible leaf elements inside the open dropdown.
        options = page.evaluate(
            """
            () => {
                const isVisible = (el) => {
                    const r = el.getBoundingClientRect();
                    const s = window.getComputedStyle(el);
                    return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
                };
                const dropdown = Array.from(document.querySelectorAll(
                    '.efm_ant-select-dropdown, .ant-select-dropdown'
                )).find(el => isVisible(el));
                if (!dropdown) return [];
                const leaves = Array.from(dropdown.querySelectorAll('*'))
                    .filter(el => isVisible(el) && el.children.length === 0);
                const texts = leaves
                    .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
                    .filter(t => t.length > 0 && t.length < 60);
                return Array.from(new Set(texts));
            }
            """
        )
        # Keep only texts that look like tier labels ("输入 ... <n>k").
        options = [t for t in options if re.search(r"输入", t) and re.search(r"\d+\s*[kK]", t)]
    except Exception:
        options = []

    if not options:
        # Fallback scan: any visible leaf text in the document that looks
        # like a tier option (mentions 输入, a size in k, and a comparison).
        try:
            options = page.evaluate(
                """
                () => {
                    const isVisible = (el) => {
                        const r = el.getBoundingClientRect();
                        const s = window.getComputedStyle(el);
                        return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
                    };
                    const texts = Array.from(document.querySelectorAll('*'))
                        .filter(el => isVisible(el) && el.children.length === 0)
                        .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
                        .filter(t => t.length < 60 && /输入/.test(t) && /\\d+\\s*[kK]/.test(t) && /<=|</.test(t));
                    return Array.from(new Set(texts));
                }
                """
            )
        except Exception:
            options = []

    # Close the dropdown so later interactions start from a clean state.
    try:
        page.keyboard.press("Escape")
    except Exception:
        pass

    # De-duplicate while preserving first-seen order.
    return list(dict.fromkeys(options))
|
|
|
+
|
|
|
+
|
|
|
def _select_tier_option(page, option_text: str) -> bool:
    """Select `option_text` in the tier dropdown; return True on success.

    Tries an exact-text Playwright click first, then falls back to an
    in-page JS click on the matching antd option node (mousedown first,
    since antd reacts to mousedown).  Never raises.
    """
    if not _open_tier_dropdown(page):
        return False

    try:
        page.wait_for_selector(
            ".efm_ant-select-dropdown, .ant-select-dropdown",
            state="visible", timeout=2000,
        )
    except Exception:
        return False

    try:
        # Attempt 1: Playwright exact-text click.
        try:
            option_loc = page.get_by_text(option_text, exact=True).first
            option_loc.click(timeout=3000, force=False)
            time.sleep(0.6)  # wait for the price panel to re-render
            return True
        except Exception:
            pass

        # Attempt 2: JS click on the antd option whose whitespace-normalized
        # text equals option_text.
        clicked = page.evaluate(
            """
            (opt) => {
                const isVisible = (el) => {
                    if (!el) return false;
                    const rect = el.getBoundingClientRect();
                    const style = window.getComputedStyle(el);
                    return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
                };
                const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
                const nodes = Array.from(document.querySelectorAll(
                    ".efm_ant-select-item-option-content, [role='option'], .efm_ant-select-item, .ant-select-item"
                ));
                const target = nodes.find((n) => norm(n.textContent) === opt && isVisible(n));
                if (!target) return false;
                const clickEl = target.closest(".efm_ant-select-item, [role='option']") || target;
                clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
                clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
                clickEl.click();
                return true;
            }
            """,
            option_text,
        )
        if clicked:
            time.sleep(0.6)  # wait for the price panel to re-render
            return True
        return False
    except Exception:
        return False
|
|
|
+
|
|
|
+
|
|
|
def _ensure_tiered_pricing(page) -> None:
    """Best-effort click on the "阶梯计费" (tiered pricing) toggle.

    Silently does nothing when the toggle is absent or the click fails —
    not every model page offers tiered pricing.
    """
    try:
        tier_toggle = page.locator("text=阶梯计费").first
        if tier_toggle.count() == 0:
            return
        tier_toggle.click()
        time.sleep(0.3)  # let the pricing panel switch modes
    except Exception:
        # Optional UI element; ignore pages without it.
        pass
|
|
|
+
|
|
|
+
|
|
|
def parse_prices_from_text(text: str) -> List[Dict]:
    """Parse CNY price items out of a block of visible page text.

    Each non-empty line containing "<number> 元" becomes one item dict with
    keys: label, raw, price (or price_current/price_original when the line
    carries two figures, e.g. a discounted and an original price), optional
    unit/note, and currency="CNY".  A standalone "原价" line is folded into
    the preceding item as its price_original instead of a new item.
    """
    # Strip and drop empty lines; indices below refer to this cleaned list.
    lines = [ln.strip() for ln in text.splitlines()]
    lines = [ln for ln in lines if ln]

    items = []
    price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
    for idx, ln in enumerate(lines):
        # findall with one capture group yields just the numeric strings.
        matches = price_re.findall(ln)
        if not matches:
            continue

        # Label: text preceding the first price on the same line, else the
        # nearest earlier non-price line, else a synthetic placeholder.
        label = None
        first_m = price_re.search(ln)
        if first_m:
            before = ln[: first_m.start()].strip()
            if before:
                label = before
        if not label:
            for j in range(idx - 1, -1, -1):
                if lines[j] and not price_re.search(lines[j]):
                    label = lines[j]
                    break
        if not label:
            label = f"price_{len(items) + 1}"

        # A bare "原价" (original price) row annotates the previous item
        # rather than producing a new one.
        if label == "原价":
            if items and matches:
                try:
                    items[-1]["price_original"] = float(matches[0])
                except Exception:
                    items[-1]["price_original"] = matches[0]
                items[-1].setdefault("note", "")
                if items[-1]["note"]:
                    items[-1]["note"] += "; 原价显示"
                else:
                    items[-1]["note"] = "原价显示"
            continue

        raw = ln

        # A bare 输入/输出 label is ambiguous under tiered pricing; try to
        # replace it with the nearest explicit tier descriptor.
        if re.fullmatch(r"输入|输出", label.strip()):
            tier_label = _find_nearest_tier_label(lines, idx)
            if tier_label:
                label = tier_label

        entry: Dict = {"label": label.strip(), "raw": raw}
        try:
            nums = [float(x) for x in matches]
            if len(nums) == 1:
                entry["price"] = nums[0]
            else:
                # Two+ figures: assume the smaller is the current
                # (discounted) price and the larger the original.
                fnums = sorted(nums)
                entry["price_current"] = fnums[0]
                entry["price_original"] = fnums[-1]
        except Exception:
            try:
                entry["price"] = float(matches[0])
            except Exception:
                entry["price"] = matches[0]

        # Unit: a generic per-thousand-tokens default, overridden by an
        # explicit "元/每..." phrase when present on the line.
        unit = None
        if re.search(r"每千|每 1k|/千|/每千|tokens", raw, re.I):
            unit = "元/每千tokens"
        unit_m = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
        if unit_m:
            unit = unit_m.group(0)
        if unit:
            entry["unit"] = unit

        # Promotion markers go into a note field, not the label.
        note = []
        if re.search(r"限时|折", raw):
            note.append("限时优惠")
        if re.search(r"原价", raw):
            note.append("原价显示")
        if note:
            entry["note"] = "; ".join(note)

        entry["currency"] = "CNY"
        items.append(entry)

    return items
|
|
|
+
|
|
|
+
|
|
|
def extract_price_block_html(html: str) -> str:
    """Return the visible text of the region around the "模型价格" heading.

    Falls back to the whole document's text when the heading is absent.
    Prefers the lxml parser and degrades to the stdlib html.parser.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    # First "模型价格" text node that is not inside <script>/<style>.
    heading = None
    for candidate in soup.find_all(string=re.compile(r"模型价格")):
        parent = candidate.parent
        if parent and parent.name in ("script", "style"):
            continue
        heading = candidate
        break

    if not heading:
        return soup.get_text(separator="\n")

    # Climb at most 6 ancestors; return the first whose text looks like it
    # contains pricing content (a yuan sign, any digit, or "tokens").
    ancestor = heading.parent
    for _ in range(6):
        region_text = ancestor.get_text(separator="\n")
        if "元" in region_text or re.search(r"\d", region_text) or "tokens" in region_text.lower():
            return region_text
        if ancestor.parent is None:
            break
        ancestor = ancestor.parent
    return ancestor.get_text(separator="\n")
|
|
|
+
|
|
|
+
|
|
|
def extract_price_items_from_html(html: str) -> List[Dict]:
    """Extract structured price items from a rendered model page.

    Pipeline: locate the "模型价格" region, text-parse it with
    parse_prices_from_text, post-process (drop tool-call charges, fold
    原价 rows, normalize notes/units/labels), then group 输入/输出 rows into
    {"label": "input"/"output", "tiers": {...}} structures.  When nothing
    is found, a DOM fallback scans elements with price/unit/label-like
    class names.  Returns [] when the heading is absent entirely.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    # First "模型价格" text node outside <script>/<style>.
    node = None
    for n in soup.find_all(string=re.compile(r"模型价格")):
        if n.parent and n.parent.name in ("script", "style"):
            continue
        node = n
        break
    if not node:
        return []

    # Climb at most 6 ancestors to the smallest container that appears to
    # hold pricing content.
    ancestor = node.parent
    container = ancestor
    for _ in range(6):
        txt = ancestor.get_text(separator="\n")
        if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
            container = ancestor
            break
        if ancestor.parent:
            ancestor = ancestor.parent
        else:
            container = ancestor
            break

    # NOTE(review): price_re is unused in this function.
    price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
    items: List[Dict] = []

    container_text = container.get_text(separator="\n")
    items = parse_prices_from_text(container_text)

    def _postprocess_items(raw_items: List[Dict]) -> List[Dict]:
        """Filter tool-call charges, fold 原价 rows, normalize notes/units/labels."""
        filtered: List[Dict] = []
        for it in raw_items:
            raw = it.get("raw", "")
            label = it.get("label", "")
            unit = it.get("unit", "")

            # Drop per-call tool/feature charges entirely.
            if _is_tool_call_item(label, raw, unit):
                continue

            # A 原价 row annotates the previous kept item.
            if "原价" in label and filtered:
                if "price" in it:
                    filtered[-1]["price_original"] = it["price"]
                elif "price_current" in it and "price_original" in it:
                    filtered[-1]["price_original"] = it["price_original"]
                filtered[-1].setdefault("note", "")
                if filtered[-1]["note"]:
                    filtered[-1]["note"] += "; 原价显示"
                else:
                    filtered[-1]["note"] = "原价显示"
                continue

            # Build the promotion note: explicit discount ("N折") first,
            # else generic limited-time / free markers.
            notes = []
            discount_match = re.search(r"(限时)?([0-9.]+)\s*折", raw)
            if discount_match:
                discount = discount_match.group(2)
                notes.append(f"限时{discount}折")
            else:
                if re.search(r"限时|免费", raw) or re.search(r"限时|免费", label):
                    if re.search(r"免费", raw):
                        notes.append("限时免费")
                    else:
                        notes.append("限时优惠")

            if re.search(r"原价", raw):
                notes.append("原价显示")
            if notes:
                it["note"] = "; ".join(notes)

            # Fill in a unit when the text parser did not find one.
            if "unit" not in it:
                if re.search(r"每千|tokens|/千|/每千", raw, re.I):
                    it["unit"] = "元/每千tokens"
                else:
                    um = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
                    if um:
                        it["unit"] = um.group(0)

            # Strip promotion/price text from the label, keeping the base part.
            cleaned_label = re.sub(r"限时[0-9.]*折|限时|免费|原价|\s*元.*", "", label).strip()
            cleaned_label = re.sub(r"\s+", " ", cleaned_label).strip()
            if not cleaned_label:
                cleaned_label = "price"
            it["label"] = cleaned_label

            it["currency"] = "CNY"
            filtered.append(it)
        return filtered

    filtered = _postprocess_items(items)

    # Group 输入/输出 (and explicitly tier-tagged) items under
    # {"label": group, "tiers": {...}}; everything else passes through.
    structured: List[Dict] = []
    grouped: Dict[str, Dict[str, Dict]] = {}

    for it in filtered:
        lbl = it.get("label", "")
        raw = it.get("raw", "")
        combined = lbl + " " + raw  # NOTE(review): unused

        should_group = False
        group = None

        if re.search(r"输入", lbl):
            should_group = True
            group = "input"
        elif re.search(r"输出", lbl):
            should_group = True
            group = "output"
        if "tier" in it:
            # Item was captured under an explicit dropdown tier selection.
            tier_raw = it.get("tier") or ""
            tier_key = _normalize_tier_option(tier_raw)
            if not group:
                if "input" in tier_key.lower():
                    group = "input"
                elif "output" in tier_key.lower():
                    group = "output"
                else:
                    group = "input"  # default bucket when undeterminable

            tier_data = {k: v for k, v in it.items() if k not in ("label", "tier")}
            grouped.setdefault(group, {})[tier_key] = tier_data
        elif should_group and group:
            # Translate the leading 输入/输出 into an ASCII key.
            key = lbl
            if group == "input":
                key = re.sub(r"^输入", "input", key)
            elif group == "output":
                key = re.sub(r"^输出", "output", key)
            tier_data = {k: v for k, v in it.items() if k not in ("label",)}
            grouped.setdefault(group, {})[key] = tier_data
        else:
            structured.append(it)

    for g, mapping in grouped.items():
        structured.append({"label": g, "tiers": mapping})

    items = structured

    # DOM fallback: scan class-name hints (price/unit/label) when the
    # text-based pipeline produced nothing.
    if not items:
        try:
            price_nodes = []
            for el in soup.find_all(class_=re.compile(r"price", re.I)):
                text = el.get_text(" ", strip=True)
                if not re.search(r"[0-9]+(\.[0-9]+)?", text):
                    continue
                price_nodes.append((el, text))

            seen = set()
            for el, text in price_nodes:
                if text in seen:  # de-duplicate identical price texts
                    continue
                seen.add(text)
                unit_el = el.find_next(class_=re.compile(r"unit", re.I))
                unit_text = unit_el.get_text(" ", strip=True) if unit_el else None

                # Label: look for a *label*-classed sibling up to 4 levels up.
                label = None
                p = el
                for _ in range(4):
                    sib_label = None
                    parent = p.parent
                    if parent:
                        sib_label = parent.find(class_=re.compile(r"label", re.I))
                    if sib_label and sib_label.get_text(strip=True):
                        label = sib_label.get_text(" ", strip=True)
                        break
                    if parent is None:
                        break
                    p = parent

                # Otherwise: nearest non-numeric preceding sibling text.
                if not label:
                    prev = el.previous_sibling
                    steps = 0
                    while prev and steps < 6:
                        candidate = None
                        if isinstance(prev, str) and prev.strip():
                            candidate = prev.strip()
                        else:
                            try:
                                candidate = prev.get_text(" ", strip=True)
                            except Exception:
                                candidate = None
                        if candidate and not re.search(r"[0-9]", candidate):
                            label = candidate
                            break
                        prev = prev.previous_sibling
                        steps += 1

                entry = {"label": label or "price", "raw": text, "currency": "CNY"}
                try:
                    entry["price"] = float(re.search(r"([0-9]+(?:\.[0-9]+)?)", text).group(1))
                except Exception:
                    entry["price"] = text
                if unit_text:
                    entry["unit"] = unit_text
                items.append(entry)
        except Exception:
            pass

    # Post-process the DOM-fallback items too (text-pipeline items were
    # already processed before grouping; grouped dicts lack raw/label
    # strings that would trigger further changes).
    if items:
        items = _postprocess_items(items)

    return items
|
|
|
+
|
|
|
+
|
|
|
def extract_price_items_global(html: str) -> List[Dict]:
    """Parse price items from the region around the "模型价格" heading.

    Unlike extract_price_block_html, this returns [] when the heading is
    missing instead of falling back to the whole document, and runs
    parse_prices_from_text on the region it finds.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    # First "模型价格" text node outside <script>/<style>.
    heading = None
    for candidate in soup.find_all(string=re.compile(r"模型价格")):
        parent = candidate.parent
        if parent and parent.name in ("script", "style"):
            continue
        heading = candidate
        break
    if not heading:
        return []

    # Climb at most 6 ancestors looking for pricing-like content, and
    # parse the first region that has it.
    ancestor = heading.parent
    for _ in range(6):
        region_text = ancestor.get_text(separator="\n")
        has_pricing = (
            "元" in region_text
            or re.search(r"\d", region_text)
            or "tokens" in region_text.lower()
        )
        if has_pricing:
            return parse_prices_from_text(region_text)
        if not ancestor.parent:
            break
        ancestor = ancestor.parent
    return parse_prices_from_text(ancestor.get_text(separator="\n"))
|
|
|
+
|
|
|
+
|
|
|
def scrape_model_price(url: str, headless: bool = True, timeout: int = 20000, executable_path: Optional[str] = None) -> Dict:
    """Render a model page with Playwright and extract its price table.

    Args:
        url: model page URL.
        headless: run the browser headless.
        timeout: navigation timeout in milliseconds.
        executable_path: optional browser executable override.

    Returns a dict {"url", "error", "prices"} where "prices" maps
    tier keys / labels to price entries; on navigation failure returns
    {"url", "error", "items": []} instead.
    """
    result = {"url": url, "error": None, "items": []}

    with sync_playwright() as p:
        launch_kwargs = {"headless": headless}
        if executable_path:
            launch_kwargs["executable_path"] = executable_path
        browser = p.chromium.launch(**launch_kwargs)
        context = browser.new_context()
        page = context.new_page()

        # NOTE(review): collected for debugging but never read or returned.
        network_hits = []
        console_logs = []

        def _on_console(msg):
            # Record browser console messages (best effort).
            try:
                console_logs.append({"type": msg.type, "text": msg.text})
            except Exception:
                pass

        def _on_response(resp):
            # Record JSON/text responses that look price-related (best effort).
            try:
                url_r = resp.url
                ct = resp.headers.get("content-type", "")
                if "application/json" in ct or ct.startswith("text") or "json" in url_r.lower() or "price" in url_r.lower():
                    try:
                        body = resp.text()
                    except Exception:
                        body = None
                    snippet = None
                    if body:
                        if "元" in body or "price" in body.lower() or "tokens" in body.lower() or "price" in url_r.lower():
                            snippet = body[:2000]
                    if snippet:
                        network_hits.append({"url": url_r, "content_type": ct, "snippet": snippet})
            except Exception:
                pass

        page.on("console", _on_console)
        page.on("response", _on_response)
        # Navigate: prefer networkidle; on timeout retry with the weaker
        # "load" condition before giving up.
        try:
            page.goto(url, wait_until="networkidle", timeout=timeout)
        except PlaywrightTimeoutError:
            try:
                page.goto(url, wait_until="load", timeout=timeout)
            except Exception as e:
                result["error"] = f"导航失败: {e}"
                browser.close()
                return result

        # Wait for the pricing heading to appear (non-fatal if it doesn't).
        try:
            page.wait_for_selector("text=模型价格", timeout=8000)
        except PlaywrightTimeoutError:
            pass

        time.sleep(1.2)  # let late async rendering settle
        html = page.content()
        items = []
        try:
            items = extract_price_items_from_html(html)
        except Exception:
            items = []

        # Tiered pricing: iterate every dropdown option, re-extract after
        # each selection, and tag the captured items with the option text.
        tiered_items: List[Dict] = []
        try:
            _ensure_tiered_pricing(page)
            tier_options = _get_tier_options(page)
            for opt in tier_options:
                if not _select_tier_option(page, opt):
                    continue
                html = page.content()
                try:
                    tier_items = extract_price_items_from_html(html)
                except Exception:
                    tier_items = []
                for it in tier_items:
                    it["tier"] = opt
                tiered_items.extend(tier_items)
        except Exception:
            tiered_items = []

        # Tiered results supersede the initial flat extraction.
        if tiered_items:
            items = tiered_items

        # Fallback 1: wait for any "<number> 元" text, scroll to the bottom
        # to trigger lazy rendering, and retry extraction.
        if not items:
            try:
                page.wait_for_selector("text=/[0-9]+(\\.[0-9]+)?\\s*元/", timeout=8000)
            except PlaywrightTimeoutError:
                pass

            try:
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                time.sleep(1.0)
                html = page.content()
                items = extract_price_items_from_html(html)
            except Exception:
                items = []

        # Fallback 2: raw text of the pricing region.
        if not items:
            text_block = extract_price_block_html(html)
            if not text_block:
                result["error"] = "未找到包含 '模型价格' 的区域,可能需要登录或页面结构不同。"
                browser.close()
                return result
            items = parse_prices_from_text(text_block)

        def _build_price_map(parsed_items: List[Dict]) -> Dict:
            """Fold the flat item list into a {tier/label: entry} mapping."""
            price_map: Dict = {}

            for it in parsed_items:
                # Pre-grouped {"label": ..., "tiers": {...}} structures:
                # flatten each tier under its normalized key.
                if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict):
                    for tier_key, tier_val in it["tiers"].items():
                        k = _normalize_tier_option(tier_key)
                        price_map.setdefault(k, {})
                        sub_label = tier_val.get("label") or tier_val.get("raw") or k
                        price_map[k][sub_label] = {k2: v for k2, v in tier_val.items() if k2 not in ("tier", "tiers", "label")}
                    continue

                # Items tagged with a dropdown tier selection.
                if it.get("tier"):
                    tk = _normalize_tier_option(it.get("tier"))
                    price_map.setdefault(tk, {})
                    sub_label = it.get("label") or it.get("raw") or tk
                    price_map[tk][sub_label] = {k: v for k, v in it.items() if k not in ("tier", "label")}
                    continue

                # Flat items: keyed by label; duplicate labels collect into
                # a list.
                lbl = it.get("label") or it.get("raw") or "price"
                if lbl in price_map and not isinstance(price_map[lbl], list):
                    price_map[lbl] = [price_map[lbl]]
                if isinstance(price_map.get(lbl), list):
                    price_map[lbl].append({k: v for k, v in it.items() if k != "label"})
                else:
                    price_map[lbl] = {k: v for k, v in it.items() if k != "label"}

            return price_map

        price_map = _build_price_map(items)
        # NOTE(review): the returned dict deliberately replaces "items"
        # with the folded "prices" mapping.
        result = {"url": url, "error": result.get("error"), "prices": price_map}

        browser.close()
        return result
|
|
|
+
|
|
|
+
|
|
|
def main():
    """CLI entry point: parse arguments, scrape each URL, print JSON to stdout.

    Reads URLs from --url or --file, resolves browser executable and
    headless mode (CLI flags plus PLAYWRIGHT_EXECUTABLE /
    PLAYWRIGHT_HEADLESS environment overrides), then emits the collected
    results as a JSON array on stdout.
    """
    ap = argparse.ArgumentParser(description="爬取阿里云模型市场页面的模型价格(基于 Playwright)")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="单个模型页面 URL")
    group.add_argument("--file", help="包含多个 URL(每行一个)的文件路径")
    ap.add_argument("--headful", action="store_true", help="以有头模式打开浏览器(方便调试)")
    ap.add_argument("--timeout", type=int, default=20000, help="导航超时(毫秒),默认20000")
    ap.add_argument("--browser-path", help="浏览器可执行文件完整路径")
    args = ap.parse_args()

    # Collect target URLs: a single --url, or one per non-blank line of --file.
    urls: List[str] = []
    if args.url:
        urls = [args.url]
    else:
        with open(args.file, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]

    # Browser executable: CLI flag wins over the PLAYWRIGHT_EXECUTABLE env var.
    exec_path = None
    if args.browser_path:
        exec_path = args.browser_path
    else:
        exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE")

    # Headless unless --headful, with PLAYWRIGHT_HEADLESS=false as an override.
    headless = not args.headful
    if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
        headless = False

    results = []
    for u in urls:
        # BUG FIX: progress messages previously went to stdout, corrupting
        # the JSON-only stdout promised by the module docstring ("输出: JSON
        # 到 stdout"). Send them to stderr instead.
        print(f"抓取: {u}", file=sys.stderr)
        res = scrape_model_price(u, headless=headless, timeout=args.timeout, executable_path=exec_path)
        results.append(res)

    # Machine-readable output: the JSON array alone goes to stdout.
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
|