| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- #!/usr/bin/env python3
- """
- scrape_tool_prices.py
- 抓取阿里云百炼模型页面的工具调用价格:
- - 搜索策略、代码解释器、文生图等工具的调用费用
- - 单位通常为 元/千次调用
- 原理:复用 scrape_aliyun_models.py 的页面渲染逻辑,
- 但专门提取工具调用相关价格行(原脚本会过滤掉这些)。
- """
- import re
- import time
- import json
- from typing import Dict, List, Optional
- from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
# Recognises text that relates to tool-call pricing: either one of the known
# tool/category names or a per-call billing-unit phrase.
TOOL_CALL_RE = re.compile(
    r"搜索策略|代码解释|文生图|数据增强|模型推理|工具调用|千次调用|/千次|次调用",
    flags=re.IGNORECASE,
)

# Recognises only the billing-unit phrases (per call / per thousand calls).
TOOL_UNIT_RE = re.compile(r"千次调用|/千次|次调用", flags=re.IGNORECASE)
def _is_tool_call_item(label: str, raw: str) -> bool:
    """Return True when either *label* or *raw* matches ``TOOL_CALL_RE``.

    Args:
        label: Short name of a price row.
        raw: Full raw text of the same row.
    """
    # Check the label first, then the raw text; stop at the first hit.
    return any(TOOL_CALL_RE.search(candidate) for candidate in (label, raw))
# Each tool row carries one of these API-type tags; they act as entry separators.
_API_SEP_RE = re.compile(r"(Completions API|Responses API)")
# Tool identifier at the very end of a segment (ASCII letters, digits, "_" and ":").
_TOOL_TAIL_RE = re.compile(r"([a-zA-Z][a-zA-Z0-9_:]*)$")
# A number explicitly followed by the yuan sign.
_YUAN_PRICE_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)\s*元")
# Fallback when no number is followed by the yuan sign: the first number at all.
_NUMBER_RE = re.compile(r"([0-9]+(?:\.[0-9]+)?)")
_FREE_RE = re.compile(r"限时免费|免费")
_DISCOUNT_RE = re.compile(r"([0-9.]+)\s*折")


def parse_tool_prices_from_text(text: str) -> List[Dict]:
    """Extract tool-call price entries from the text of the tool-price section.

    The text is expected to look like::

        <tool name>Completions API<price info><tool name>Responses API<price info>...

    Splitting on the API-type tag yields text segments in which a tool name
    sits at the END of one segment and its price info at the START of the
    next segment (up to the next tool name, if any).

    Args:
        text: Text of the tool-price section, on one line or split across
            several lines.

    Returns:
        A list of dicts with keys ``label``, ``currency`` ("CNY"), ``unit``
        ("元/千次调用"), ``price`` (``0`` for free tools, otherwise a float) and
        optionally ``note``. Each tool name appears at most once; the first
        occurrence that yields a price wins. Entries without any price or
        "free" marker are skipped. Empty input yields an empty list.
    """
    if not text:
        return []

    # re.split with a capturing group returns [text, sep, text, sep, ..., text];
    # only the even-indexed text segments are needed.
    segments = _API_SEP_RE.split(text)[0::2]

    items: List[Dict] = []
    seen: set = set()

    for before, after in zip(segments, segments[1:]):
        # Trailing whitespace/newlines would defeat the "$"-anchored name
        # pattern, so strip them before looking for the tool name.
        name_match = _TOOL_TAIL_RE.search(before.rstrip())
        if not name_match:
            continue
        label = name_match.group(1)
        if label in seen:
            continue

        # The price info runs from the start of the next segment up to the
        # tool name that begins the following entry (if there is one).
        after = after.rstrip()
        next_name = _TOOL_TAIL_RE.search(after)
        price_info = (after[: next_name.start()] if next_name else after).strip()

        entry: Dict = {"label": label, "currency": "CNY", "unit": "元/千次调用"}
        if _FREE_RE.search(price_info):
            entry["price"] = 0
            entry["note"] = "限时免费"
        else:
            # Prefer a number followed by the yuan sign so that a discount
            # figure ("5折") or a call count is not mistaken for the price.
            price_match = _YUAN_PRICE_RE.search(price_info) or _NUMBER_RE.search(price_info)
            if not price_match:
                continue
            entry["price"] = float(price_match.group(1))
            if "限时优惠" in price_info:
                entry["note"] = "限时优惠"
            # A concrete discount rate is more specific and overrides the
            # generic promotion note.
            discount = _DISCOUNT_RE.search(price_info)
            if discount:
                entry["note"] = f"限时{discount.group(1)}折"

        seen.add(label)
        items.append(entry)

    return items
- def _get_tool_price_section_text(html: str) -> str:
- """
- 专门定位"工具调用价格"区块文本,排除 script/style。
- 工具调用价格是独立区块,标题为"工具调用价格",不在"模型价格"区块内。
- """
- try:
- from bs4 import BeautifulSoup, FeatureNotFound
- try:
- soup = BeautifulSoup(html, "lxml")
- except FeatureNotFound:
- soup = BeautifulSoup(html, "html.parser")
- # 优先找"工具调用价格"标题节点
- target_node = None
- for node in soup.find_all(string=re.compile(r"工具调用价格")):
- if node.parent and node.parent.name in ("script", "style"):
- continue
- target_node = node
- break
- if not target_node:
- return ""
- # 向上找包含价格数字的容器
- ancestor = target_node.parent
- for _ in range(10):
- txt = ancestor.get_text(separator="\n")
- if ("元" in txt or "免费" in txt) and len(txt) > 50:
- return txt
- if ancestor.parent:
- ancestor = ancestor.parent
- else:
- break
- return ancestor.get_text(separator="\n")
- except Exception:
- return ""
def scrape_tool_prices_standalone(
    url: str,
    headless: bool = True,
    timeout: int = 20000,
    executable_path: Optional[str] = None,
) -> Dict:
    """Launch a browser, load *url*, and scrape its tool-call prices.

    Args:
        url: Page to load.
        headless: Passed to the Chromium launcher.
        timeout: Timeout passed to each ``page.goto`` call.
        executable_path: Optional browser executable passed to the launcher.

    Returns:
        A dict of the form::

            {
                "url": str,
                "error": str | None,
                "tool_call_prices": [
                    {"label": ..., "price": ..., "unit": "元/千次调用", "currency": "CNY"},
                    ...
                ],
            }

        ``error`` is set when the fallback navigation fails or when the
        tool-price section cannot be located.
    """
    from playwright.sync_api import sync_playwright

    result: Dict = {"url": url, "error": None, "tool_call_prices": []}

    with sync_playwright() as p:
        launch_kwargs: Dict = {"headless": headless}
        if executable_path:
            launch_kwargs["executable_path"] = executable_path
        browser = p.chromium.launch(**launch_kwargs)

        # try/finally guarantees the browser is closed on every exit path,
        # including unexpected exceptions from page operations or parsing.
        try:
            page = browser.new_context().new_page()

            # First wait for the network to go idle; on timeout retry with
            # the less strict "load" condition.
            try:
                page.goto(url, wait_until="networkidle", timeout=timeout)
            except PlaywrightTimeoutError:
                try:
                    page.goto(url, wait_until="load", timeout=timeout)
                except Exception as e:
                    result["error"] = f"导航失败: {e}"
                    return result

            # Wait for the pricing heading if it shows up; carry on regardless.
            try:
                page.wait_for_selector("text=模型价格", timeout=8000)
            except PlaywrightTimeoutError:
                pass
            time.sleep(1.2)

            price_text = _get_tool_price_section_text(page.content())
            if not price_text:
                # Scroll to the bottom and retry once. This retry is
                # best-effort, so failures here are deliberately ignored.
                try:
                    page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    time.sleep(1.5)
                    price_text = _get_tool_price_section_text(page.content())
                except Exception:
                    pass

            if not price_text:
                result["error"] = "未找到工具调用价格区域"
                return result

            print(f"[DEBUG] 工具调用价格区域文本:\n{price_text[:300]}")
            result["tool_call_prices"] = parse_tool_prices_from_text(price_text)
            return result
        finally:
            browser.close()
if __name__ == "__main__":
    # CLI entry point: scrape one URL (--url) or every non-blank line of a
    # file (--file) and print the collected results as JSON.
    import argparse
    import os

    ap = argparse.ArgumentParser(description="抓取阿里云模型工具调用价格")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url")
    group.add_argument("--file")
    ap.add_argument("--headful", action="store_true")
    ap.add_argument("--timeout", type=int, default=20000)
    ap.add_argument("--browser-path")
    args = ap.parse_args()

    if args.url:
        raw_urls = [args.url]
    else:
        # Use a context manager so the URL list file is closed promptly.
        with open(args.file, encoding="utf-8") as fh:
            raw_urls = fh.read().splitlines()
    urls = [u.strip() for u in raw_urls if u.strip()]

    # An explicit --browser-path wins over the PLAYWRIGHT_EXECUTABLE variable.
    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    headless = not args.headful

    results = []
    for u in urls:
        print(f"抓取工具调用价格: {u}", flush=True)
        results.append(
            scrape_tool_prices_standalone(
                u,
                headless=headless,
                timeout=args.timeout,
                executable_path=exec_path,
            )
        )
    print(json.dumps(results, ensure_ascii=False, indent=2))
|