| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816 |
- #!/usr/bin/env python3
- """
- Aliyun Model Price Scraper - Final Improved Version
- - 使用 Playwright 渲染页面并抓取"模型价格"区域内的价格信息
- - 支持单个模型页面 URL,或从文件读取多个 URL
- 改进要点:
- 1. 能够生成阶梯计费结构:{input: {tier1: {...}, tier2: {...}}, output: {...}}
- 2. 优惠标记正确处理:label只保留基础部分,优惠信息放入note字段
- 3. 强化过滤:完全排除工具调用价格(包括"千次调用"单位)
- 依赖:
- pip install playwright beautifulsoup4 lxml
- python -m playwright install
- 用法示例:
- python scrape_aliyun_models.py --url "https://bailian.console.aliyun.com/.../qwen3-max"
- python scrape_aliyun_models.py --file urls.txt
- 输出: JSON 到 stdout
- """
- import argparse
- import json
- import re
- import time
- import os
- from typing import List, Dict, Optional
- from bs4 import BeautifulSoup, FeatureNotFound
- from bs4.element import Tag
- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
- TOOL_CALL_RE = re.compile(
- r"调用|工具|接口|api调用|api|次调用|千次调用|/千次|每千次|搜索策略|代码解释|文生图|数据增强|模型推理",
- re.I,
- )
- def _is_tool_call_item(label: str, raw: str, unit: str) -> bool:
- label_l = label.lower()
- raw_l = raw.lower()
- unit_l = unit.lower()
- if TOOL_CALL_RE.search(label_l) or TOOL_CALL_RE.search(raw_l) or TOOL_CALL_RE.search(unit_l):
- return True
- if "千次" in unit_l or "/千" in unit_l or "次调用" in unit_l:
- return True
- return False
- def _find_nearest_tier_label(lines: List[str], idx: int) -> Optional[str]:
- tier_re = re.compile(r"(输入|输出).*(<=|>=|<|>|\b\d+\s*k|\d+\s*万|\d+\s*千|\d+\s*tokens?)", re.I)
- for step in range(1, 6):
- for pos in (idx - step, idx + step):
- if pos < 0 or pos >= len(lines):
- continue
- candidate = lines[pos]
- if not candidate or re.search(r"([0-9]+(?:\.[0-9]+)?)\s*元", candidate, re.I):
- continue
- if tier_re.search(candidate):
- return candidate.strip()
- return None
def _open_tier_dropdown(page) -> bool:
    """Try to open the tier selector dropdown on the page.

    First attempts a Playwright click on a select "selector" element whose text
    matches "输入…<digits>k". If that raises or finds nothing, falls back to an
    in-page script that locates a visible matching select element and
    dispatches mousedown/mouseup/click on it.

    Args:
        page: Playwright page object.

    Returns:
        True if a click was issued, False if no visible target was found or
        any step raised.
    """
    try:
        try:
            selector = page.locator(
                ".efm_ant-select-selector, .ant-select-selector"
            ).filter(has_text=re.compile(r"输入.*\d+\s*[kK]"))
            if selector.count() > 0:
                selector.first.click(timeout=3000)
                time.sleep(0.5)
                return True
        except Exception:
            # Best effort: fall through to the scripted click below.
            pass

        # Raw string so the JavaScript regex escapes (\s, \d) are passed
        # through verbatim instead of being invalid Python escape sequences.
        ok = page.evaluate(
            r"""
            () => {
              const isVisible = (el) => {
                if (!el) return false;
                const rect = el.getBoundingClientRect();
                const style = window.getComputedStyle(el);
                return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
              };
              const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
              const tierRe = /输入.*\d+\s*[kK]/i;
              let clickEl = null;
              const selectors = Array.from(document.querySelectorAll(
                ".efm_ant-select-selector, .ant-select-selector"
              ));
              for (const el of selectors) {
                const txt = norm(el.innerText || el.textContent);
                if (tierRe.test(txt) && isVisible(el)) {
                  clickEl = el;
                  break;
                }
              }
              if (!clickEl) {
                const containers = Array.from(document.querySelectorAll(
                  ".efm_ant-select, .ant-select"
                ));
                for (const el of containers) {
                  const txt = norm(el.innerText || el.textContent);
                  if (tierRe.test(txt) && isVisible(el)) {
                    clickEl = el.querySelector(".efm_ant-select-selector, .ant-select-selector") || el;
                    break;
                  }
                }
              }
              if (!isVisible(clickEl)) return false;
              clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
              clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
              clickEl.click();
              return true;
            }
            """
        )
        time.sleep(0.5)
        return bool(ok)
    except Exception:
        return False
- def _normalize_tier_option(opt: str) -> str:
- if not opt:
- return "unknown"
- s = opt.replace('\u00a0', ' ')
- m = re.search(r"(\d+\s*k\s*<\s*输入\s*<=\s*\d+\s*k)", s, re.I)
- if not m:
- m = re.search(r"(输入\s*<=\s*\d+\s*k)", s, re.I)
- if not m:
- m = re.search(r"(\d+\s*k\s*<\s*输入)", s, re.I)
- if m:
- key = m.group(1)
- key = re.sub(r"\s+", "", key)
- key = key.replace("输入", "input").replace("输出", "output")
- return key
- if "输入" in s or "输出" in s:
- nums = re.findall(r"\d+\s*k", s, re.I)
- if nums:
- joined = "-".join([n.replace(' ', '') for n in nums])
- if "输入" in s:
- return f"input_{joined}"
- return f"output_{joined}"
- short = re.sub(r"\s+", " ", s).strip()
- return short[:60]
def _get_tier_options(page) -> List[str]:
    """Collect the tier option labels offered by the tier dropdown.

    Opens the dropdown, reads the text of visible leaf elements inside the
    visible dropdown and keeps those mentioning 输入 and a ``<n>k`` size. If
    that yields nothing, scans all visible leaf elements of the page for short
    texts mentioning 输入, a ``<n>k`` size and a ``<``/``<=`` operator.
    Finally presses Escape and returns the options de-duplicated in order.

    Args:
        page: Playwright page object.

    Returns:
        Option texts; an empty list when the dropdown could not be opened or
        nothing matched.
    """
    if not _open_tier_dropdown(page):
        return []

    try:
        page.wait_for_selector(
            ".efm_ant-select-dropdown, .ant-select-dropdown",
            state="visible",
            timeout=3000,
        )
    except Exception:
        # The dropdown may still be readable even if the wait failed.
        pass

    dropdown_script = r"""
        () => {
          const isVisible = (el) => {
            const r = el.getBoundingClientRect();
            const s = window.getComputedStyle(el);
            return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
          };
          const dropdown = Array.from(document.querySelectorAll(
            '.efm_ant-select-dropdown, .ant-select-dropdown'
          )).find(el => isVisible(el));
          if (!dropdown) return [];
          const leaves = Array.from(dropdown.querySelectorAll('*'))
            .filter(el => isVisible(el) && el.children.length === 0);
          const texts = leaves
            .map(el => (el.innerText || el.textContent || '').replace(/\s+/g, ' ').trim())
            .filter(t => t.length > 0 && t.length < 60);
          return Array.from(new Set(texts));
        }
        """
    page_wide_script = r"""
        () => {
          const isVisible = (el) => {
            const r = el.getBoundingClientRect();
            const s = window.getComputedStyle(el);
            return r.width > 0 && r.height > 0 && s.display !== 'none' && s.visibility !== 'hidden';
          };
          const texts = Array.from(document.querySelectorAll('*'))
            .filter(el => isVisible(el) && el.children.length === 0)
            .map(el => (el.innerText || el.textContent || '').replace(/\s+/g, ' ').trim())
            .filter(t => t.length < 60 && /输入/.test(t) && /\d+\s*[kK]/.test(t) && /<=|</.test(t));
          return Array.from(new Set(texts));
        }
        """

    try:
        found = page.evaluate(dropdown_script)
        found = [
            text
            for text in found
            if re.search(r"输入", text) and re.search(r"\d+\s*[kK]", text)
        ]
    except Exception:
        found = []

    if not found:
        # Fallback: look across the whole page for tier-like texts.
        try:
            found = page.evaluate(page_wide_script)
        except Exception:
            found = []

    try:
        page.keyboard.press("Escape")
    except Exception:
        pass

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    return list(dict.fromkeys(found))
def _select_tier_option(page, option_text: str) -> bool:
    """Open the tier dropdown and click the option with the given text.

    Tries a Playwright exact-text click first. If that raises, falls back to
    an in-page script that finds a visible option node whose normalised text
    equals ``option_text`` and dispatches mousedown/mouseup/click on it.

    Args:
        page: Playwright page object.
        option_text: Exact text of the option to select.

    Returns:
        True if a click was issued; False if the dropdown could not be
        opened, did not become visible, the option was not found, or a step
        raised.
    """
    if not _open_tier_dropdown(page):
        return False
    try:
        page.wait_for_selector(
            ".efm_ant-select-dropdown, .ant-select-dropdown",
            state="visible",
            timeout=2000,
        )
    except Exception:
        return False

    try:
        try:
            option_loc = page.get_by_text(option_text, exact=True).first
            option_loc.click(timeout=3000, force=False)
            time.sleep(0.6)
            return True
        except Exception:
            # Best effort: fall through to the scripted click below.
            pass

        # Raw string so the JavaScript regex escape (\s) is passed through
        # verbatim instead of being an invalid Python escape sequence.
        clicked = page.evaluate(
            r"""
            (opt) => {
              const isVisible = (el) => {
                if (!el) return false;
                const rect = el.getBoundingClientRect();
                const style = window.getComputedStyle(el);
                return rect.width > 0 && rect.height > 0 && style.display !== 'none' && style.visibility !== 'hidden';
              };
              const norm = (s) => (s || '').replace(/\s+/g, ' ').trim();
              const nodes = Array.from(document.querySelectorAll(
                ".efm_ant-select-item-option-content, [role='option'], .efm_ant-select-item, .ant-select-item"
              ));
              const target = nodes.find((n) => norm(n.textContent) === opt && isVisible(n));
              if (!target) return false;
              const clickEl = target.closest(".efm_ant-select-item, [role='option']") || target;
              clickEl.dispatchEvent(new MouseEvent('mousedown', { bubbles: true }));
              clickEl.dispatchEvent(new MouseEvent('mouseup', { bubbles: true }));
              clickEl.click();
              return true;
            }
            """,
            option_text,
        )
        if clicked:
            time.sleep(0.6)
            return True
        return False
    except Exception:
        return False
def _ensure_tiered_pricing(page) -> None:
    """Click the first "阶梯计费" element on the page, if one exists.

    Any error is ignored; the function never raises.

    Args:
        page: Playwright page object.
    """
    try:
        tier_toggle = page.locator("text=阶梯计费").first
        if tier_toggle.count() <= 0:
            return
        tier_toggle.click()
        # Short pause after the click before the caller reads the page.
        time.sleep(0.3)
    except Exception:
        return
def parse_prices_from_text(text: str) -> List[Dict]:
    """Parse price entries out of plain text, one entry per priced line.

    A priced line is any non-empty line containing "<number>元". Its label is
    the text before the first price, else the nearest preceding non-priced
    line, else ``price_<n>``. A line labelled exactly "原价" is not emitted as
    its own entry: its first price becomes ``price_original`` of the previous
    entry. A bare "输入"/"输出" label is replaced by a nearby tier label when
    one is found.

    Entry keys: ``label``, ``raw``, ``currency`` ("CNY"), either ``price`` (one
    number on the line) or ``price_current``/``price_original`` (lowest and
    highest of several numbers), and optionally ``unit`` and ``note``.

    Args:
        text: Text of the price block.

    Returns:
        The parsed entries in line order; empty if no line carries a price.
    """
    price_re = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元", re.I)
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    items: List[Dict] = []

    for idx, ln in enumerate(lines):
        matches = price_re.findall(ln)
        if not matches:
            continue
        # The pattern only captures plain decimals, so float() cannot fail.
        prices = [float(m) for m in matches]

        label = ln[: price_re.search(ln).start()].strip()
        if not label:
            label = next(
                (
                    lines[j]
                    for j in range(idx - 1, -1, -1)
                    if not price_re.search(lines[j])
                ),
                "",
            )
        if not label:
            label = f"price_{len(items) + 1}"

        if label == "原价":
            # An "original price" row belongs to the entry before it.
            if items:
                previous = items[-1]
                previous["price_original"] = prices[0]
                if previous.get("note"):
                    previous["note"] += "; 原价显示"
                else:
                    previous["note"] = "原价显示"
            continue

        raw = ln
        if re.fullmatch(r"输入|输出", label.strip()):
            tier_label = _find_nearest_tier_label(lines, idx)
            if tier_label:
                label = tier_label

        entry: Dict = {"label": label.strip(), "raw": raw}
        if len(prices) == 1:
            entry["price"] = prices[0]
        else:
            entry["price_current"] = min(prices)
            entry["price_original"] = max(prices)

        unit = None
        if re.search(r"百万\s*tokens?", raw, re.I):
            # Per-million pricing must not be reported as per-thousand.
            unit = "元/每百万tokens"
        elif re.search(r"每千|每 1k|/千|/每千|tokens", raw, re.I):
            unit = "元/每千tokens"
        # An explicit "元/每…" unit in the text takes precedence.
        unit_m = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
        if unit_m:
            unit = unit_m.group(0)
        if unit:
            entry["unit"] = unit

        note = []
        if re.search(r"限时|折", raw):
            note.append("限时优惠")
        if re.search(r"原价", raw):
            note.append("原价显示")
        if note:
            entry["note"] = "; ".join(note)

        entry["currency"] = "CNY"
        items.append(entry)

    return items
def extract_price_block_html(html: str) -> str:
    """Return the text of the page region around the "模型价格" heading.

    Parses the HTML (lxml, falling back to html.parser), takes the first text
    node containing "模型价格" that is not inside a script/style tag, and walks
    up at most six ancestors until one whose text contains "元", a digit or
    "tokens". When no such text node exists, the text of the whole document is
    returned.

    Args:
        html: Page HTML.

    Returns:
        Newline-separated text of the chosen element.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    # Skip text nodes inside script/style tags.
    heading = next(
        (
            found
            for found in soup.find_all(string=re.compile(r"模型价格"))
            if not (found.parent and found.parent.name in ("script", "style"))
        ),
        None,
    )
    if not heading:
        return soup.get_text(separator="\n")

    current = heading.parent
    for _ in range(6):
        block_text = current.get_text(separator="\n")
        has_price_hint = (
            "元" in block_text
            or re.search(r"\d", block_text)
            or "tokens" in block_text.lower()
        )
        if has_price_hint:
            return block_text
        if not current.parent:
            break
        current = current.parent
    return current.get_text(separator="\n")
def extract_price_items_from_html(html: str) -> List[Dict]:
    """Extract structured price entries from the "模型价格" region of a page.

    Steps:
      1. Locate the first "模型价格" text node outside script/style and pick a
         container by walking up at most six ancestors until one whose text
         contains "元", a digit or "tokens".
      2. Parse the container text with ``parse_prices_from_text`` and
         post-process the entries (drop tool-call prices, merge "原价" rows
         into the previous entry, rebuild notes, fill units, clean labels).
      3. Group entries whose label mentions 输入/输出 (or that carry a
         ``tier`` key) into ``{"label": "input"|"output", "tiers": {...}}``;
         other entries are kept as they are.
      4. If nothing was found, fall back to elements whose class matches
         "price", and post-process those entries.

    Args:
        html: Page HTML.

    Returns:
        The list of entries; empty when the heading is missing or nothing
        could be parsed.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    node = None
    for n in soup.find_all(string=re.compile(r"模型价格")):
        if n.parent and n.parent.name in ("script", "style"):
            continue
        node = n
        break
    if not node:
        return []

    ancestor = node.parent
    container = ancestor
    for _ in range(6):
        txt = ancestor.get_text(separator="\n")
        if "元" in txt or re.search(r"\d", txt) or "tokens" in txt.lower():
            container = ancestor
            break
        if ancestor.parent:
            ancestor = ancestor.parent
        else:
            container = ancestor
            break

    def _postprocess_items(raw_items: List[Dict]) -> List[Dict]:
        """Filter, merge and normalise parsed entries (mutates the entries)."""
        filtered: List[Dict] = []
        for it in raw_items:
            raw = it.get("raw", "")
            label = it.get("label", "")
            unit = it.get("unit", "")
            if _is_tool_call_item(label, raw, unit):
                continue

            if "原价" in label and filtered:
                # An "original price" row is folded into the previous entry.
                previous = filtered[-1]
                if "price" in it:
                    previous["price_original"] = it["price"]
                elif "price_current" in it and "price_original" in it:
                    previous["price_original"] = it["price_original"]
                if previous.get("note"):
                    previous["note"] += "; 原价显示"
                else:
                    previous["note"] = "原价显示"
                continue

            notes = []
            discount_match = re.search(r"(限时)?([0-9.]+)\s*折", raw)
            if discount_match:
                notes.append(f"限时{discount_match.group(2)}折")
            elif re.search(r"限时|免费", raw) or re.search(r"限时|免费", label):
                notes.append("限时免费" if re.search(r"免费", raw) else "限时优惠")
            if re.search(r"原价", raw):
                notes.append("原价显示")
            if notes:
                it["note"] = "; ".join(notes)

            if "unit" not in it:
                if re.search(r"百万\s*tokens?", raw, re.I):
                    # Per-million pricing must not be reported as per-thousand.
                    it["unit"] = "元/每百万tokens"
                elif re.search(r"每千|tokens|/千|/每千", raw, re.I):
                    it["unit"] = "元/每千tokens"
                else:
                    um = re.search(r"元\s*/?\s*每[^\n,,;]*", raw)
                    if um:
                        it["unit"] = um.group(0)

            # Promotion wording and any trailing price text go out of the
            # label; the promotion itself is kept in the note.
            cleaned_label = re.sub(r"限时[0-9.]*折|限时|免费|原价|\s*元.*", "", label).strip()
            cleaned_label = re.sub(r"\s+", " ", cleaned_label).strip()
            it["label"] = cleaned_label or "price"
            it["currency"] = "CNY"
            filtered.append(it)
        return filtered

    filtered = _postprocess_items(
        parse_prices_from_text(container.get_text(separator="\n"))
    )

    structured: List[Dict] = []
    grouped: Dict[str, Dict[str, Dict]] = {}
    for it in filtered:
        lbl = it.get("label", "")
        group = None
        if re.search(r"输入", lbl):
            group = "input"
        elif re.search(r"输出", lbl):
            group = "output"

        if "tier" in it:
            tier_key = _normalize_tier_option(it.get("tier") or "")
            if not group:
                # Derive the group from the tier key; default to input.
                group = "output" if "output" in tier_key.lower() and "input" not in tier_key.lower() else "input"
            tier_data = {k: v for k, v in it.items() if k not in ("label", "tier")}
            grouped.setdefault(group, {})[tier_key] = tier_data
        elif group:
            if group == "input":
                key = re.sub(r"^输入", "input", lbl)
            else:
                key = re.sub(r"^输出", "output", lbl)
            tier_data = {k: v for k, v in it.items() if k != "label"}
            grouped.setdefault(group, {})[key] = tier_data
        else:
            structured.append(it)

    for g, mapping in grouped.items():
        structured.append({"label": g, "tiers": mapping})
    items = structured

    if not items:
        # Fallback: read elements whose class name mentions "price".
        try:
            price_nodes = []
            for el in soup.find_all(class_=re.compile(r"price", re.I)):
                text = el.get_text(" ", strip=True)
                if re.search(r"[0-9]+(\.[0-9]+)?", text):
                    price_nodes.append((el, text))

            seen = set()
            for el, text in price_nodes:
                if text in seen:
                    continue
                seen.add(text)

                unit_el = el.find_next(class_=re.compile(r"unit", re.I))
                unit_text = unit_el.get_text(" ", strip=True) if unit_el else None

                # Look for a "label"-classed element under up to four ancestors.
                label = None
                p = el
                for _ in range(4):
                    parent = p.parent
                    if parent is None:
                        break
                    sib_label = parent.find(class_=re.compile(r"label", re.I))
                    if sib_label and sib_label.get_text(strip=True):
                        label = sib_label.get_text(" ", strip=True)
                        break
                    p = parent

                if not label:
                    # Otherwise use the nearest preceding sibling text that
                    # contains no digits (at most six siblings back).
                    prev = el.previous_sibling
                    steps = 0
                    while prev and steps < 6:
                        candidate = None
                        if isinstance(prev, str) and prev.strip():
                            candidate = prev.strip()
                        else:
                            try:
                                candidate = prev.get_text(" ", strip=True)
                            except Exception:
                                candidate = None
                        if candidate and not re.search(r"[0-9]", candidate):
                            label = candidate
                            break
                        prev = prev.previous_sibling
                        steps += 1

                entry = {"label": label or "price", "raw": text, "currency": "CNY"}
                # text is known to contain a number (filtered above).
                entry["price"] = float(re.search(r"([0-9]+(?:\.[0-9]+)?)", text).group(1))
                if unit_text:
                    entry["unit"] = unit_text
                items.append(entry)
        except Exception:
            # Best effort: keep whatever entries were collected so far.
            pass

        # Only the fallback entries still need post-processing; the
        # text-parsed entries were processed before grouping.
        if items:
            items = _postprocess_items(items)

    return items
def extract_price_items_global(html: str) -> List[Dict]:
    """Parse prices from the text around the "模型价格" heading.

    Finds the first "模型价格" text node outside script/style, walks up at most
    six ancestors until one whose text contains "元", a digit or "tokens", and
    feeds that text to ``parse_prices_from_text``.

    Args:
        html: Page HTML.

    Returns:
        Parsed price entries; an empty list when the heading is not found.
    """
    try:
        soup = BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        soup = BeautifulSoup(html, "html.parser")

    heading = next(
        (
            found
            for found in soup.find_all(string=re.compile(r"模型价格"))
            if not (found.parent and found.parent.name in ("script", "style"))
        ),
        None,
    )
    if not heading:
        return []

    current = heading.parent
    for _ in range(6):
        block_text = current.get_text(separator="\n")
        has_price_hint = (
            "元" in block_text
            or re.search(r"\d", block_text)
            or "tokens" in block_text.lower()
        )
        if has_price_hint:
            return parse_prices_from_text(block_text)
        if not current.parent:
            break
        current = current.parent
    return parse_prices_from_text(current.get_text(separator="\n"))
def _build_price_map(parsed_items: List[Dict]) -> Dict:
    """Turn a list of parsed price entries into a label-keyed mapping.

    Entries with a ``tiers`` dict are expanded under the normalised tier key;
    entries with a ``tier`` value are stored under the normalised tier; other
    entries are stored under their label, with repeated labels collected
    into a list.
    """
    price_map: Dict = {}
    for it in parsed_items:
        if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict):
            # NOTE(review): a "tier" value on such an entry is not consulted
            # here; only the keys of "tiers" are used.
            for tier_key, tier_val in it["tiers"].items():
                k = _normalize_tier_option(tier_key)
                price_map.setdefault(k, {})
                sub_label = tier_val.get("label") or tier_val.get("raw") or k
                price_map[k][sub_label] = {
                    k2: v for k2, v in tier_val.items() if k2 not in ("tier", "tiers", "label")
                }
            continue

        if it.get("tier"):
            tk = _normalize_tier_option(it.get("tier"))
            price_map.setdefault(tk, {})
            sub_label = it.get("label") or it.get("raw") or tk
            price_map[tk][sub_label] = {k: v for k, v in it.items() if k not in ("tier", "label")}
            continue

        lbl = it.get("label") or it.get("raw") or "price"
        payload = {k: v for k, v in it.items() if k != "label"}
        if lbl not in price_map:
            price_map[lbl] = payload
        elif isinstance(price_map[lbl], list):
            price_map[lbl].append(payload)
        else:
            price_map[lbl] = [price_map[lbl], payload]
    return price_map


def scrape_model_price(url: str, headless: bool = True, timeout: int = 20000, executable_path: Optional[str] = None) -> Dict:
    """Render a model page with Playwright and scrape its prices.

    Extra Chromium launch arguments are read from the comma-separated
    ``PLAYWRIGHT_EXTRA_ARGS`` environment variable. The browser is always
    closed before returning, including when an unexpected error occurs.

    Args:
        url: Page URL to open.
        headless: Whether to launch the browser headless.
        timeout: Navigation timeout in milliseconds.
        executable_path: Optional browser executable to launch.

    Returns:
        On success ``{"url", "error", "prices"}`` where ``prices`` is the
        mapping built from the parsed entries. On a navigation failure, or
        when no price text block is found, ``{"url", "error", "items"}`` with
        ``error`` set and ``items`` empty.
    """
    result = {"url": url, "error": None, "items": []}

    launch_kwargs = {"headless": headless}
    if executable_path:
        launch_kwargs["executable_path"] = executable_path
    extra_args_env = os.environ.get("PLAYWRIGHT_EXTRA_ARGS", "")
    extra_args = [a.strip() for a in extra_args_env.split(",") if a.strip()]
    if extra_args:
        launch_kwargs["args"] = extra_args

    with sync_playwright() as p:
        browser = p.chromium.launch(**launch_kwargs)
        try:
            context = browser.new_context()
            page = context.new_page()

            try:
                page.goto(url, wait_until="networkidle", timeout=timeout)
            except PlaywrightTimeoutError:
                # The page may never go network-idle; retry on plain "load".
                try:
                    page.goto(url, wait_until="load", timeout=timeout)
                except Exception as e:
                    result["error"] = f"导航失败: {e}"
                    return result
            except Exception as e:
                # Non-timeout navigation errors are reported the same way
                # instead of aborting the caller's whole run.
                result["error"] = f"导航失败: {e}"
                return result

            try:
                page.wait_for_selector("text=模型价格", timeout=8000)
            except PlaywrightTimeoutError:
                pass
            time.sleep(1.2)

            html = page.content()
            try:
                items = extract_price_items_from_html(html)
            except Exception:
                items = []

            # Walk through every tier of the dropdown (if any) and collect
            # the prices shown for each one.
            tiered_items: List[Dict] = []
            try:
                _ensure_tiered_pricing(page)
                for opt in _get_tier_options(page):
                    if not _select_tier_option(page, opt):
                        continue
                    html = page.content()
                    try:
                        tier_items = extract_price_items_from_html(html)
                    except Exception:
                        tier_items = []
                    for it in tier_items:
                        it["tier"] = opt
                    tiered_items.extend(tier_items)
            except Exception:
                tiered_items = []
            if tiered_items:
                items = tiered_items

            if not items:
                # Prices may render late: wait for a price text, scroll to the
                # bottom and try again.
                try:
                    page.wait_for_selector("text=/[0-9]+(\\.[0-9]+)?\\s*元/", timeout=8000)
                except PlaywrightTimeoutError:
                    pass
                try:
                    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    time.sleep(1.0)
                    html = page.content()
                    items = extract_price_items_from_html(html)
                except Exception:
                    items = []

            if not items:
                text_block = extract_price_block_html(html)
                if not text_block:
                    result["error"] = "未找到包含 '模型价格' 的区域,可能需要登录或页面结构不同。"
                    return result
                items = parse_prices_from_text(text_block)

            price_map = _build_price_map(items)
            return {"url": url, "error": result.get("error"), "prices": price_map}
        finally:
            browser.close()
def main():
    """Command-line entry point: scrape the given URL(s) and print JSON.

    URLs come from ``--url`` or, one per line, from ``--file``. The browser
    executable is taken from ``--browser-path`` or the ``PLAYWRIGHT_EXECUTABLE``
    environment variable; ``PLAYWRIGHT_HEADLESS=false`` forces headful mode.
    The JSON result list is written to stdout; progress messages go to stderr
    so that stdout stays valid JSON.
    """
    import sys  # local import: only needed for the progress output

    ap = argparse.ArgumentParser(description="爬取阿里云模型市场页面的模型价格(基于 Playwright)")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="单个模型页面 URL")
    group.add_argument("--file", help="包含多个 URL(每行一个)的文件路径")
    ap.add_argument("--headful", action="store_true", help="以有头模式打开浏览器(方便调试)")
    ap.add_argument("--timeout", type=int, default=20000, help="导航超时(毫秒),默认20000")
    ap.add_argument("--browser-path", help="浏览器可执行文件完整路径")
    args = ap.parse_args()

    if args.url:
        urls: List[str] = [args.url]
    else:
        with open(args.file, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]

    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")

    headless = not args.headful
    if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
        headless = False

    results = []
    for u in urls:
        print(f"抓取: {u}", file=sys.stderr)
        results.append(
            scrape_model_price(u, headless=headless, timeout=args.timeout, executable_path=exec_path)
        )
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
|