#!/usr/bin/env python3
"""
main.py - entry point for scraping complete Aliyun Bailian model information.

Combines the modules below so that each URL is opened in a browser only once
and every scraping step runs against that single page:
  - scrape_aliyun_models.py   -> model prices (including tiered pricing)
  - scrape_model_info.py      -> basic model information + capabilities
  - scrape_rate_limits.py     -> rate limits and context window
  - scrape_tool_prices.py     -> tool-call prices
  - scrape_model_icon.py      -> model icon

Usage:
  python main.py --url "https://bailian.console.aliyun.com/...#/model-market/detail/qwen3-max"
  python main.py --file urls.txt
  python main.py --url "..." --browser-path "D:\\playwright-browsers\\...\\chrome.exe"
  python main.py --url "..." --modules info,price,rate,tool   # run only the given modules
  python main.py --url "..." --headful                        # headed mode for debugging

Output:
  JSON on stdout; each result is also saved to output/<model_id>.json
"""

import argparse
import json
import os
import re
import time
import traceback
from typing import Dict, List, Optional

from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError

# Core parsing functions of each module (none of them starts its own browser).
from scrape_model_info import (
    _extract_model_id_from_url,
    _find_model_in_json,
    parse_model_info,
    API_URL_RE as INFO_API_RE,
)
from scrape_rate_limits import (
    parse_rate_limits_from_text,
    _get_rate_limit_section_text,
)
from scrape_tool_prices import (
    parse_tool_prices_from_text,
    _get_tool_price_section_text,
)
from scrape_aliyun_models import (
    scrape_model_price,
)
from scrape_model_icon import _extract_icon_from_page

# Every module name that scrape_all() understands.
KNOWN_MODULES = ("info", "rate", "tool", "price", "icon")


def _navigate(page, url: str, timeout: int) -> bool:
    """Navigate ``page`` to ``url`` and report whether it succeeded.

    The first attempt waits for ``domcontentloaded``; if that times out, a
    second attempt waits for ``load``. Any other navigation error is reported
    and returned as ``False`` instead of propagating to the caller.

    Args:
        page: Playwright page to drive.
        url: Target URL.
        timeout: Per-attempt navigation timeout in milliseconds.

    Returns:
        True when one of the attempts completed, False otherwise.
    """
    try:
        page.goto(url, wait_until="domcontentloaded", timeout=timeout)
        return True
    except PlaywrightTimeoutError:
        try:
            page.goto(url, wait_until="load", timeout=timeout)
            return True
        except Exception as e:
            print(f"[ERROR] 导航失败: {e}")
            return False
    except Exception as e:
        # Non-timeout failures (DNS errors, refused connections, ...) used to
        # escape this function even though it promises a success flag.
        print(f"[ERROR] 导航失败: {e}")
        return False


def _wait_for_content(page) -> None:
    """Wait until the core page content has rendered, then trigger lazy loading.

    Waits for the first of several section headings to appear (up to 6 s
    each), sleeps briefly, then scrolls to the bottom and back to the top.
    """
    for sel in ["text=模型价格", "text=模型介绍", "text=模型能力", "text=模型限流"]:
        try:
            page.wait_for_selector(sel, timeout=6000)
            break
        except PlaywrightTimeoutError:
            pass
    time.sleep(1.5)
    # Scroll down and back up to trigger lazily loaded sections. This is
    # best-effort: a failure here must not abort the scrape.
    try:
        page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        time.sleep(0.8)
        page.evaluate("window.scrollTo(0, 0)")
        time.sleep(0.3)
    except Exception:
        pass


def _parse_cookies_env(cookies_str: str, domain: str = ".aliyun.com") -> List[Dict]:
    """Parse a browser-style cookie string into a list of cookie dicts.

    Args:
        cookies_str: Cookies in ``name=value; name2=value2`` form, as copied
            from a browser.
        domain: Domain assigned to every cookie.

    Returns:
        A list of ``{"name", "value", "domain", "path"}`` dicts in the shape
        passed to ``context.add_cookies``. Fragments without ``=`` or with an
        empty name are skipped.
    """
    cookies = []
    for part in cookies_str.split(";"):
        name, sep, value = part.strip().partition("=")
        name = name.strip()
        # Skip empty fragments, fragments without "=", and nameless cookies.
        if not sep or not name:
            continue
        cookies.append({
            "name": name,
            "value": value.strip(),
            "domain": domain,
            "path": "/",
        })
    return cookies


def _build_launch_kwargs(headless: bool, executable_path: Optional[str]) -> Dict:
    """Build the keyword arguments for ``chromium.launch``.

    Extra Chrome flags are read from the comma-separated
    ``PLAYWRIGHT_EXTRA_ARGS`` environment variable.
    """
    launch_kwargs: Dict = {"headless": headless}
    if executable_path:
        launch_kwargs["executable_path"] = executable_path
    extra_args_env = os.environ.get("PLAYWRIGHT_EXTRA_ARGS", "")
    extra_args = [a.strip() for a in extra_args_env.split(",") if a.strip()]
    if extra_args:
        launch_kwargs["args"] = extra_args
    return launch_kwargs


def _register_api_capture(page, api_data: List[Dict]) -> None:
    """Record the JSON bodies of API responses matching ``INFO_API_RE``.

    Only matching URLs are routed; every other request keeps the browser's
    default behaviour, so ``route.fetch()`` is never called for images, logs
    and similar requests (where it could fail, e.g. on DNS errors).

    Args:
        page: Playwright page on which to install the route.
        api_data: List that receives each decoded JSON body (mutated in place).
    """
    def handle_api_route(route, request):
        try:
            resp = route.fetch()
            try:
                ct = resp.headers.get("content-type", "")
                if "application/json" in ct:
                    api_data.append(json.loads(resp.body()))
            except Exception:
                # An unparsable body is not fatal; still pass the response on.
                pass
            route.fulfill(response=resp)
        except Exception:
            # Fall back to letting the browser perform the request itself.
            try:
                route.continue_()
            except Exception:
                pass

    page.route(
        lambda request_url: bool(INFO_API_RE.search(request_url)),
        handle_api_route,
    )


def _build_price_map(parsed_items, normalize_tier) -> Dict:
    """Group parsed price items into a ``{tier_or_label: ...}`` mapping.

    Args:
        parsed_items: Iterable of price item dicts. Non-dict entries are
            skipped.
        normalize_tier: Callable that normalises a tier name into a map key.

    Returns:
        A dict with three kinds of entries:
          * items carrying a ``tiers`` dict are expanded per tier;
          * items carrying a ``tier`` value are grouped under that tier;
          * all other items are keyed by label, and repeated labels are
            collected into a list.
    """
    price_map: Dict = {}
    for it in parsed_items:
        if not isinstance(it, dict):
            # The original type-checked only in the first branch and then
            # called ``.get`` unconditionally; skip malformed entries instead.
            continue

        tiers = it.get("tiers")
        if tiers and isinstance(tiers, dict):
            for tier_key, tier_val in tiers.items():
                k = normalize_tier(tier_key)
                price_map.setdefault(k, {})
                sub_label = tier_val.get("label") or tier_val.get("raw") or k
                price_map[k][sub_label] = {
                    kk: v for kk, v in tier_val.items()
                    if kk not in ("tier", "tiers", "label")
                }
            continue

        if it.get("tier"):
            tk = normalize_tier(it.get("tier"))
            price_map.setdefault(tk, {})
            sub_label = it.get("label") or it.get("raw") or tk
            price_map[tk][sub_label] = {
                kk: v for kk, v in it.items() if kk not in ("tier", "label")
            }
            continue

        lbl = it.get("label") or it.get("raw") or "price"
        entry = {kk: v for kk, v in it.items() if kk != "label"}
        # A repeated label turns the existing single entry into a list.
        if lbl in price_map and not isinstance(price_map[lbl], list):
            price_map[lbl] = [price_map[lbl]]
        if isinstance(price_map.get(lbl), list):
            price_map[lbl].append(entry)
        else:
            price_map[lbl] = entry
    return price_map


def _scrape_prices_from_page(page) -> Dict:
    """Extract model prices from an already loaded page.

    Each tier option offered by the page is selected in turn and its price
    items are tagged with that tier. When no tiered items are found, the
    page is parsed as a whole, falling back to text parsing of the price
    block.

    The helpers are imported here rather than at module level so that a
    failing import surfaces as a price-module error in the caller instead of
    breaking the whole script.

    Returns:
        The price map built by ``_build_price_map``.
    """
    from scrape_aliyun_models import (
        extract_price_items_from_html,
        extract_price_block_html,
        parse_prices_from_text,
        _ensure_tiered_pricing,
        _get_tier_options,
        _select_tier_option,
        _normalize_tier_option,
    )

    _ensure_tiered_pricing(page)
    tier_options = _get_tier_options(page)
    tiered_items = []
    if tier_options:
        for opt in tier_options:
            if not _select_tier_option(page, opt):
                continue
            html = page.content()
            try:
                tier_items = extract_price_items_from_html(html)
            except Exception:
                tier_items = []
            for it in tier_items:
                it["tier"] = opt
            tiered_items.extend(tier_items)

    if tiered_items:
        items = tiered_items
    else:
        html = page.content()
        items = extract_price_items_from_html(html)
        if not items:
            text_block = extract_price_block_html(html)
            items = parse_prices_from_text(text_block) if text_block else []

    return _build_price_map(items, _normalize_tier_option)


def _scrape_loaded_page(page, api_data: List[Dict], target, modules: List[str]) -> Dict:
    """Run every requested module against a page that has been navigated to.

    Args:
        page: Playwright page already pointing at the model URL.
        api_data: JSON bodies captured by ``_register_api_capture``.
        target: Model identifier to look up in the captured API data.
        modules: Names of the modules to run.

    Returns:
        A dict holding the keys produced by the requested modules (``info``,
        ``rate_limits``, ``tool_call_prices``, ``icon``, ``prices`` and, on a
        price failure, ``price_error``).
    """
    out: Dict = {}

    try:
        page.wait_for_load_state("networkidle", timeout=20000)
    except PlaywrightTimeoutError:
        pass
    _wait_for_content(page)

    # Look for the model object in the captured API responses.
    model_obj = None
    for body in api_data:
        found = _find_model_in_json(body, target)
        if found:
            model_obj = found
            break
    if not model_obj:
        print(f"[WARN] 未从 API 找到模型 '{target}',部分字段将为空")

    if "info" in modules:
        if model_obj:
            out["info"] = parse_model_info(model_obj)
        else:
            out["info"] = {"error": f"未找到模型 '{target}'"}

    if "rate" in modules:
        rate_text = _get_rate_limit_section_text(page)
        out["rate_limits"] = parse_rate_limits_from_text(rate_text) if rate_text else {}

    if "tool" in modules:
        tool_text = _get_tool_price_section_text(page.content())
        out["tool_call_prices"] = parse_tool_prices_from_text(tool_text) if tool_text else []

    if "icon" in modules:
        icon = _extract_icon_from_page(page)
        out["icon"] = icon.get("data") if icon.get("type") != "none" else None

    # The price module reuses the shared page; a failure is recorded in the
    # result instead of aborting the other modules' output.
    if "price" in modules:
        try:
            out["prices"] = _scrape_prices_from_page(page)
        except Exception as e:
            print(f"[ERROR] 价格模块异常: {e}\n{traceback.format_exc()}")
            out["prices"] = {}
            out["price_error"] = str(e)

    return out


def scrape_all(
    url: str,
    headless: bool = True,
    timeout: int = 20000,
    executable_path: Optional[str] = None,
    modules: Optional[List[str]] = None,
    api_key: Optional[str] = None,
    model_hint: Optional[str] = None,
) -> Dict:
    """Run all (or the selected) modules for one URL using one browser.

    Args:
        url: Model detail page URL.
        headless: Launch the browser headless when True.
        timeout: Navigation timeout in milliseconds.
        executable_path: Optional path to the browser executable.
        modules: Modules to run; any of ``"info"``, ``"rate"``, ``"tool"``,
            ``"price"``, ``"icon"``. Defaults to all of them.
        api_key: Optional API key, sent as an ``Authorization: Bearer``
            header on the browser context.
        model_hint: Optional model name used for matching the API JSON in
            preference to the id extracted from the URL.

    Returns:
        A dict with ``url``, ``model_id`` and ``error`` plus one entry per
        module that ran. ``error`` is set when navigation fails; a price
        failure is reported under ``price_error``.

    Environment:
        ALIYUN_COOKIES: login cookies (``name=value; ...``) to inject.
        PLAYWRIGHT_EXTRA_ARGS: comma-separated extra Chrome launch flags.
    """
    if modules is None:
        modules = list(KNOWN_MODULES)

    # Prefer the caller-supplied hint; otherwise derive the id from the URL.
    hint = model_hint.strip() if model_hint else ""
    target = hint or _extract_model_id_from_url(url)

    result: Dict = {"url": url, "model_id": target, "error": None}

    # Everything except a price-only run shares one browser instance. The
    # price module joins that shared page whenever another module is
    # requested, and only launches its own browser when it runs alone.
    shared_modules = [m for m in modules if m != "price"]

    # Login cookies come from the environment.
    aliyun_cookies_str = os.environ.get("ALIYUN_COOKIES", "").strip()

    if shared_modules:
        api_data: List[Dict] = []
        with sync_playwright() as p:
            browser = p.chromium.launch(**_build_launch_kwargs(headless, executable_path))
            try:
                # NOTE(review): extra_http_headers are set on the whole
                # context, so the key is presumably sent with every request
                # the page makes, including third-party hosts - confirm this
                # is acceptable.
                context_kwargs: Dict = {}
                if api_key:
                    context_kwargs["extra_http_headers"] = {"Authorization": f"Bearer {api_key}"}
                context = browser.new_context(**context_kwargs)

                # Inject login cookies to avoid being redirected to the
                # login / free-trial page.
                if aliyun_cookies_str:
                    cookies = _parse_cookies_env(aliyun_cookies_str)
                    if cookies:
                        context.add_cookies(cookies)
                        print(f"[INFO] 已注入 {len(cookies)} 个 Cookie")

                page = context.new_page()
                _register_api_capture(page, api_data)

                if not _navigate(page, url, timeout):
                    result["error"] = "导航失败"
                else:
                    result.update(_scrape_loaded_page(page, api_data, target, modules))
            finally:
                # Close the browser even when a module raises.
                browser.close()

    # Price-only run: nothing else needs the shared browser, so delegate to
    # the standalone scraper, which launches its own browser.
    if "price" in modules and not shared_modules:
        print("[INFO] 运行价格模块(独立浏览器)...")
        try:
            price_result = scrape_model_price(
                url,
                headless=headless,
                timeout=timeout,
                executable_path=executable_path,
                api_key=api_key,
                cookies_str=aliyun_cookies_str,
            )
            result["prices"] = price_result.get("prices", {})
            if price_result.get("error"):
                result["price_error"] = price_result["error"]
                print(f"[WARN] 价格模块错误: {price_result['error']}")
        except Exception as e:
            print(f"[ERROR] 价格模块异常: {e}\n{traceback.format_exc()}")
            result["prices"] = {}
            result["price_error"] = str(e)

    return result


def main():
    """Command-line entry point: scrape each URL, save and print the results.

    Every result is written to ``<output-dir>/<model_id>.json`` and the full
    list is printed to stdout as JSON. A URL whose scrape raises is recorded
    with an ``error`` field so the remaining URLs are still processed.
    """
    ap = argparse.ArgumentParser(
        description="阿里云百炼模型完整信息抓取(整合所有模块)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
模块说明:
  info  - 模型基本信息、能力、模态
  rate  - 限流与上下文(RPM、context window 等)
  tool  - 工具调用价格
  price - 模型 token 价格(含阶梯计费)
  icon  - 模型图标

示例:
  python main.py --url "https://..." --browser-path "D:\\chrome.exe"
  python main.py --file urls.txt --headful
  python main.py --url "https://..." --modules info,rate
        """,
    )
    group = ap.add_mutually_exclusive_group(required=True)
    group.add_argument("--url", help="单个模型页面 URL")
    group.add_argument("--file", help="URL 列表文件(每行一个)")
    ap.add_argument("--headful", action="store_true", help="有头模式(方便调试)")
    ap.add_argument("--timeout", type=int, default=20000, help="导航超时毫秒,默认 20000")
    ap.add_argument("--browser-path", help="浏览器可执行文件路径")
    ap.add_argument(
        "--modules",
        default="info,rate,tool,price",
        help="要运行的模块,逗号分隔,可选: info,rate,tool,price,icon(默认 info,rate,tool,price)",
    )
    ap.add_argument("--output-dir", default="output", help="结果保存目录,默认 output/")
    args = ap.parse_args()

    urls: List[str] = []
    if args.url:
        urls = [args.url]
    else:
        with open(args.file, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]

    exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
    headless = not args.headful
    if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
        headless = False

    modules = [m.strip() for m in args.modules.split(",") if m.strip()]
    unknown = [m for m in modules if m not in KNOWN_MODULES]
    if unknown:
        # A misspelled module name would otherwise be ignored silently.
        print(f"[WARN] 忽略未知模块: {unknown}")
    print(f"[INFO] 运行模块: {modules}")

    os.makedirs(args.output_dir, exist_ok=True)

    all_results = []
    for u in urls:
        print(f"\n{'='*60}\n[INFO] 抓取: {u}", flush=True)
        try:
            res = scrape_all(
                u,
                headless=headless,
                timeout=args.timeout,
                executable_path=exec_path,
                modules=modules,
            )
        except Exception as e:
            # Keep going: one failing URL must not discard the whole batch.
            print(f"[ERROR] 抓取异常: {e}\n{traceback.format_exc()}")
            res = {"url": u, "model_id": None, "error": str(e)}
        all_results.append(res)

        # Save the individual result. ``model_id`` is always present in the
        # result but may be None, so a ``.get`` default alone is not enough.
        model_id = res.get("model_id") or "unknown"
        safe_id = re.sub(r"[^\w\-.]", "_", model_id)
        out_path = os.path.join(args.output_dir, f"{safe_id}.json")
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(res, f, ensure_ascii=False, indent=2)
        print(f"[INFO] 已保存: {out_path}")

    # Emit everything on stdout.
    print(json.dumps(all_results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()