#!/usr/bin/env python3
"""
main.py - Entry point for scraping complete Alibaba Cloud Bailian model info.

Combines the modules below. The info / rate / tool modules share a single
browser session per URL; the price module reuses the original script, which
launches its own browser.

  - scrape_aliyun_models.py  -> model prices (including tiered billing)
  - scrape_model_info.py     -> basic model info + capabilities
  - scrape_rate_limits.py    -> rate limits and context
  - scrape_tool_prices.py    -> tool-call prices

Usage:
  python main.py --url "https://bailian.console.aliyun.com/...#/model-market/detail/qwen3-max"
  python main.py --file urls.txt
  python main.py --url "..." --browser-path "D:\\playwright-browsers\\...\\chrome.exe"
  python main.py --url "..." --modules info,price,rate,tool   # run selected modules only
  python main.py --url "..." --headful                        # headed mode for debugging

Output:
  JSON on stdout, plus one file per URL at <output-dir>/<model_id>.json.
  NOTE: progress lines ([INFO]/[WARN]/[ERROR]) are also printed to stdout.
"""

import argparse
import json
import os
import re
import time
from typing import Dict, List, Optional

from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError

# Core parsing functions from each module (none of these start a browser).
from scrape_model_info import (
    _extract_model_id_from_url,
    _find_model_in_json,
    parse_model_info,
    API_URL_RE as INFO_API_RE,
)
from scrape_rate_limits import (
    parse_rate_limits_from_text,
    _get_rate_limit_section_text,
)
from scrape_tool_prices import (
    parse_tool_prices_from_text,
    _get_tool_price_section_text,
)
from scrape_aliyun_models import (
    scrape_model_price,
)

# Module names accepted by scrape_all() / --modules, in default run order.
VALID_MODULES = ("info", "rate", "tool", "price")


def _navigate(page, url: str, timeout: int) -> bool:
    """Navigate ``page`` to ``url`` and report whether it succeeded.

    First waits for ``networkidle``; if that times out, retries once waiting
    only for ``load``. Any other navigation error is logged and reported as
    a failure instead of propagating.

    Args:
        page: Playwright page object.
        url: Target URL.
        timeout: Per-attempt navigation timeout in milliseconds.

    Returns:
        True if either attempt completed, False otherwise.
    """
    try:
        page.goto(url, wait_until="networkidle", timeout=timeout)
        return True
    except PlaywrightTimeoutError:
        # The page may never go network-idle; fall through to the
        # weaker "load" condition below.
        pass
    except Exception as e:
        # Non-timeout failures (DNS, refused connection, ...) will not be
        # helped by a retry; honour the bool contract rather than raising.
        print(f"[ERROR] 导航失败: {e}")
        return False

    try:
        page.goto(url, wait_until="load", timeout=timeout)
        return True
    except Exception as e:
        print(f"[ERROR] 导航失败: {e}")
        return False


def _wait_for_content(page) -> None:
    """Wait for the page's main content to render, then trigger lazy loading.

    Waits up to 6 s for each marker text in turn and stops at the first one
    that appears, sleeps briefly, then scrolls to the bottom and back to the
    top. Scrolling is best-effort: failures are ignored.
    """
    for sel in ["text=模型价格", "text=模型介绍", "text=模型能力"]:
        try:
            page.wait_for_selector(sel, timeout=6000)
            break
        except PlaywrightTimeoutError:
            pass
    time.sleep(1.5)

    # Scroll down and back up to trigger lazily loaded sections.
    try:
        page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        time.sleep(0.8)
        page.evaluate("window.scrollTo(0, 0)")
        time.sleep(0.3)
    except Exception:
        # Best-effort only; scraping proceeds with whatever has rendered.
        pass


def _build_launch_kwargs(headless: bool, executable_path: Optional[str]) -> Dict:
    """Build keyword arguments for ``chromium.launch``.

    Extra browser arguments can be injected through the comma-separated
    ``PLAYWRIGHT_EXTRA_ARGS`` environment variable.
    """
    launch_kwargs: Dict = {"headless": headless}
    if executable_path:
        launch_kwargs["executable_path"] = executable_path

    extra_args_env = os.environ.get("PLAYWRIGHT_EXTRA_ARGS", "")
    extra_args = [a.strip() for a in extra_args_env.split(",") if a.strip()]
    if extra_args:
        launch_kwargs["args"] = extra_args
    return launch_kwargs


def _scrape_shared_modules(
    url: str,
    target,
    shared_modules: List[str],
    headless: bool,
    timeout: int,
    executable_path: Optional[str],
) -> Dict:
    """Run the info / rate / tool modules against one shared browser page.

    Returns:
        A dict of result keys to merge into the caller's result:
        ``{"error": "导航失败"}`` when navigation fails, otherwise any of
        ``info``, ``rate_limits`` and ``tool_call_prices`` depending on
        ``shared_modules``.
    """
    api_data: List[Dict] = []
    out: Dict = {}

    def on_response(resp):
        # Collect JSON bodies of responses whose URL matches the model-info
        # API pattern. Best-effort: any failure reading a response is ignored.
        try:
            if "application/json" not in resp.headers.get("content-type", ""):
                return
            if not INFO_API_RE.search(resp.url):
                return
            api_data.append(resp.json())
        except Exception:
            pass

    with sync_playwright() as p:
        browser = p.chromium.launch(**_build_launch_kwargs(headless, executable_path))
        try:
            page = browser.new_context().new_page()
            page.on("response", on_response)

            if not _navigate(page, url, timeout):
                out["error"] = "导航失败"
                return out

            _wait_for_content(page)

            # Look for the model object in the captured API responses.
            model_obj = None
            for body in api_data:
                found = _find_model_in_json(body, target)
                if found:
                    model_obj = found
                    print(f"[INFO] API 找到模型: {found.get('model', found.get('name', target))}")
                    break
            if not model_obj:
                print(f"[WARN] 未从 API 找到模型 '{target}',部分字段将为空")

            # -- info module --
            if "info" in shared_modules:
                if model_obj:
                    out["info"] = parse_model_info(model_obj)
                else:
                    out["info"] = {"error": f"未找到模型 '{target}'"}

            # -- rate module --
            if "rate" in shared_modules:
                rate_text = _get_rate_limit_section_text(page)
                out["rate_limits"] = parse_rate_limits_from_text(rate_text) if rate_text else {}

            # -- tool module --
            if "tool" in shared_modules:
                html = page.content()
                tool_text = _get_tool_price_section_text(html)
                out["tool_call_prices"] = parse_tool_prices_from_text(tool_text) if tool_text else []
        finally:
            # Always release the browser, even if a parser raised.
            browser.close()
    return out


def scrape_all(
    url: str,
    headless: bool = True,
    timeout: int = 20000,
    executable_path: Optional[str] = None,
    modules: Optional[List[str]] = None,
) -> Dict:
    """Run all (or the selected) modules for a single URL.

    The info / rate / tool modules share one browser instance. The price
    module reuses the original script, which starts its own browser.

    Args:
        url: Model detail page URL.
        headless: Launch the browser headless.
        timeout: Navigation timeout in milliseconds.
        executable_path: Optional browser executable path.
        modules: Subset of ``["info", "rate", "tool", "price"]``; all four
            run when omitted.

    Returns:
        Dict with ``url``, ``model_id`` and ``error``, plus (depending on the
        modules run) ``info``, ``rate_limits``, ``tool_call_prices``,
        ``prices`` and ``price_error``.
    """
    if modules is None:
        modules = list(VALID_MODULES)

    target = _extract_model_id_from_url(url)
    result: Dict = {"url": url, "model_id": target, "error": None}

    # -- shared browser: info / rate / tool --
    shared_modules = [m for m in modules if m != "price"]
    if shared_modules:
        result.update(
            _scrape_shared_modules(
                url, target, shared_modules, headless, timeout, executable_path
            )
        )

    # -- price module (original script, separate browser) --
    if "price" in modules:
        print("[INFO] 运行价格模块...")
        price_result = scrape_model_price(
            url,
            headless=headless,
            timeout=timeout,
            executable_path=executable_path,
        )
        result["prices"] = price_result.get("prices", {})
        if price_result.get("error"):
            result["price_error"] = price_result["error"]

    return result


def main():
    """CLI entry point: parse arguments, scrape each URL, save and print JSON.

    Environment variables read here:
      - ``PLAYWRIGHT_EXECUTABLE``: browser path, used when ``--browser-path``
        is not given.
      - ``PLAYWRIGHT_HEADLESS``: set to ``false`` to force headed mode.
    """
    ap = argparse.ArgumentParser(
        description="阿里云百炼模型完整信息抓取(整合所有模块)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
模块说明:
  info   - 模型基本信息、能力、模态
  rate   - 限流与上下文(RPM、context window 等)
  tool   - 工具调用价格
  price  - 模型 token 价格(含阶梯计费)

示例:
  python main.py --url "https://..." --browser-path "D:\\chrome.exe"
  python main.py --file urls.txt --headful
  python main.py --url "https://..." --modules info,rate
""",
    )
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="单个模型页面 URL")
    group.add_argument("--file", help="URL 列表文件(每行一个)")
    ap.add_argument("--headful", action="store_true", help="有头模式(方便调试)")
    ap.add_argument("--timeout", type=int, default=20000, help="导航超时毫秒,默认 20000")
    ap.add_argument("--browser-path", help="浏览器可执行文件路径")
    ap.add_argument(
        "--modules",
        default="info,rate,tool,price",
        help="要运行的模块,逗号分隔,可选: info,rate,tool,price(默认全部)",
    )
    ap.add_argument("--output-dir", default="output", help="结果保存目录,默认 output/")
    args = ap.parse_args()

    urls: List[str] = []
    if args.url:
        urls = [args.url]
    else:
        with open(args.file, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]

    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    headless = not args.headful
    if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
        headless = False

    modules = [m.strip() for m in args.modules.split(",") if m.strip()]
    print(f"[INFO] 运行模块: {modules}")

    # Unknown names are not matched by any module check in scrape_all;
    # surface likely typos instead of silently skipping them.
    unknown = [m for m in modules if m not in VALID_MODULES]
    if unknown:
        print(f"[WARN] unknown modules ignored: {unknown}")

    os.makedirs(args.output_dir, exist_ok=True)

    all_results = []
    for u in urls:
        print(f"\n{'='*60}\n[INFO] 抓取: {u}", flush=True)
        res = scrape_all(
            u,
            headless=headless,
            timeout=args.timeout,
            executable_path=exec_path,
            modules=modules,
        )
        all_results.append(res)

        # Save the per-URL result. "model_id" is always present in the
        # result, so fall back on a falsy value rather than a missing key.
        model_id = res.get("model_id") or "unknown"
        safe_id = re.sub(r"[^\w\-.]", "_", str(model_id))
        out_path = os.path.join(args.output_dir, f"{safe_id}.json")
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(res, f, ensure_ascii=False, indent=2)
        print(f"[INFO] 已保存: {out_path}")

    # Emit all results to stdout.
    print(json.dumps(all_results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()