| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938 |
- #!/usr/bin/env python3
- """
- Aliyun Model Price Scraper - Final Improved Version
- - 使用 Playwright 渲染页面并抓取"模型价格"区域内的价格信息
- - 支持单个模型页面 URL,或从文件读取多个 URL
- 改进要点:
- 1. 能够生成阶梯计费结构:{input: {tier1: {...}, tier2: {...}}, output: {...}}
- 2. 优惠标记正确处理:label只保留基础部分,优惠信息放入note字段
- 3. 强化过滤:完全排除工具调用价格(包括"千次调用"单位)
- 依赖:
- pip install playwright beautifulsoup4 lxml
- python -m playwright install
- 用法示例:
- python scrape_aliyun_models.py --url "https://bailian.console.aliyun.com/.../qwen3-max"
- python scrape_aliyun_models.py --file urls.txt
- 输出: JSON 到 stdout
- """
- import argparse
- import json
- import re
- import time
- import os
- from typing import List, Dict, Optional
- from bs4 import BeautifulSoup, FeatureNotFound
- from bs4.element import Tag
- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
- TOOL_CALL_RE = re.compile(
- r"调用|工具|接口|api调用|api|次调用|千次调用|/千次|每千次|搜索策略|代码解释|文生图|数据增强|模型推理",
- re.I,
- )
- def _is_tool_call_item(label: str, raw: str, unit: str) -> bool:
- label_l = label.lower()
- raw_l = raw.lower()
- unit_l = unit.lower()
- if TOOL_CALL_RE.search(label_l) or TOOL_CALL_RE.search(raw_l) or TOOL_CALL_RE.search(unit_l):
- return True
- if "千次" in unit_l or "/千" in unit_l or "次调用" in unit_l:
- return True
- return False
- def _find_nearest_tier_label(lines: List[str], idx: int) -> Optional[str]:
- tier_re = re.compile(r"(输入|输出).*(<=|>=|<|>|\b\d+\s*k|\d+\s*万|\d+\s*千|\d+\s*tokens?)", re.I)
- for step in range(1, 6):
- for pos in (idx - step, idx + step):
- if pos < 0 or pos >= len(lines):
- continue
- candidate = lines[pos]
- if not candidate or re.search(r"([0-9]+(?:\.[0-9]+)?)\s*元", candidate, re.I):
- continue
- if tier_re.search(candidate):
- return candidate.strip()
- return None
- def _open_tier_dropdown(page) -> bool:
- try:
- # 先尝试用 Playwright 原生点击,定位包含"输入"且有 k 范围的 select
- try:
- # 精准定位:文本包含"输入"和"k"的 select selector
- selector = page.locator(".efm_ant-select-selector, .ant-select-selector").filter(has_text=re.compile(r"输入.*\d+\s*[kK]"))
- if selector.count() > 0:
- selector.first.click(timeout=3000)
- time.sleep(0.5)
- print("[DEBUG] 原生点击成功")
- return True
- except Exception as e:
- print(f"[DEBUG] 原生点击失败: {e}")
- # 回退到 JS 点击
- ok = page.evaluate(
- """
- () => {
- const isVisible = (el) => {
- if (!el) return false;
- const rect = el.getBoundingClientRect();
- const style = window.getComputedStyle(el);
- return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
- };
- const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
- const tierRe = /输入.*\d+\s*[kK]/i;
- // 优先找 selector 节点,文本匹配"输入<=32k"之类
- let clickEl = null;
- const selectors = Array.from(document.querySelectorAll(
- ".efm_ant-select-selector, .ant-select-selector"
- ));
- for (const el of selectors) {
- const txt = norm(el.innerText || el.textContent);
- if (tierRe.test(txt) && isVisible(el)) {
- clickEl = el;
- break;
- }
- }
- if (!clickEl) {
- // 回退:找包含"输入"的 select 容器
- const containers = Array.from(document.querySelectorAll(
- ".efm_ant-select, .ant-select"
- ));
- for (const el of containers) {
- const txt = norm(el.innerText || el.textContent);
- if (tierRe.test(txt) && isVisible(el)) {
- clickEl = el.querySelector(".efm_ant-select-selector, .ant-select-selector") || el;
- break;
- }
- }
- }
- if (!isVisible(clickEl)) return false;
- clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
- clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
- clickEl.click();
- return true;
- }
- """
- )
- time.sleep(0.5)
- return bool(ok)
- except Exception:
- return False
- def _normalize_tier_option(opt: str) -> str:
- """从下拉原始文本中提取并规范化阶梯键,优先返回 input/output 范围键。
- 例如:"输入<=32k" -> "input<=32k";"32k<输入<=128k" -> "32k<input<=128k"。
- 如果无法识别,返回去掉空白的短文本。
- """
- if not opt:
- return "unknown"
- s = opt.replace('\u00a0', ' ')
- # 常见模式
- m = re.search(r"(\d+\s*k\s*<\s*输入\s*<=\s*\d+\s*k)", s, re.I)
- if not m:
- m = re.search(r"(输入\s*<=\s*\d+\s*k)", s, re.I)
- if not m:
- m = re.search(r"(\d+\s*k\s*<\s*输入)", s, re.I)
- if m:
- key = m.group(1)
- key = re.sub(r"\s+", "", key)
- key = key.replace("输入", "input").replace("输出", "output")
- return key
- # 退化策略:找包含输入或输出的数字范围
- if "输入" in s or "输出" in s:
- nums = re.findall(r"\d+\s*k", s, re.I)
- if nums:
- joined = "-".join([n.replace(' ', '') for n in nums])
- if "输入" in s:
- return f"input_{joined}"
- return f"output_{joined}"
- # 最后回退到简短、安全的键
- short = re.sub(r"\s+", " ", s).strip()
- return short[:60]
- def _get_tier_options(page) -> List[str]:
- if not _open_tier_dropdown(page):
- print("[DEBUG] 未找到可点击的阶梯计费触发器")
- return []
- print("[DEBUG] 已展开阶梯计费下拉")
- # 打印点击后所有可见容器的 class,帮助定位下拉 portal
- try:
- containers = page.evaluate(
- """
- () => {
- const isVisible = (el) => {
- const r = el.getBoundingClientRect();
- const s = window.getComputedStyle(el);
- return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
- };
- return Array.from(document.querySelectorAll('div,ul'))
- .filter(el => isVisible(el))
- .map(el => ({ cls: el.className, childCount: el.children.length,
- text: (el.innerText||'').replace(/\\s+/g,' ').trim().slice(0,80) }))
- .filter(x => /select|dropdown|popup|overlay|option|list|menu/i.test(x.cls));
- }
- """
- )
- for c in containers:
- print(f"[CONTAINER] cls={c['cls']!r:.80} children={c['childCount']} text={c['text']!r:.60}")
- except Exception as e:
- print(f"[DEBUG] 容器诊断失败: {e}")
- # 等待下拉容器出现(扩大选择器范围)
- dropdown_sel = (
- ".efm_ant-select-dropdown, .ant-select-dropdown, "
- "[class*='dropdown'], [class*='popup'], [class*='select-list']"
- )
- try:
- page.wait_for_selector(dropdown_sel, state="visible", timeout=3000)
- print("[DEBUG] 下拉容器已出现")
- except Exception:
- print("[DEBUG] 下拉容器未出现,尝试继续")
- options = []
- # 策略1:从下拉容器内取选项文本
- try:
- options = page.evaluate(
- """
- () => {
- const isVisible = (el) => {
- const r = el.getBoundingClientRect();
- const s = window.getComputedStyle(el);
- return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
- };
- // 找到展开的下拉容器
- const dropdown = Array.from(document.querySelectorAll(
- '.efm_ant-select-dropdown, .ant-select-dropdown'
- )).find(el => isVisible(el));
- if (!dropdown) return [];
- // 取容器内所有叶子节点文本
- const leaves = Array.from(dropdown.querySelectorAll('*'))
- .filter(el => isVisible(el) && el.children.length === 0);
- const texts = leaves
- .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
- .filter(t => t.length > 0 && t.length < 60);
- return Array.from(new Set(texts));
- }
- """
- )
- print(f"[DEBUG] 下拉容器内文本: {options}")
- # 只保留档位选项(含输入+k范围)
- options = [t for t in options if re.search(r"输入", t) and re.search(r"\d+\s*[kK]", t)]
- except Exception as e:
- print(f"[DEBUG] 下拉容器提取失败: {e}")
- options = []
- # 策略2:宽松兜底——整个页面可见叶子节点,文本含输入+k范围
- if not options:
- print("[DEBUG] 下拉容器未找到,尝试宽松兜底")
- try:
- options = page.evaluate(
- """
- () => {
- const isVisible = (el) => {
- const r = el.getBoundingClientRect();
- const s = window.getComputedStyle(el);
- return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
- };
- const texts = Array.from(document.querySelectorAll('*'))
- .filter(el => isVisible(el) && el.children.length === 0)
- .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
- .filter(t => t.length < 60 && /输入/.test(t) && /\\d+\\s*[kK]/.test(t) && /<=|</.test(t));
- return Array.from(new Set(texts));
- }
- """
- )
- print(f"[DEBUG] 宽松兜底找到: {options}")
- except Exception as e:
- print(f"[DEBUG] 宽松兜底失败: {e}")
- options = []
- print(f"[DEBUG] 找到的档位选项: {options}")
- try:
- page.keyboard.press("Escape")
- except Exception:
- pass
- return list(dict.fromkeys(options))
- def _select_tier_option(page, option_text: str) -> bool:
- # 每次选择前都重新展开下拉
- if not _open_tier_dropdown(page):
- print(f"[DEBUG] 选择 {option_text} 失败: 未能展开下拉")
- return False
-
- # 等待下拉出现
- try:
- page.wait_for_selector(
- ".efm_ant-select-dropdown, .ant-select-dropdown",
- state="visible",
- timeout=2000,
- )
- except Exception:
- print(f"[DEBUG] 选择 {option_text} 失败: 下拉未出现")
- return False
- try:
- print(f"[DEBUG] 尝试选择档位: {option_text}")
-
- # 优先用原生点击
- try:
- option_loc = page.get_by_text(option_text, exact=True).first
- option_loc.click(timeout=3000, force=False)
- time.sleep(0.6)
- print(f"[DEBUG] 成功选择档位: {option_text}")
- return True
- except Exception as e:
- print(f"[DEBUG] 原生点击失败: {e},尝试 JS 点击")
- # 回退到 JS 点击
- clicked = page.evaluate(
- """
- (opt) => {
- const isVisible = (el) => {
- if (!el) return false;
- const rect = el.getBoundingClientRect();
- const style = window.getComputedStyle(el);
- return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
- };
- const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
- const nodes = Array.from(document.querySelectorAll(
- ".efm_ant-select-item-option-content, [role='option'], .efm_ant-select-item, .ant-select-item"
- ));
- const target = nodes.find((n) => norm(n.textContent) === opt && isVisible(n));
- if (!target) return false;
- const clickEl = target.closest(".efm_ant-select-item, [role='option']") || target;
- clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
- clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
- clickEl.click();
- return true;
- }
- """,
- option_text,
- )
- if clicked:
- time.sleep(0.6)
- print(f"[DEBUG] 成功选择档位: {option_text}")
- return True
- else:
- print(f"[DEBUG] JS 点击也失败")
- return False
- except Exception as e:
- print(f"[DEBUG] 选择档位 {option_text} 失败: {e}")
- return False
- def _ensure_tiered_pricing(page) -> None:
- try:
- toggle = page.locator("text=阶梯计费").first
- if toggle.count() > 0:
- toggle.click()
- time.sleep(0.3)
- except Exception:
- pass
- def parse_prices_from_text(text: str) -> List[Dict]:
- """从包含"模型价格"的块文本中提取价格项(标签 + 价格)。"""
- # 规范化换行,按行分割
- lines = [ln.strip() for ln in text.splitlines()]
- # 删除空行
- lines = [ln for ln in lines if ln]
- items = []
- # 遍历行,找到包含"元"的行,把它和前面的标签配对;支持行内多个价格(当前价/原价)和阶梯标签
- price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
- for idx, ln in enumerate(lines):
- matches = price_re.findall(ln)
- if not matches:
- continue
- # label 优先取价格前的文本片段,否则向上寻找上一非数字行
- label = None
- # 尝试同一行中价格前的文本(第一个价格)
- first_m = price_re.search(ln)
- if first_m:
- before = ln[: first_m.start()].strip()
- if before:
- label = before
- if not label:
- for j in range(idx - 1, -1, -1):
- if lines[j] and not price_re.search(lines[j]):
- label = lines[j]
- break
- if not label:
- label = f"price_{len(items) + 1}"
- # 处理原价行:优先附加到上一条记录
- if label == "原价":
- if items and matches:
- try:
- items[-1]["price_original"] = float(matches[0])
- except Exception:
- items[-1]["price_original"] = matches[0]
- items[-1].setdefault("note", "")
- if items[-1]["note"]:
- items[-1]["note"] += "; 原价显示"
- else:
- items[-1]["note"] = "原价显示"
- continue
- raw = ln
- # 处理同一行里有多个价格(例如:现价 0.005 元 原价 0.01 元 或 限时 5 折)
- # 针对阶梯计费的价格行,优先使用附近的范围标签
- if re.fullmatch(r"输入|输出", label.strip()):
- tier_label = _find_nearest_tier_label(lines, idx)
- if tier_label:
- label = tier_label
- entry: Dict = {"label": label.strip(), "raw": raw}
- try:
- nums = [float(x) for x in matches]
- if len(nums) == 1:
- entry["price"] = nums[0]
- else:
- # 启发式:较小为现价,较大为原价
- fnums = sorted(nums)
- entry["price_current"] = fnums[0]
- entry["price_original"] = fnums[-1]
- except Exception:
- # 回退:把第一个当作 price
- try:
- entry["price"] = float(matches[0])
- except Exception:
- entry["price"] = matches[0]
- # 检测单位与优惠标记
- unit = None
- if re.search(r"每千|每 1k|/千|/每千|tokens", raw, re.I):
- unit = "元/每千tokens"
- unit_m = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
- if unit_m:
- unit = unit_m.group(0)
- if unit:
- entry["unit"] = unit
- note = []
- if re.search(r"限时|折", raw):
- note.append("限时优惠")
- if re.search(r"原价", raw):
- note.append("原价显示")
- if note:
- entry["note"] = "; ".join(note)
- entry["currency"] = "CNY"
- items.append(entry)
- return items
- def extract_price_block_html(html: str) -> str:
- """定位包含"模型价格"标题的节点并返回其较大容器的文本(回退为整页文本)。
- 如果系统未安装 lxml,回退到内置的 html.parser。
- """
- try:
- soup = BeautifulSoup(html, "lxml")
- except FeatureNotFound:
- soup = BeautifulSoup(html, "html.parser")
- node = soup.find(string=re.compile(r"模型价格"))
- if not node:
- return soup.get_text(separator="\n")
- ancestor = node.parent
- for _ in range(6):
- txt = ancestor.get_text(separator="\n")
- if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
- return txt
- if ancestor.parent:
- ancestor = ancestor.parent
- else:
- break
- return ancestor.get_text(separator="\n")
- def extract_price_items_from_html(html: str) -> List[Dict]:
- """尝试从渲染后的 HTML 中结构化提取价格项。返回类似:
- [{label, price / price_current & price_original, currency, unit, note, raw}]
- 使用启发式规则,适配表格、行、div 等常见结构。
- """
- try:
- soup = BeautifulSoup(html, "lxml")
- except FeatureNotFound:
- soup = BeautifulSoup(html, "html.parser")
- node = soup.find(string=re.compile(r"模型价格"))
- if not node:
- return []
- ancestor = node.parent
- container = ancestor
- for _ in range(6):
- txt = ancestor.get_text(separator="\n")
- if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
- container = ancestor
- break
- if ancestor.parent:
- ancestor = ancestor.parent
- else:
- container = ancestor
- break
- price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
- items: List[Dict] = []
- # 优先使用容器的逐行文本解析,这样能更好地捕获"输入<=32k: 0.0025 元"之类的阶梯行
- container_text = container.get_text(separator="\n")
- items = parse_prices_from_text(container_text)
- def _postprocess_items(raw_items: List[Dict]) -> List[Dict]:
- filtered: List[Dict] = []
- for it in raw_items:
- raw = it.get("raw", "")
- label = it.get("label", "")
- unit = it.get("unit", "")
- tier = it.get("tier", "")
- # 过滤工具调用价格
- if _is_tool_call_item(label, raw, unit):
- continue
- # 原价行:尝试合并到上一条
- if "原价" in label and filtered:
- if "price" in it:
- filtered[-1]["price_original"] = it["price"]
- elif "price_current" in it and "price_original" in it:
- filtered[-1]["price_original"] = it["price_original"]
- filtered[-1].setdefault("note", "")
- if filtered[-1]["note"]:
- filtered[-1]["note"] += "; 原价显示"
- else:
- filtered[-1]["note"] = "原价显示"
- continue
- # 提取优惠信息(限时、折扣)并保存到 note
- notes = []
- discount_match = re.search(r"(限时)?([0-9.]+)\s*折", raw)
- if discount_match:
- discount = discount_match.group(2)
- notes.append(f"限时{discount}折")
- else:
- if re.search(r"限时|免费", raw) or re.search(r"限时|免费", label):
- if re.search(r"免费", raw):
- notes.append("限时免费")
- else:
- notes.append("限时优惠")
- if re.search(r"原价", raw):
- notes.append("原价显示")
- if notes:
- it["note"] = "; ".join(notes)
- # 单位探测(若尚未设置)
- if "unit" not in it:
- if re.search(r"每千|tokens|/千|/每千", raw, re.I):
- it["unit"] = "元/每千tokens"
- else:
- um = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
- if um:
- it["unit"] = um.group(0)
- # 清理 label:去掉优惠标记、折扣、单位等,只保留基础标签
- cleaned_label = re.sub(r"限时[0-9.]*折|限时|免费|原价|\s*元.*", "", label).strip()
- cleaned_label = re.sub(r"\s+", " ", cleaned_label).strip()
- if not cleaned_label:
- cleaned_label = "price"
- if tier:
- cleaned_label = f"{tier} {cleaned_label}".strip()
- it["label"] = cleaned_label
- # 统一币种
- it["currency"] = "CNY"
- filtered.append(it)
- return filtered
- filtered = _postprocess_items(items)
- # 把阶梯计费结构化为按 base(如 input/output)分组的字典
- # 结构为 {input: {tier_key: {price, ...}}, output: {...}}
- structured: List[Dict] = []
- grouped: Dict[str, Dict[str, Dict]] = {}
- for it in filtered:
- lbl = it.get("label", "")
- raw = it.get("raw", "")
- combined = lbl + " " + raw
-
- # 判断是否应该分组:如果有"输入"/"输出"关键词就分组
- should_group = False
- group = None
-
- if re.search(r"输入", lbl):
- should_group = True
- group = "input"
- elif re.search(r"输出", lbl):
- should_group = True
- group = "output"
- # 如果条目来自某个档位切换(有 tier 字段),则优先按该档位分组
- if "tier" in it:
- tier_raw = it.get("tier") or ""
- tier_key = _normalize_tier_option(tier_raw)
- # 如果 label 明确为输入/输出,则按 group 分类,否则尝试从 tier_key 推断
- if not group:
- if "input" in tier_key.lower():
- group = "input"
- elif "output" in tier_key.lower():
- group = "output"
- else:
- group = "input"
- tier_data = {k: v for k, v in it.items() if k not in ("label", "tier")}
- grouped.setdefault(group, {})[tier_key] = tier_data
- elif should_group and group:
- # 使用 label 作为 tier key(回退)
- key = lbl
- if group == "input":
- key = re.sub(r"^输入", "input", key)
- elif group == "output":
- key = re.sub(r"^输出", "output", key)
- tier_data = {k: v for k, v in it.items() if k not in ("label",)}
- grouped.setdefault(group, {})[key] = tier_data
- else:
- structured.append(it)
- # 把 grouped 转换为 dict 形式,保留 tiers 字段
- for g, mapping in grouped.items():
- structured.append({"label": g, "tiers": mapping})
- items = structured
- # 去重并返回
- # 如果没有解析到,尝试基于 class 名进行备选解析(应对字符编码或单位被伪元素渲染的情况)
- if not items:
- try:
- price_nodes = []
- # 找到 class 名中带 price 的元素作为价格值
- for el in soup.find_all(class_=re.compile(r"price", re.I)):
- text = el.get_text(" ", strip=True)
- # 跳过非数字文本
- if not re.search(r"[0-9]+(\.[0-9]+)?", text):
- continue
- price_nodes.append((el, text))
- seen = set()
- for el, text in price_nodes:
- if text in seen:
- continue
- seen.add(text)
- # 尝试寻找单位元素
- unit_el = el.find_next(class_=re.compile(r"unit", re.I))
- unit_text = unit_el.get_text(" ", strip=True) if unit_el else None
- # 尝试寻找标签:向上找包含 label 或 pricingLabel 的兄弟/父节点
- label = None
- p = el
- for _ in range(4):
- # 检查同级的 label 类
- sib_label = None
- parent = p.parent
- if parent:
- sib_label = parent.find(class_=re.compile(r"label", re.I))
- if sib_label and sib_label.get_text(strip=True):
- label = sib_label.get_text(" ", strip=True)
- break
- if parent is None:
- break
- p = parent
- if not label:
- # 尝试取前一个文本节点
- prev = el.previous_sibling
- steps = 0
- while prev and steps < 6:
- candidate = None
- if isinstance(prev, str) and prev.strip():
- candidate = prev.strip()
- else:
- try:
- candidate = prev.get_text(" ", strip=True)
- except Exception:
- candidate = None
- if candidate and not re.search(r"[0-9]", candidate):
- label = candidate
- break
- prev = prev.previous_sibling
- steps += 1
- entry = {"label": label or "price", "raw": text, "currency": "CNY"}
- try:
- entry["price"] = float(re.search(r"([0-9]+(?:\.[0-9]+)?)", text).group(1))
- except Exception:
- entry["price"] = text
- if unit_text:
- entry["unit"] = unit_text
- items.append(entry)
- except Exception:
- pass
- if items:
- items = _postprocess_items(items)
- return items
- def extract_price_items_global(html: str) -> List[Dict]:
- """在整个 HTML 中全局搜索价格字符串,尝试提取周围上下文作为 label。
- 这是最后的回退解析,适用于页面结构复杂或文本没包含"模型价格"定位词时。
- """
- try:
- soup = BeautifulSoup(html, "lxml")
- except FeatureNotFound:
- soup = BeautifulSoup(html, "html.parser")
- # 全局回退:仍然优先查找靠近"模型价格"标题的文本
- node = soup.find(string=re.compile(r"模型价格"))
- if not node:
- # 若页面没有"模型价格"关键词,则不尝试全页解析(避免抓到工具调用价格)
- return []
- ancestor = node.parent
- for _ in range(6):
- txt = ancestor.get_text(separator="\n")
- if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
- return parse_prices_from_text(txt)
- if ancestor.parent:
- ancestor = ancestor.parent
- else:
- break
- return parse_prices_from_text(ancestor.get_text(separator="\n"))
- def scrape_model_price(url: str, headless: bool = True, timeout: int = 20000, executable_path: Optional[str] = None) -> Dict:
- """用 Playwright 打开页面,等待渲染,然后提取价格信息。"""
- result = {"url": url, "error": None, "items": []}
- with sync_playwright() as p:
- launch_kwargs = {"headless": headless}
- if executable_path:
- launch_kwargs["executable_path"] = executable_path
- browser = p.chromium.launch(**launch_kwargs)
- context = browser.new_context()
- page = context.new_page()
- # 调试收集:网络响应、控制台消息
- network_hits = []
- console_logs = []
- def _on_console(msg):
- try:
- console_logs.append({"type": msg.type, "text": msg.text})
- except Exception:
- pass
- def _on_response(resp):
- try:
- url_r = resp.url
- ct = resp.headers.get("content-type", "")
- # 只尝试读取文本/JSON 响应,避免二进制
- if "application/json" in ct or ct.startswith("text") or "json" in url_r.lower() or "price" in url_r.lower():
- try:
- body = resp.text()
- except Exception:
- body = None
- snippet = None
- if body:
- if "元" in body or "price" in body.lower() or "tokens" in body.lower() or "price" in url_r.lower():
- snippet = body[:2000]
- if snippet:
- network_hits.append({"url": url_r, "content_type": ct, "snippet": snippet})
- except Exception:
- pass
- page.on("console", _on_console)
- page.on("response", _on_response)
- try:
- page.goto(url, wait_until="networkidle", timeout=timeout)
- except PlaywrightTimeoutError:
- # 有时 networkidle 很难触发,尝试使用 load 再等待一小会儿
- try:
- page.goto(url, wait_until="load", timeout=timeout)
- except Exception as e:
- result["error"] = f"导航失败: {e}"
- browser.close()
- return result
- # 等待页面内文本"模型价格"出现(最多等待 8 秒)
- try:
- page.wait_for_selector("text=模型价格", timeout=8000)
- except PlaywrightTimeoutError:
- # 继续也许页面没有精准文本,但尝试抓取页面内容
- pass
- # 小等待,确保异步渲染完成
- time.sleep(1.2)
- html = page.content()
- # 优先尝试结构化解析 HTML 表格/行,再回退到纯文本解析
- items = []
- try:
- items = extract_price_items_from_html(html)
- except Exception:
- items = []
- # 尝试展开阶梯计费下拉选项,逐档抓取
- tiered_items: List[Dict] = []
- try:
- _ensure_tiered_pricing(page)
- tier_options = _get_tier_options(page)
- print(f"[DEBUG] 总共找到 {len(tier_options)} 个档位")
- for opt in tier_options:
- if not _select_tier_option(page, opt):
- continue
- html = page.content()
- try:
- tier_items = extract_price_items_from_html(html)
- print(f"[DEBUG] 档位 {opt} 解析出 {len(tier_items)} 条价格")
- except Exception as e:
- print(f"[DEBUG] 档位 {opt} 解析失败: {e}")
- tier_items = []
- for it in tier_items:
- it["tier"] = opt
- tiered_items.extend(tier_items)
- except Exception as e:
- print(f"[DEBUG] 阶梯计费抓取异常: {e}")
- tiered_items = []
- print(f"[DEBUG] 总共收集 {len(tiered_items)} 条有档位标记的价格")
- if tiered_items:
- print("[DEBUG] 使用阶梯计费结果,替换普通结果")
- items = tiered_items
- # 如果没有解析到,尝试等待更通用的价格文本(如"xx 元"),并滚动触发懒加载后重试
- if not items:
- try:
- # 如果页面上没有明确的"模型价格"标题,等待任意包含"元"的价格文本
- page.wait_for_selector("text=/[0-9]+(\\.[0-9]+)?\\s*元/", timeout=8000)
- except PlaywrightTimeoutError:
- pass
- # 再次尝试解析(可能因为懒加载需要滚动)
- try:
- # 尝试滚动到底部并等待渲染
- page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
- time.sleep(1.0)
- html = page.content()
- items = extract_price_items_from_html(html)
- except Exception:
- items = []
- # 最后回退到全文本解析
- if not items:
- text_block = extract_price_block_html(html)
- if not text_block:
- result["error"] = "未找到包含 '模型价格' 的区域,可能需要登录或页面结构不同。"
- browser.close()
- return result
- items = parse_prices_from_text(text_block)
- # 把解析到的 items 转换为仅包含模型价格的简洁结构
- def _build_price_map(parsed_items: List[Dict]) -> Dict:
- price_map: Dict = {}
- # 处理两种情况:1) items 中含有 'tiers'(已按 input/output 分组)
- # 2) 每条 item 带有 'tier' 字段(来自逐档抓取)或普通标签
- for it in parsed_items:
- # 如果已经是按分组结构(label=input/output, tiers=dict)
- if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict):
- for tier_key, tier_val in it["tiers"].items():
- k = _normalize_tier_option(tier_key)
- # 确保内部为 dict,允许同一档位下包含多个子条目
- price_map.setdefault(k, {})
- # 把 tier_val 放入档位下,以其原始 label 或 raw 作为子键
- sub_label = tier_val.get("label") or tier_val.get("raw") or k
- price_map[k][sub_label] = {k2: v for k2, v in tier_val.items() if k2 not in ("tier", "tiers", "label")}
- continue
- # 如果条目本身带有 tier 字段
- if it.get("tier"):
- tk = _normalize_tier_option(it.get("tier"))
- price_map.setdefault(tk, {})
- sub_label = it.get("label") or it.get("raw") or tk
- price_map[tk][sub_label] = {k: v for k, v in it.items() if k not in ("tier", "label")}
- continue
- # 普通非阶梯条目:直接以 label 为键
- lbl = it.get("label") or it.get("raw") or "price"
- # 若 label 已存在且是 dict(多条非阶梯同名),则合并为列表形式
- if lbl in price_map and not isinstance(price_map[lbl], list):
- price_map[lbl] = [price_map[lbl]]
- if isinstance(price_map.get(lbl), list):
- price_map[lbl].append({k: v for k, v in it.items() if k != "label"})
- else:
- price_map[lbl] = {k: v for k, v in it.items() if k != "label"}
- return price_map
- price_map = _build_price_map(items)
- result = {"url": url, "error": result.get("error"), "prices": price_map}
- browser.close()
- return result
- def main():
- ap = argparse.ArgumentParser(description="爬取阿里云模型市场页面的模型价格(基于 Playwright)")
- group = ap.add_mutually_exclusive_group(required=True)
- group.add_argument("--url", help="单个模型页面 URL")
- group.add_argument("--file", help="包含多个 URL(每行一个)的文件路径")
- ap.add_argument("--headful", action="store_true", help="以有头模式打开浏览器(方便调试)")
- ap.add_argument("--timeout", type=int, default=20000, help="导航超时(毫秒),默认20000")
- ap.add_argument("--browser-path", help="浏览器可执行文件完整路径(覆盖环境变量 PLAYWRIGHT_EXECUTABLE)")
- args = ap.parse_args()
- urls: List[str] = []
- if args.url:
- urls = [args.url]
- else:
- with open(args.file, "r", encoding="utf-8") as f:
- urls = [ln.strip() for ln in f if ln.strip()]
- results = []
- # 优先使用命令行传入的浏览器可执行路径,其次检查环境变量 PLAYWRIGHT_EXECUTABLE
- exec_path = None
- if args.browser_path:
- exec_path = args.browser_path
- else:
- exec_path = os.environ.get("PLAYWRIGHT_EXECUTABLE")
- # 环境变量 PLAYWRIGHT_HEADLESS=false 可强制有头模式
- headless = not args.headful
- if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
- headless = False
- for u in urls:
- print(f"抓取: {u}")
- res = scrape_model_price(u, headless=headless, timeout=args.timeout, executable_path=exec_path)
- results.append(res)
- print(json.dumps(results, ensure_ascii=False, indent=2))
- if __name__ == "__main__":
- main()
|