#!/usr/bin/env python3
"""
scrape_tool_prices.py

Scrape tool-call prices from an Alibaba Cloud Bailian (Model Studio) model page:
  - per-call fees for tools such as search strategies, the code interpreter,
    text-to-image, etc.
  - the unit is normally CNY per thousand calls

How it works: the page is rendered the same way scrape_aliyun_models.py does
it, but this script extracts only the tool-call price rows (which that script
filters out).
"""

import re
import time
import json
from typing import Dict, List, Optional

try:
    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
except ImportError:
    # Playwright is only needed for live scraping; the browser entry point
    # imports ``sync_playwright`` lazily and will raise ImportError itself.
    # This placeholder keeps the pure text parser importable without it.
    class PlaywrightTimeoutError(Exception):
        """Placeholder used when Playwright is not installed."""


# Recognises text that refers to a tool-call price item.
TOOL_CALL_RE = re.compile(
    r"搜索策略|代码解释|文生图|数据增强|模型推理|工具调用|千次调用|/千次|次调用",
    re.I,
)

# Recognises the "per (thousand) calls" unit.
TOOL_UNIT_RE = re.compile(r"千次调用|/千次|次调用", re.I)

# Patterns used by parse_tool_prices_from_text, compiled once at import time.
# The capture group makes re.split keep the separators, so text segments
# always sit at the even indexes of the split result.
_API_SEP_RE = re.compile(r"(Completions API|Responses API)")
# A tool name is an ASCII identifier (letters, digits, "_" and ":") that sits
# at the very end of a segment.
_TOOL_NAME_RE = re.compile(r"([a-zA-Z][a-zA-Z0-9_:]*)$")
_PRICE_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)")
_FREE_RE = re.compile(r"限时免费|免费")
_PROMO_RE = re.compile(r"限时优惠")
_DISCOUNT_RE = re.compile(r"([0-9.]+)\s*折")


def _is_tool_call_item(label: str, raw: str) -> bool:
    """Return True if either the label or the raw text mentions a tool-call price."""
    return bool(TOOL_CALL_RE.search(label) or TOOL_CALL_RE.search(raw))


def _build_price_entry(label: str, price_info: str) -> Optional[Dict]:
    """Build one price record for ``label`` from its price text.

    Returns None when ``price_info`` contains neither a "free" marker nor a
    number, i.e. when no price can be determined.
    """
    entry: Dict = {"label": label, "currency": "CNY", "unit": "元/千次调用"}

    if _FREE_RE.search(price_info):
        entry["price"] = 0
        entry["note"] = "限时免费"
        return entry

    numbers = _PRICE_RE.findall(price_info)
    if not numbers:
        return None
    # _PRICE_RE only matches text of the form "digits[.digits]", so float()
    # cannot fail here. The first number in the text is taken as the price.
    entry["price"] = float(numbers[0])

    if _PROMO_RE.search(price_info):
        entry["note"] = "限时优惠"
    # An explicit discount rate ("N折") overrides the generic promotion note.
    discount = _DISCOUNT_RE.search(price_info)
    if discount:
        entry["note"] = f"限时{discount.group(1)}折"
    return entry


def parse_tool_prices_from_text(text: str) -> List[Dict]:
    """Extract tool-call price entries from the text of the price section.

    The text is expected to look like::

        <title><tool1>Completions API<price1><tool2>Responses API<price2>...

    Splitting on the API-type marker yields text segments in which a tool
    name sits at the END of one segment and its price at the START of the
    next one (optionally followed by the next tool name).

    Args:
        text: Text of the tool-call price section. Newlines between the
            pieces and trailing whitespace after a tool name are tolerated.

    Returns:
        A list of dicts with keys ``label``, ``currency`` ("CNY"), ``unit``,
        ``price`` (``0`` for free tools, otherwise a float) and optionally
        ``note``. Only the first occurrence of each tool name is kept;
        tools without a recognisable price are skipped.
    """
    if not text:
        return []

    # Even indexes hold text segments, odd indexes hold the API-type markers
    # (which are not needed).
    segments = _API_SEP_RE.split(text)[::2]

    items: List[Dict] = []
    seen: set = set()
    for before, after in zip(segments, segments[1:]):
        # Strip trailing whitespace first: the section text is joined with
        # newlines, and "$" would otherwise fail to anchor on a tool name
        # that is followed by blanks or several newlines.
        before = before.rstrip()
        after = after.rstrip()

        name_match = _TOOL_NAME_RE.search(before)
        if not name_match:
            continue
        label = name_match.group(1)
        if label in seen:
            continue

        # The price text runs from the start of the next segment up to the
        # next tool name, if there is one.
        next_name = _TOOL_NAME_RE.search(after)
        price_info = (after[: next_name.start()] if next_name else after).strip()

        entry = _build_price_entry(label, price_info)
        if entry is None:
            continue
        seen.add(label)
        items.append(entry)
    return items


def _get_tool_price_section_text(html: str) -> str:
    """Locate the "工具调用价格" (tool-call price) section and return its text.

    Text nodes inside ``<script>``/``<style>`` are ignored. Starting from the
    title node, the function walks up at most 10 ancestors and returns the
    text of the first one that mentions a price ("元" or "免费") and is longer
    than 50 characters; if none qualifies, the text of the last ancestor
    reached is returned. Text pieces are joined with newlines.

    Best-effort: returns "" when the title is not found or when anything
    fails (including BeautifulSoup not being installed).
    """
    try:
        from bs4 import BeautifulSoup, FeatureNotFound

        try:
            soup = BeautifulSoup(html, "lxml")
        except FeatureNotFound:
            # lxml is optional; fall back to the built-in parser.
            soup = BeautifulSoup(html, "html.parser")

        # First visible text node that carries the section title.
        target_node = None
        for node in soup.find_all(string=re.compile(r"工具调用价格")):
            if node.parent and node.parent.name in ("script", "style"):
                continue
            target_node = node
            break
        if not target_node:
            return ""

        # Walk upwards until a container that actually holds price text.
        ancestor = target_node.parent
        for _ in range(10):
            txt = ancestor.get_text(separator="\n")
            if ("元" in txt or "免费" in txt) and len(txt) > 50:
                return txt
            if ancestor.parent:
                ancestor = ancestor.parent
            else:
                break
        return ancestor.get_text(separator="\n")
    except Exception:
        return ""


def scrape_tool_prices_standalone(
    url: str,
    headless: bool = True,
    timeout: int = 20000,
    executable_path: Optional[str] = None,
) -> Dict:
    """Launch a browser, load ``url``, scrape the tool-call prices, then close.

    Args:
        url: Page to scrape.
        headless: Passed to the Chromium launcher.
        timeout: Timeout passed to each ``page.goto`` call.
        executable_path: Optional browser executable passed to the launcher.

    Returns:
        {
            "url": str,
            "error": str | None,
            "tool_call_prices": [
                {"label": "web_search", "price": 0.5,
                 "unit": "元/千次调用", "currency": "CNY"},
                ...
            ]
        }

    Raises:
        ImportError: if Playwright is not installed.
    """
    from playwright.sync_api import sync_playwright

    result: Dict = {"url": url, "error": None, "tool_call_prices": []}

    with sync_playwright() as p:
        launch_kwargs: Dict = {"headless": headless}
        if executable_path:
            launch_kwargs["executable_path"] = executable_path
        browser = p.chromium.launch(**launch_kwargs)
        # try/finally guarantees the browser is closed on every exit path,
        # including unexpected exceptions.
        try:
            page = browser.new_context().new_page()

            try:
                page.goto(url, wait_until="networkidle", timeout=timeout)
            except PlaywrightTimeoutError:
                # "networkidle" timed out; retry with the weaker "load"
                # condition before giving up.
                try:
                    page.goto(url, wait_until="load", timeout=timeout)
                except Exception as e:
                    result["error"] = f"导航失败: {e}"
                    return result

            try:
                page.wait_for_selector("text=模型价格", timeout=8000)
            except PlaywrightTimeoutError:
                # Best effort: continue with whatever has been rendered.
                pass

            time.sleep(1.2)
            price_text = _get_tool_price_section_text(page.content())

            if not price_text:
                # Scroll to the bottom and try once more.
                try:
                    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    time.sleep(1.5)
                    price_text = _get_tool_price_section_text(page.content())
                except Exception:
                    pass

            if not price_text:
                result["error"] = "未找到工具调用价格区域"
                return result

            print(f"[DEBUG] 工具调用价格区域文本:\n{price_text[:300]}")
            result["tool_call_prices"] = parse_tool_prices_from_text(price_text)
        finally:
            browser.close()

    return result


def main() -> None:
    """Command-line entry point: scrape one URL or every URL listed in a file."""
    import argparse
    import os

    ap = argparse.ArgumentParser(description="抓取阿里云模型工具调用价格")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url")
    group.add_argument("--file")
    ap.add_argument("--headful", action="store_true")
    ap.add_argument("--timeout", type=int, default=20000)
    ap.add_argument("--browser-path")
    args = ap.parse_args()

    if args.url:
        raw_urls = [args.url]
    else:
        # One URL per line; the file is closed as soon as it has been read.
        with open(args.file, encoding="utf-8") as fh:
            raw_urls = fh.read().splitlines()
    urls = [u.strip() for u in raw_urls if u.strip()]

    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    headless = not args.headful

    results = []
    for u in urls:
        print(f"抓取工具调用价格: {u}", flush=True)
        results.append(
            scrape_tool_prices_standalone(
                u,
                headless=headless,
                timeout=args.timeout,
                executable_path=exec_path,
            )
        )

    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()