#!/usr/bin/env python3
"""Aliyun model price scraper (final improved version).

Renders model pages with Playwright and extracts the prices found inside
the "模型价格" (model price) section.  Accepts a single model-page URL or
a file containing one URL per line.

Key behaviors:
1. Produces a tiered-pricing structure:
   ``{input: {tier1: {...}, tier2: {...}}, output: {...}}``
2. Promotion markers are stripped from labels; promotion info goes into
   a ``note`` field instead.
3. Tool-call prices (anything priced per thousand calls, API/tool items)
   are filtered out entirely.

Dependencies:
    pip install playwright beautifulsoup4 lxml
    python -m playwright install

Usage:
    python scrape_aliyun_models.py --url "https://bailian.console.aliyun.com/.../qwen3-max"
    python scrape_aliyun_models.py --file urls.txt

Output: JSON on stdout.  All diagnostics are written to stderr so stdout
stays machine-parseable.
"""

import argparse
import json
import os
import re
import sys
import time
from typing import Dict, List, Optional

from bs4 import BeautifulSoup, FeatureNotFound
from bs4.element import Tag  # noqa: F401  (kept from the original file)
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright

# Any label/raw-text/unit matching these terms is a tool- or API-call price
# (per-thousand-calls billing etc.), not a token price, and must be dropped.
TOOL_CALL_RE = re.compile(
    r"调用|工具|接口|api调用|api|次调用|千次调用|/千次|每千次|搜索策略|代码解释|文生图|数据增强|模型推理",
    re.I,
)


def _debug(msg: str) -> None:
    """Print a diagnostic message to stderr so stdout stays pure JSON.

    The original script printed diagnostics to stdout, which corrupted the
    JSON output promised by the module docstring.
    """
    print(msg, file=sys.stderr)


def _is_tool_call_item(label: str, raw: str, unit: str) -> bool:
    """Return True if the price item is a tool/API-call price to be excluded.

    Checks the label, the raw source line and the unit string against
    TOOL_CALL_RE, plus explicit per-thousand-calls unit markers.
    """
    label_l = label.lower()
    raw_l = raw.lower()
    unit_l = unit.lower()
    if TOOL_CALL_RE.search(label_l) or TOOL_CALL_RE.search(raw_l) or TOOL_CALL_RE.search(unit_l):
        return True
    # Per-thousand-calls units are tool pricing even without other keywords.
    if "千次" in unit_l or "/千" in unit_l or "次调用" in unit_l:
        return True
    return False


def _find_nearest_tier_label(lines: List[str], idx: int) -> Optional[str]:
    """Search outward from ``lines[idx]`` for the closest tier-range label.

    A tier label mentions 输入/输出 (input/output) together with a range
    marker (comparison operator, Nk, N万, N千 or N tokens).  Lines that
    themselves contain a CNY price are skipped so a price row is never
    mistaken for a label.  Returns the stripped label or None.
    """
    tier_re = re.compile(r"(输入|输出).*(<=|>=|<|>|\b\d+\s*k|\d+\s*万|\d+\s*千|\d+\s*tokens?)", re.I)
    for step in range(1, 6):
        for pos in (idx - step, idx + step):
            if pos < 0 or pos >= len(lines):
                continue
            candidate = lines[pos]
            if not candidate or re.search(r"([0-9]+(?:\.[0-9]+)?)\s*元", candidate, re.I):
                continue
            if tier_re.search(candidate):
                return candidate.strip()
    return None


def _open_tier_dropdown(page) -> bool:
    """Open the tiered-pricing tier selector dropdown on the page.

    First tries a native Playwright click on a select whose text contains
    输入 plus a "Nk" range; falls back to synthesizing mouse events via JS.
    Returns True if a click was dispatched.
    """
    try:
        # Native click first: locate the select containing "输入...Nk".
        try:
            selector = page.locator(
                ".efm_ant-select-selector, .ant-select-selector"
            ).filter(has_text=re.compile(r"输入.*\d+\s*[kK]"))
            if selector.count() > 0:
                selector.first.click(timeout=3000)
                time.sleep(0.5)
                _debug("[DEBUG] 原生点击成功")
                return True
        except Exception as e:
            _debug(f"[DEBUG] 原生点击失败: {e}")
        # Fallback: dispatch mouse events from inside the page.
        # NOTE: regex escapes are doubled (\\s, \\d) because this is a
        # non-raw Python string; single backslashes raise SyntaxWarning
        # on Python 3.12+ and are fragile.
        ok = page.evaluate(
            """
            () => {
              const isVisible = (el) => {
                if (!el) return false;
                const rect = el.getBoundingClientRect();
                const style = window.getComputedStyle(el);
                return rect.width > 0 && rect.height > 0 &&
                       style.display !== 'none' && style.visibility !== 'hidden';
              };
              const norm = (s) => (s || '').replace(/\\s+/g, ' ').trim();
              const tierRe = /输入.*\\d+\\s*[kK]/i;
              // Prefer a selector node whose text looks like "输入<=32k".
              let clickEl = null;
              const selectors = Array.from(document.querySelectorAll(
                ".efm_ant-select-selector, .ant-select-selector"
              ));
              for (const el of selectors) {
                const txt = norm(el.innerText || el.textContent);
                if (tierRe.test(txt) && isVisible(el)) { clickEl = el; break; }
              }
              if (!clickEl) {
                // Fallback: any select container mentioning 输入 + Nk.
                const containers = Array.from(document.querySelectorAll(
                  ".efm_ant-select, .ant-select"
                ));
                for (const el of containers) {
                  const txt = norm(el.innerText || el.textContent);
                  if (tierRe.test(txt) && isVisible(el)) {
                    clickEl = el.querySelector(".efm_ant-select-selector, .ant-select-selector") || el;
                    break;
                  }
                }
              }
              if (!isVisible(clickEl)) return false;
              clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
              clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
              clickEl.click();
              return true;
            }
            """
        )
        time.sleep(0.5)
        return bool(ok)
    except Exception:
        return False


def _normalize_tier_option(opt: str) -> str:
    """Extract and normalize a tier key from raw dropdown text.

    e.g. "输入<=32k" -> "input<=32k"; "32k<输入<=128k" -> "32k<input<=128k".

    NOTE(review): the original body was lost in source mangling; this
    reconstruction (strip whitespace, map 输入->input / 输出->output) is
    inferred from the docstring examples and from callers that test for
    "input"/"output" substrings in the returned key — verify against the
    original.
    """
    s = re.sub(r"\s+", "", opt or "")
    s = s.replace("输入", "input").replace("输出", "output")
    return s if s else (opt or "")


def _get_tier_options(page) -> List[str]:
    """Open the tier dropdown and collect the visible tier option texts.

    Returns option strings such as "输入<=32k"; empty list when the
    dropdown cannot be opened or no tier-looking option is found.
    """
    if not _open_tier_dropdown(page):
        _debug("[DEBUG] 未找到可点击的阶梯计费触发器")
        return []
    _debug("[DEBUG] 已展开阶梯计费下拉")

    # Diagnostic: dump classes of visible containers to help locate the
    # dropdown portal (which antd renders outside the select element).
    try:
        containers = page.evaluate(
            """
            () => {
              const isVisible = (el) => {
                const r = el.getBoundingClientRect();
                const s = window.getComputedStyle(el);
                return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
              };
              return Array.from(document.querySelectorAll('div,ul'))
                .filter(el => isVisible(el))
                .map(el => ({
                  cls: el.className,
                  childCount: el.children.length,
                  text: (el.innerText||'').replace(/\\s+/g,' ').trim().slice(0,80)
                }))
                .filter(x => /select|dropdown|popup|overlay|option|list|menu/i.test(x.cls));
            }
            """
        )
        for c in containers:
            _debug(f"[CONTAINER] cls={c['cls']!r:.80} children={c['childCount']} text={c['text']!r:.60}")
    except Exception as e:
        _debug(f"[DEBUG] 容器诊断失败: {e}")

    # Wait for the dropdown container (broad selector set).
    dropdown_sel = (
        ".efm_ant-select-dropdown, .ant-select-dropdown, "
        "[class*='dropdown'], [class*='popup'], [class*='select-list']"
    )
    try:
        page.wait_for_selector(dropdown_sel, state="visible", timeout=3000)
        _debug("[DEBUG] 下拉容器已出现")
    except Exception:
        _debug("[DEBUG] 下拉容器未出现,尝试继续")

    options: List[str] = []

    # Strategy 1: leaf-node texts inside the open dropdown container.
    try:
        options = page.evaluate(
            """
            () => {
              const isVisible = (el) => {
                const r = el.getBoundingClientRect();
                const s = window.getComputedStyle(el);
                return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
              };
              const dropdown = Array.from(document.querySelectorAll(
                '.efm_ant-select-dropdown, .ant-select-dropdown'
              )).find(el => isVisible(el));
              if (!dropdown) return [];
              const leaves = Array.from(dropdown.querySelectorAll('*'))
                .filter(el => isVisible(el) && el.children.length === 0);
              const texts = leaves
                .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
                .filter(t => t.length > 0 && t.length < 60);
              return Array.from(new Set(texts));
            }
            """
        )
        _debug(f"[DEBUG] 下拉容器内文本: {options}")
        # Keep only tier options: mention 输入 and an Nk range.
        options = [t for t in options if re.search(r"输入", t) and re.search(r"\d+\s*[kK]", t)]
    except Exception as e:
        _debug(f"[DEBUG] 下拉容器提取失败: {e}")
        options = []

    # Strategy 2: loose fallback — any visible leaf node on the whole page
    # whose text mentions 输入, an Nk range and a comparison operator.
    if not options:
        _debug("[DEBUG] 下拉容器未找到,尝试宽松兜底")
        try:
            # NOTE(review): the tail of this JS filter was lost in the
            # mangled source; the "<=|>=|<|>" comparison test and the
            # Set-based dedup are reconstructed from the tier regexes used
            # elsewhere in this file — verify against the original.
            options = page.evaluate(
                """
                () => {
                  const isVisible = (el) => {
                    const r = el.getBoundingClientRect();
                    const s = window.getComputedStyle(el);
                    return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
                  };
                  const texts = Array.from(document.querySelectorAll('*'))
                    .filter(el => isVisible(el) && el.children.length === 0)
                    .map(el => (el.innerText || el.textContent || '').replace(/\\s+/g, ' ').trim())
                    .filter(t => t.length < 60 &&
                                 /输入/.test(t) &&
                                 /\\d+\\s*[kK]/.test(t) &&
                                 /<=|>=|<|>/.test(t));
                  return Array.from(new Set(texts));
                }
                """
            )
        except Exception:
            options = []

    # Deduplicate while preserving order.
    return list(dict.fromkeys(options))


def _select_tier_option(page, option_text: str) -> bool:
    """Re-open the tier dropdown and click the option matching *option_text*.

    Tries an exact-text native click first, then falls back to dispatching
    mouse events via JS.  Returns True on success.
    """
    # The dropdown closes after each selection, so re-open it every time.
    if not _open_tier_dropdown(page):
        _debug(f"[DEBUG] 选择 {option_text} 失败: 未能展开下拉")
        return False
    # Wait for the dropdown panel to appear.
    try:
        page.wait_for_selector(
            ".efm_ant-select-dropdown, .ant-select-dropdown",
            state="visible",
            timeout=2000,
        )
    except Exception:
        _debug(f"[DEBUG] 选择 {option_text} 失败: 下拉未出现")
        return False
    try:
        _debug(f"[DEBUG] 尝试选择档位: {option_text}")
        # Native exact-text click first.
        try:
            option_loc = page.get_by_text(option_text, exact=True).first
            option_loc.click(timeout=3000, force=False)
            time.sleep(0.6)
            _debug(f"[DEBUG] 成功选择档位: {option_text}")
            return True
        except Exception as e:
            _debug(f"[DEBUG] 原生点击失败: {e},尝试 JS 点击")
        # JS fallback: find the option node by normalized text and click it.
        clicked = page.evaluate(
            """
            (opt) => {
              const isVisible = (el) => {
                if (!el) return false;
                const rect = el.getBoundingClientRect();
                const style = window.getComputedStyle(el);
                return rect.width > 0 && rect.height > 0 &&
                       style.display !== 'none' && style.visibility !== 'hidden';
              };
              const norm = (s) => (s || '').replace(/\\s+/g, ' ').trim();
              const nodes = Array.from(document.querySelectorAll(
                ".efm_ant-select-item-option-content, [role='option'], .efm_ant-select-item, .ant-select-item"
              ));
              const target = nodes.find((n) => norm(n.textContent) === opt && isVisible(n));
              if (!target) return false;
              const clickEl = target.closest(".efm_ant-select-item, [role='option']") || target;
              clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
              clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
              clickEl.click();
              return true;
            }
            """,
            option_text,
        )
        if clicked:
            time.sleep(0.6)
            _debug(f"[DEBUG] 成功选择档位: {option_text}")
            return True
        _debug("[DEBUG] JS 点击也失败")
        return False
    except Exception as e:
        _debug(f"[DEBUG] 选择档位 {option_text} 失败: {e}")
        return False


def _ensure_tiered_pricing(page) -> None:
    """Click the 阶梯计费 (tiered pricing) toggle if present; best-effort."""
    try:
        toggle = page.locator("text=阶梯计费").first
        if toggle.count() > 0:
            toggle.click()
            time.sleep(0.3)
    except Exception:
        pass


def parse_prices_from_text(text: str) -> List[Dict]:
    """Extract price items (label + price) from the model-price block text.

    Walks the non-empty lines, pairs every line containing "N 元" with a
    label (text before the price on the same line, else the nearest
    preceding non-price line).  Handles multiple prices per line
    (current/original price heuristic: smaller = current), merges 原价
    (original price) rows into the previous item, resolves bare 输入/输出
    labels to nearby tier-range labels, and detects unit and promotion
    markers.  Returns dicts with label/raw/price fields, currency "CNY".
    """
    lines = [ln.strip() for ln in text.splitlines()]
    lines = [ln for ln in lines if ln]
    items: List[Dict] = []
    price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
    for idx, ln in enumerate(lines):
        matches = price_re.findall(ln)
        if not matches:
            continue
        # Label: prefer text before the first price on the same line,
        # otherwise scan upward for the previous non-price line.
        label = None
        first_m = price_re.search(ln)
        if first_m:
            before = ln[: first_m.start()].strip()
            if before:
                label = before
        if not label:
            for j in range(idx - 1, -1, -1):
                if lines[j] and not price_re.search(lines[j]):
                    label = lines[j]
                    break
        if not label:
            label = f"price_{len(items) + 1}"
        # 原价 (original price) rows attach to the previous record.
        if label == "原价":
            if items and matches:
                try:
                    items[-1]["price_original"] = float(matches[0])
                except Exception:
                    items[-1]["price_original"] = matches[0]
                items[-1].setdefault("note", "")
                if items[-1]["note"]:
                    items[-1]["note"] += "; 原价显示"
                else:
                    items[-1]["note"] = "原价显示"
            continue
        raw = ln
        # Bare 输入/输出 labels: use a nearby tier-range label when found.
        if re.fullmatch(r"输入|输出", label.strip()):
            tier_label = _find_nearest_tier_label(lines, idx)
            if tier_label:
                label = tier_label
        entry: Dict = {"label": label.strip(), "raw": raw}
        try:
            nums = [float(x) for x in matches]
            if len(nums) == 1:
                entry["price"] = nums[0]
            else:
                # Heuristic: the smaller number is the current (discounted)
                # price, the larger is the original price.
                fnums = sorted(nums)
                entry["price_current"] = fnums[0]
                entry["price_original"] = fnums[-1]
        except Exception:
            # Fallback: treat the first match as the price.
            try:
                entry["price"] = float(matches[0])
            except Exception:
                entry["price"] = matches[0]
        # Unit detection.
        unit = None
        if re.search(r"每千|每 1k|/千|/每千|tokens", raw, re.I):
            unit = "元/每千tokens"
        unit_m = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
        if unit_m:
            unit = unit_m.group(0)
        if unit:
            entry["unit"] = unit
        # Promotion markers.
        note = []
        if re.search(r"限时|折", raw):
            note.append("限时优惠")
        if re.search(r"原价", raw):
            note.append("原价显示")
        if note:
            entry["note"] = "; ".join(note)
        entry["currency"] = "CNY"
        items.append(entry)
    return items


def extract_price_block_html(html: str) -> str:
    """Locate the node containing "模型价格" and return its container's text.

    Climbs up to six ancestors looking for text that contains a price
    (元), a digit or "tokens"; falls back to full-page text when the
    marker is absent.  Uses html.parser when lxml is not installed.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")
    node = soup.find(string=re.compile(r"模型价格"))
    if not node:
        return soup.get_text(separator="\n")
    ancestor = node.parent
    for _ in range(6):
        txt = ancestor.get_text(separator="\n")
        if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
            return txt
        if ancestor.parent:
            ancestor = ancestor.parent
        else:
            break
    return ancestor.get_text(separator="\n")


def extract_price_items_from_html(html: str) -> List[Dict]:
    """Structured extraction of price items from the rendered HTML.

    Returns items shaped like
    ``[{label, price / price_current & price_original, currency, unit,
    note, raw}]`` plus grouped entries ``{label: "input"/"output",
    tiers: {...}}`` for tiered pricing.  Heuristic; adapts to tables,
    rows and div layouts.  Empty list when "模型价格" is not found.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")
    node = soup.find(string=re.compile(r"模型价格"))
    if not node:
        return []
    # Climb to the smallest ancestor whose text contains price-like content.
    ancestor = node.parent
    container = ancestor
    for _ in range(6):
        txt = ancestor.get_text(separator="\n")
        if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
            container = ancestor
            break
        if ancestor.parent:
            ancestor = ancestor.parent
        else:
            container = ancestor
            break

    price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
    items: List[Dict] = []
    # Line-based parsing of the container text captures tier rows such as
    # "输入<=32k: 0.0025 元" better than per-element walking.
    container_text = container.get_text(separator="\n")
    items = parse_prices_from_text(container_text)

    def _postprocess_items(raw_items: List[Dict]) -> List[Dict]:
        """Filter tool-call prices, merge 原价 rows, extract notes, clean labels."""
        filtered: List[Dict] = []
        for it in raw_items:
            raw = it.get("raw", "")
            label = it.get("label", "")
            unit = it.get("unit", "")
            tier = it.get("tier", "")
            # Drop tool/API-call prices entirely.
            if _is_tool_call_item(label, raw, unit):
                continue
            # 原价 rows merge into the previous kept item.
            if "原价" in label and filtered:
                if "price" in it:
                    filtered[-1]["price_original"] = it["price"]
                elif "price_current" in it and "price_original" in it:
                    filtered[-1]["price_original"] = it["price_original"]
                filtered[-1].setdefault("note", "")
                if filtered[-1]["note"]:
                    filtered[-1]["note"] += "; 原价显示"
                else:
                    filtered[-1]["note"] = "原价显示"
                continue
            # Promotion info (time-limited, N折 discount, free) -> note.
            notes = []
            discount_match = re.search(r"(限时)?([0-9.]+)\s*折", raw)
            if discount_match:
                discount = discount_match.group(2)
                notes.append(f"限时{discount}折")
            else:
                if re.search(r"限时|免费", raw) or re.search(r"限时|免费", label):
                    if re.search(r"免费", raw):
                        notes.append("限时免费")
                    else:
                        notes.append("限时优惠")
            if re.search(r"原价", raw):
                notes.append("原价显示")
            if notes:
                it["note"] = "; ".join(notes)
            # Unit detection (only when not already set).
            if "unit" not in it:
                if re.search(r"每千|tokens|/千|/每千", raw, re.I):
                    it["unit"] = "元/每千tokens"
                else:
                    um = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
                    if um:
                        it["unit"] = um.group(0)
            # Clean the label: strip promotion markers, discounts, trailing
            # price/unit text — keep only the base label.
            cleaned_label = re.sub(r"限时[0-9.]*折|限时|免费|原价|\s*元.*", "", label).strip()
            cleaned_label = re.sub(r"\s+", " ", cleaned_label).strip()
            if not cleaned_label:
                cleaned_label = "price"
            if tier:
                cleaned_label = f"{tier} {cleaned_label}".strip()
            it["label"] = cleaned_label
            it["currency"] = "CNY"
            filtered.append(it)
        return filtered

    filtered = _postprocess_items(items)

    # Group tiered pricing into {input: {tier_key: {...}}, output: {...}}.
    structured: List[Dict] = []
    grouped: Dict[str, Dict[str, Dict]] = {}
    for it in filtered:
        lbl = it.get("label", "")
        should_group = False
        group = None
        if re.search(r"输入", lbl):
            should_group = True
            group = "input"
        elif re.search(r"输出", lbl):
            should_group = True
            group = "output"
        # Items produced by tier switching carry a "tier" field and are
        # grouped by that tier key first.
        if "tier" in it:
            tier_raw = it.get("tier") or ""
            tier_key = _normalize_tier_option(tier_raw)
            if not group:
                if "input" in tier_key.lower():
                    group = "input"
                elif "output" in tier_key.lower():
                    group = "output"
                else:
                    group = "input"
            tier_data = {k: v for k, v in it.items() if k not in ("label", "tier")}
            grouped.setdefault(group, {})[tier_key] = tier_data
        elif should_group and group:
            # Fallback: use the label itself as the tier key.
            key = lbl
            if group == "input":
                key = re.sub(r"^输入", "input", key)
            elif group == "output":
                key = re.sub(r"^输出", "output", key)
            tier_data = {k: v for k, v in it.items() if k not in ("label",)}
            grouped.setdefault(group, {})[key] = tier_data
        else:
            structured.append(it)
    for g, mapping in grouped.items():
        structured.append({"label": g, "tiers": mapping})
    items = structured

    # Fallback when nothing parsed: class-name based extraction (handles
    # encoding issues or units rendered via pseudo-elements).
    if not items:
        try:
            price_nodes = []
            # Elements whose class mentions "price" and whose text has digits.
            for el in soup.find_all(class_=re.compile(r"price", re.I)):
                text = el.get_text(" ", strip=True)
                if not re.search(r"[0-9]+(\.[0-9]+)?", text):
                    continue
                price_nodes.append((el, text))
            seen = set()
            for el, text in price_nodes:
                if text in seen:
                    continue
                seen.add(text)
                # Nearby unit element.
                unit_el = el.find_next(class_=re.compile(r"unit", re.I))
                unit_text = unit_el.get_text(" ", strip=True) if unit_el else None
                # Label: a sibling/ancestor node whose class mentions "label".
                label = None
                p = el
                for _ in range(4):
                    sib_label = None
                    parent = p.parent
                    if parent:
                        sib_label = parent.find(class_=re.compile(r"label", re.I))
                    if sib_label and sib_label.get_text(strip=True):
                        label = sib_label.get_text(" ", strip=True)
                        break
                    if parent is None:
                        break
                    p = parent
                if not label:
                    # Fallback: preceding non-numeric text node.
                    prev = el.previous_sibling
                    steps = 0
                    while prev and steps < 6:
                        candidate = None
                        if isinstance(prev, str) and prev.strip():
                            candidate = prev.strip()
                        else:
                            try:
                                candidate = prev.get_text(" ", strip=True)
                            except Exception:
                                candidate = None
                        if candidate and not re.search(r"[0-9]", candidate):
                            label = candidate
                            break
                        prev = prev.previous_sibling
                        steps += 1
                entry = {"label": label or "price", "raw": text, "currency": "CNY"}
                try:
                    entry["price"] = float(re.search(r"([0-9]+(?:\.[0-9]+)?)", text).group(1))
                except Exception:
                    entry["price"] = text
                if unit_text:
                    entry["unit"] = unit_text
                items.append(entry)
        except Exception:
            pass
    if items:
        items = _postprocess_items(items)
    return items


def extract_price_items_global(html: str) -> List[Dict]:
    """Last-resort text parse anchored at the "模型价格" heading.

    Deliberately returns [] when the page has no "模型价格" marker, so a
    whole-page scan never picks up tool-call prices by accident.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")
    node = soup.find(string=re.compile(r"模型价格"))
    if not node:
        return []
    ancestor = node.parent
    for _ in range(6):
        txt = ancestor.get_text(separator="\n")
        if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
            return parse_prices_from_text(txt)
        if ancestor.parent:
            ancestor = ancestor.parent
        else:
            break
    return parse_prices_from_text(ancestor.get_text(separator="\n"))


def scrape_model_price(url: str, headless: bool = True, timeout: int = 20000,
                       executable_path: Optional[str] = None) -> Dict:
    """Open *url* with Playwright, render it, and extract price information.

    Args:
        url: model page URL.
        headless: run the browser headless.
        timeout: navigation timeout in milliseconds.
        executable_path: optional browser executable override.

    Returns:
        ``{"url", "error", "prices"}`` on success (prices keyed by
        normalized tier / label), or ``{"url", "error", "items"}`` when
        navigation fails early.
    """
    result = {"url": url, "error": None, "items": []}
    with sync_playwright() as p:
        launch_kwargs = {"headless": headless}
        if executable_path:
            launch_kwargs["executable_path"] = executable_path
        browser = p.chromium.launch(**launch_kwargs)
        context = browser.new_context()
        page = context.new_page()

        # Debug capture: network responses and console messages.
        # (Collected but not returned — kept for interactive debugging.)
        network_hits = []
        console_logs = []

        def _on_console(msg):
            try:
                console_logs.append({"type": msg.type, "text": msg.text})
            except Exception:
                pass

        def _on_response(resp):
            try:
                url_r = resp.url
                ct = resp.headers.get("content-type", "")
                # Only read text/JSON responses; skip binary bodies.
                if "application/json" in ct or ct.startswith("text") or "json" in url_r.lower() or "price" in url_r.lower():
                    try:
                        body = resp.text()
                    except Exception:
                        body = None
                    snippet = None
                    if body:
                        if "元" in body or "price" in body.lower() or "tokens" in body.lower() or "price" in url_r.lower():
                            snippet = body[:2000]
                    if snippet:
                        network_hits.append({"url": url_r, "content_type": ct, "snippet": snippet})
            except Exception:
                pass

        page.on("console", _on_console)
        page.on("response", _on_response)

        try:
            page.goto(url, wait_until="networkidle", timeout=timeout)
        except PlaywrightTimeoutError:
            # networkidle may never fire on chatty pages; retry with "load".
            try:
                page.goto(url, wait_until="load", timeout=timeout)
            except Exception as e:
                result["error"] = f"导航失败: {e}"
                browser.close()
                return result

        # Wait up to 8s for the "模型价格" heading to render.
        try:
            page.wait_for_selector("text=模型价格", timeout=8000)
        except PlaywrightTimeoutError:
            # Continue anyway — the page may lack the exact text.
            pass

        # Small settle delay for async rendering.
        time.sleep(1.2)
        html = page.content()
        # Structured HTML parse first; text parse is the later fallback.
        items = []
        try:
            items = extract_price_items_from_html(html)
        except Exception:
            items = []

        # Expand the tier dropdown and scrape each tier in turn.
        tiered_items: List[Dict] = []
        try:
            _ensure_tiered_pricing(page)
            tier_options = _get_tier_options(page)
            _debug(f"[DEBUG] 总共找到 {len(tier_options)} 个档位")
            for opt in tier_options:
                if not _select_tier_option(page, opt):
                    continue
                html = page.content()
                try:
                    tier_items = extract_price_items_from_html(html)
                    _debug(f"[DEBUG] 档位 {opt} 解析出 {len(tier_items)} 条价格")
                except Exception as e:
                    _debug(f"[DEBUG] 档位 {opt} 解析失败: {e}")
                    tier_items = []
                for it in tier_items:
                    it["tier"] = opt
                tiered_items.extend(tier_items)
        except Exception as e:
            _debug(f"[DEBUG] 阶梯计费抓取异常: {e}")
            tiered_items = []

        _debug(f"[DEBUG] 总共收集 {len(tiered_items)} 条有档位标记的价格")
        if tiered_items:
            _debug("[DEBUG] 使用阶梯计费结果,替换普通结果")
            items = tiered_items

        # Nothing yet: wait for any "N 元" text, scroll to trigger lazy
        # loading, and retry the structured parse.
        if not items:
            try:
                page.wait_for_selector("text=/[0-9]+(\\.[0-9]+)?\\s*元/", timeout=8000)
            except PlaywrightTimeoutError:
                pass
            try:
                page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                time.sleep(1.0)
                html = page.content()
                items = extract_price_items_from_html(html)
            except Exception:
                items = []

        # Final fallback: plain-text parse of the price block.
        if not items:
            text_block = extract_price_block_html(html)
            if not text_block:
                result["error"] = "未找到包含 '模型价格' 的区域,可能需要登录或页面结构不同。"
                browser.close()
                return result
            items = parse_prices_from_text(text_block)

        def _build_price_map(parsed_items: List[Dict]) -> Dict:
            """Collapse parsed items into a concise {tier/label: data} map."""
            price_map: Dict = {}
            for it in parsed_items:
                # Case 1: already grouped ({label: input/output, tiers: {...}}).
                if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict):
                    for tier_key, tier_val in it["tiers"].items():
                        k = _normalize_tier_option(tier_key)
                        # A tier may hold several sub-entries.
                        price_map.setdefault(k, {})
                        sub_label = tier_val.get("label") or tier_val.get("raw") or k
                        price_map[k][sub_label] = {
                            k2: v for k2, v in tier_val.items()
                            if k2 not in ("tier", "tiers", "label")
                        }
                    continue
                # Case 2: item carries its own tier field (per-tier scrape).
                if it.get("tier"):
                    tk = _normalize_tier_option(it.get("tier"))
                    price_map.setdefault(tk, {})
                    sub_label = it.get("label") or it.get("raw") or tk
                    price_map[tk][sub_label] = {
                        k: v for k, v in it.items() if k not in ("tier", "label")
                    }
                    continue
                # Case 3: plain non-tiered item, keyed by label; duplicate
                # labels merge into a list.
                lbl = it.get("label") or it.get("raw") or "price"
                if lbl in price_map and not isinstance(price_map[lbl], list):
                    price_map[lbl] = [price_map[lbl]]
                if isinstance(price_map.get(lbl), list):
                    price_map[lbl].append({k: v for k, v in it.items() if k != "label"})
                else:
                    price_map[lbl] = {k: v for k, v in it.items() if k != "label"}
            return price_map

        price_map = _build_price_map(items)
        result = {"url": url, "error": result.get("error"), "prices": price_map}
        browser.close()
        return result


def main() -> None:
    """CLI entry point: parse arguments, scrape each URL, print JSON."""
    ap = argparse.ArgumentParser(description="爬取阿里云模型市场页面的模型价格(基于 Playwright)")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="单个模型页面 URL")
    group.add_argument("--file", help="包含多个 URL(每行一个)的文件路径")
    ap.add_argument("--headful", action="store_true", help="以有头模式打开浏览器(方便调试)")
    ap.add_argument("--timeout", type=int, default=20000, help="导航超时(毫秒),默认20000")
    ap.add_argument("--browser-path", help="浏览器可执行文件完整路径(覆盖环境变量 PLAYWRIGHT_EXECUTABLE)")
    args = ap.parse_args()

    urls: List[str] = []
    if args.url:
        urls = [args.url]
    else:
        with open(args.file, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]

    # Browser executable: CLI flag first, then PLAYWRIGHT_EXECUTABLE env var.
    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    # PLAYWRIGHT_HEADLESS=false forces headful mode.
    headless = not args.headful
    if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
        headless = False

    results = []
    for u in urls:
        _debug(f"抓取: {u}")
        res = scrape_model_price(u, headless=headless, timeout=args.timeout, executable_path=exec_path)
        results.append(res)

    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()