#!/usr/bin/env python3
"""
scrape_rate_limits.py

Scrape the "model rate limits and context" (模型限流与上下文) block from an
Alibaba Cloud Bailian (百炼) model detail page. The extracted fields mirror
the page exactly:

    最大输入长度 (max input length), RPM, 最大输入长度(思考) (max input
    length, thinking), 上下文长度 (context length), 最大输出长度 (max output
    length), TPM, 最大输出长度(思考) (max output length, thinking),
    最大思维链长度 (max chain-of-thought length)

Approach: extract directly from the page text, so field names and values
match what the page displays.
"""

import json
import re
import time
from typing import Dict, Optional

from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

# Page field name -> output key mapping, ordered as on the page. The more
# specific "(思考)" variants come first so they win over their plain
# counterparts; both full-width and ASCII parentheses are accepted.
FIELD_PATTERNS = [
    # (regex matched against page text, output key)
    (r"最大输入长度[（(]思考[）)]", "最大输入长度(思考)"),
    (r"最大输入长度", "最大输入长度"),
    (r"最大输出长度[（(]思考[）)]", "最大输出长度(思考)"),
    (r"最大输出长度", "最大输出长度"),
    (r"上下文长度", "上下文长度"),
    (r"最大思维链长度", "最大思维链长度"),
    # No leading \b here: in the compact page text the unit of the previous
    # value is glued onto the field name (e.g. "252KRPM30000"), so a word
    # boundary before "RPM" would never match. The trailing lookahead keeps
    # the token from matching inside a longer word.
    (r"RPM(?![A-Za-z])", "RPM"),
    (r"TPM(?![A-Za-z])", "TPM"),
    (r"QPM(?![A-Za-z])", "QPM"),
]


def _extract_model_id_from_url(url: str) -> str:
    """Pull the model code out of a detail URL, e.g. ...#/detail/<model>."""
    m = re.search(r"#.*?/detail/([^/?#&]+)", url)
    if m:
        return m.group(1).strip()
    clean = re.sub(r"[?#].*", "", url)
    parts = [p for p in clean.rstrip("/").split("/") if p]
    return parts[-1] if parts else ""


def _get_rate_limit_section_text(page) -> str:
    """Extract the text of the 模型限流与上下文 block from the page."""
    try:
        return page.evaluate("""
            () => {
                // Find the text node holding the section title.
                const walker = document.createTreeWalker(document.body, NodeFilter.SHOW_TEXT);
                let node;
                while ((node = walker.nextNode())) {
                    if (/模型限流|限流与上下文/.test(node.textContent)) {
                        let el = node.parentElement;
                        for (let i = 0; i < 10; i++) {
                            if (!el) break;
                            const txt = (el.innerText || '').trim();
                            // Walk upward until we reach a container that also
                            // holds the numbers and rate-limit keywords.
                            if (txt.length > 50 && /RPM|TPM|\\d+K/.test(txt)) return txt;
                            el = el.parentElement;
                        }
                    }
                }
                return '';
            }
        """)
    except Exception:
        return ""


def parse_rate_limits_from_text(text: str) -> Dict:
    """
    Extract the fields from the rate-limit block text; output keys match the
    page exactly. Example input (compact format):

        模型限流与上下文最大输入长度252KRPM30000最大输入长度(思考)252K上下文长度256K
        最大输出长度64KTPM5000000最大输出长度(思考)32K最大思维链长度80K
    """
    result: Dict = {}
    # Normalize: collapse all whitespace runs.
    text = re.sub(r"\s+", " ", text).strip()
    for pattern, key in FIELD_PATTERNS:
        if key in result:
            continue
        # Match a value (number with optional K/M/万 unit) or "-" (empty).
        m = re.search(pattern + r"\s*(-|[0-9][0-9,，]*(?:\.\d+)?\s*[KkMm万]?)", text, re.I)
        if m:
            # Normalize full-width commas to ASCII.
            val = m.group(1).strip().replace("，", ",")
            if val == "-":
                result[key] = None
            else:
                # Normalize the unit to uppercase K.
                result[key] = re.sub(r"k$", "K", val)
    return result


def scrape_rate_limits_standalone(
    url: str,
    headless: bool = True,
    timeout: int = 20000,
    executable_path: Optional[str] = None,
) -> Dict:
    """Standalone run: launch a browser, navigate, scrape the limits, close."""
    from playwright.sync_api import sync_playwright

    target = _extract_model_id_from_url(url)
    result: Dict = {"url": url, "model_code": target, "error": None}
    with sync_playwright() as p:
        launch_kwargs: Dict = {"headless": headless}
        if executable_path:
            launch_kwargs["executable_path"] = executable_path
        browser = p.chromium.launch(**launch_kwargs)
        page = browser.new_context().new_page()
        try:
            page.goto(url, wait_until="networkidle", timeout=timeout)
        except PlaywrightTimeoutError:
            try:
                page.goto(url, wait_until="load", timeout=timeout)
            except Exception as e:
                result["error"] = f"navigation failed: {e}"
                result["rate_limits"] = {}
                browser.close()
                return result
        # Wait for any of the section's marker texts to appear.
        for sel in ["text=模型限流", "text=上下文长度", "text=RPM"]:
            try:
                page.wait_for_selector(sel, timeout=6000)
                break
            except PlaywrightTimeoutError:
                pass
        time.sleep(1.0)
        # Scroll to the bottom so the lazily rendered block is loaded.
        try:
            page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            time.sleep(0.8)
        except Exception:
            pass
        text = _get_rate_limit_section_text(page)
        print(f"[DEBUG] rate-limit block text: {text[:200]}")
        if text:
            result["rate_limits"] = parse_rate_limits_from_text(text)
        else:
            result["error"] = "模型限流与上下文 block not found"
            result["rate_limits"] = {}
        browser.close()
    return result
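
# A minimal sanity check for parse_rate_limits_from_text(), assuming the
# compact text format quoted in its docstring. The sample values below are
# illustrative, not taken from a live page; call this manually (it is not
# wired into __main__) to confirm the parser still lines up after a page
# layout change.
def _selftest_parse() -> None:
    sample = (
        "模型限流与上下文最大输入长度252KRPM30000最大输入长度(思考)252K上下文长度256K"
        "最大输出长度64KTPM5000000最大输出长度(思考)32K最大思维链长度80K"
    )
    parsed = parse_rate_limits_from_text(sample)
    # Every field the page shows should come back verbatim as key -> value.
    assert parsed["最大输入长度"] == "252K", parsed
    assert parsed["最大输入长度(思考)"] == "252K", parsed
    assert parsed["上下文长度"] == "256K", parsed
    assert parsed["最大输出长度"] == "64K", parsed
    assert parsed["RPM"] == "30000", parsed
    assert parsed["TPM"] == "5000000", parsed
    assert parsed["最大思维链长度"] == "80K", parsed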
if __name__ == "__main__":
    import argparse
    import os

    ap = argparse.ArgumentParser(description="Scrape Alibaba Cloud model rate-limit and context info")
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="single model detail URL")
    group.add_argument("--file", help="file with one URL per line")
    ap.add_argument("--headful", action="store_true", help="show the browser window")
    ap.add_argument("--timeout", type=int, default=20000, help="navigation timeout in ms")
    ap.add_argument("--browser-path", help="path to a Chromium executable")
    args = ap.parse_args()

    if args.url:
        urls = [args.url]
    else:
        with open(args.file, encoding="utf-8") as f:
            urls = f.read().splitlines()
    urls = [u.strip() for u in urls if u.strip()]

    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    headless = not args.headful
    results = []
    for u in urls:
        print(f"scraping rate limits: {u}", flush=True)
        results.append(scrape_rate_limits_standalone(
            u, headless=headless, timeout=args.timeout, executable_path=exec_path
        ))
    print(json.dumps(results, ensure_ascii=False, indent=2))
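
# Example invocation. The URL below is a placeholder, not a real endpoint:
# any Bailian model-detail URL whose fragment contains /detail/<model_code>
# works with _extract_model_id_from_url.
#
#   python scrape_rate_limits.py --url "https://<bailian-console>/#/model-market/detail/<model_code>"
#
# Illustrative output shape (values invented for the example):
#
#   [
#     {
#       "url": "...",
#       "model_code": "<model_code>",
#       "error": null,
#       "rate_limits": {
#         "最大输入长度": "252K",
#         "RPM": "30000",
#         "上下文长度": "256K",
#         "最大思维链长度": "80K"
#       }
#     }
#   ]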