| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- #!/usr/bin/env python3
- """
- main.py - 阿里云百炼模型完整信息抓取入口
- 整合以下模块,对每个 URL 只打开一次浏览器,依次运行所有抓取逻辑:
- - scrape_aliyun_models.py → 模型价格(含阶梯计费)
- - scrape_model_info.py → 模型基本信息 + 能力
- - scrape_rate_limits.py → 限流与上下文
- - scrape_tool_prices.py → 工具调用价格
- 用法:
- python main.py --url "https://bailian.console.aliyun.com/...#/model-market/detail/qwen3-max"
- python main.py --file urls.txt
- python main.py --url "..." --browser-path "D:\\playwright-browsers\\...\\chrome.exe"
- python main.py --url "..." --modules info,price,rate,tool # 只运行指定模块
- python main.py --url "..." --headful # 有头模式调试
- 输出: JSON 到 stdout,同时保存到 output/<model_id>.json
- """
- import argparse
- import json
- import os
- import re
- import time
- from typing import Dict, List, Optional
- from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeoutError
- # 导入各模块的核心解析函数(不启动独立浏览器)
- from scrape_model_info import (
- _extract_model_id_from_url,
- _find_model_in_json,
- parse_model_info,
- API_URL_RE as INFO_API_RE,
- )
- from scrape_rate_limits import (
- parse_rate_limits_from_text,
- _get_rate_limit_section_text,
- )
- from scrape_tool_prices import (
- parse_tool_prices_from_text,
- _get_tool_price_section_text,
- )
- from scrape_aliyun_models import (
- scrape_model_price,
- )
- from scrape_model_icon import _extract_icon_from_page
- def _navigate(page, url: str, timeout: int) -> bool:
- """导航到 URL,返回是否成功。"""
- try:
- page.goto(url, wait_until="domcontentloaded", timeout=timeout)
- return True
- except PlaywrightTimeoutError:
- try:
- page.goto(url, wait_until="load", timeout=timeout)
- return True
- except Exception as e:
- print(f"[ERROR] 导航失败: {e}")
- return False
- def _wait_for_content(page) -> None:
- """等待页面核心内容渲染完成。"""
- for sel in ["text=模型价格", "text=模型介绍", "text=模型能力", "text=模型限流"]:
- try:
- page.wait_for_selector(sel, timeout=6000)
- break
- except PlaywrightTimeoutError:
- pass
- time.sleep(1.5)
- # 滚动触发懒加载
- try:
- page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
- time.sleep(0.8)
- page.evaluate("window.scrollTo(0, 0)")
- time.sleep(0.3)
- except Exception:
- pass
- def _parse_cookies_env(cookies_str: str, domain: str = ".aliyun.com") -> List[Dict]:
- """
- 将 Cookie 字符串(浏览器复制的 name=value; name2=value2 格式)
- 解析为 Playwright set_cookies 所需的列表格式。
- """
- cookies = []
- for part in cookies_str.split(";"):
- part = part.strip()
- if not part:
- continue
- if "=" in part:
- name, _, value = part.partition("=")
- cookies.append({
- "name": name.strip(),
- "value": value.strip(),
- "domain": domain,
- "path": "/",
- })
- return cookies
- def scrape_all(
- url: str,
- headless: bool = True,
- timeout: int = 20000,
- executable_path: Optional[str] = None,
- modules: Optional[List[str]] = None,
- api_key: Optional[str] = None,
- model_hint: Optional[str] = None,
- ) -> Dict:
- """
- 对单个 URL 运行所有(或指定)模块,共享一个浏览器实例。
- modules 可选值: ["info", "rate", "tool", "price"]
- 默认全部运行。
- api_key: 可选的 API 密钥,将通过请求头传递给目标站点。
- model_hint: 可选的模型名称提示,优先用于 API JSON 匹配,而不是从 URL 提取。
- """
- if modules is None:
- modules = ["info", "rate", "tool", "price", "icon"]
- # 优先用外部传入的 model_hint,否则从 URL 提取
- target = model_hint.strip() if model_hint and model_hint.strip() else _extract_model_id_from_url(url)
- result: Dict = {"url": url, "model_id": target, "error": None}
- # price 模块复用原始脚本,独立启动浏览器(原脚本结构限制)
- # 其余模块共享一个浏览器实例
- shared_modules = [m for m in modules if m != "price"]
- # 从环境变量读取登录 Cookie
- aliyun_cookies_str = os.environ.get("ALIYUN_COOKIES", "").strip()
- # ── 共享浏览器:info / rate / tool ──────────────────────────────────────────
- if shared_modules:
- api_data: List[Dict] = []
- with sync_playwright() as p:
- launch_kwargs: Dict = {"headless": headless}
- if executable_path:
- launch_kwargs["executable_path"] = executable_path
- # 额外 Chrome 启动参数(生产环境 Linux 可通过 PLAYWRIGHT_EXTRA_ARGS 注入)
- extra_args_env = os.environ.get("PLAYWRIGHT_EXTRA_ARGS", "")
- extra_args = [a.strip() for a in extra_args_env.split(",") if a.strip()]
- if extra_args:
- launch_kwargs["args"] = extra_args
- browser = p.chromium.launch(**launch_kwargs)
- # 如果有 api_key,通过额外请求头传递
- context_kwargs: Dict = {}
- if api_key:
- context_kwargs["extra_http_headers"] = {"Authorization": f"Bearer {api_key}"}
- context = browser.new_context(**context_kwargs)
- # 注入登录 Cookie(避免被重定向到登录/免费试用页)
- if aliyun_cookies_str:
- cookies = _parse_cookies_env(aliyun_cookies_str)
- if cookies:
- context.add_cookies(cookies)
- print(f"[INFO] 已注入 {len(cookies)} 个 Cookie")
- page = context.new_page()
- # 只拦截匹配 INFO_API_RE 的 JSON API 请求,其余直接放行
- # 避免对图片/日志等请求调用 route.fetch() 导致 DNS 失败崩溃
- import json as _json
- def handle_api_route(route, request):
- try:
- resp = route.fetch()
- try:
- ct = resp.headers.get("content-type", "")
- if "application/json" in ct:
- api_data.append(_json.loads(resp.body()))
- except Exception:
- pass
- route.fulfill(response=resp)
- except Exception as e:
- try:
- route.continue_()
- except Exception:
- pass
- # 只对匹配 API 的 URL 注册拦截,其余请求不拦截(直接走浏览器默认行为)
- page.route(
- lambda url: bool(INFO_API_RE.search(url)),
- handle_api_route,
- )
- if not _navigate(page, url, timeout):
- result["error"] = "导航失败"
- browser.close()
- else:
- try:
- page.wait_for_load_state("networkidle", timeout=20000)
- except PlaywrightTimeoutError:
- pass
- _wait_for_content(page)
- # 从 API 找模型对象
- model_obj = None
- for body in api_data:
- found = _find_model_in_json(body, target)
- if found:
- model_obj = found
- break
- if not model_obj:
- print(f"[WARN] 未从 API 找到模型 '{target}',部分字段将为空")
- # ── info 模块 ──
- if "info" in shared_modules:
- if model_obj:
- result["info"] = parse_model_info(model_obj)
- else:
- result["info"] = {"error": f"未找到模型 '{target}'"}
- # ── rate 模块 ──
- if "rate" in shared_modules:
- rate_text = _get_rate_limit_section_text(page)
- result["rate_limits"] = parse_rate_limits_from_text(rate_text) if rate_text else {}
- # ── tool 模块 ──
- if "tool" in shared_modules:
- html = page.content()
- tool_text = _get_tool_price_section_text(html)
- result["tool_call_prices"] = parse_tool_prices_from_text(tool_text) if tool_text else []
- # ── icon 模块 ──
- if "icon" in shared_modules:
- icon = _extract_icon_from_page(page)
- result["icon"] = icon.get("data") if icon.get("type") != "none" else None
- # ── price 模块(复用共享浏览器) ──
- if "price" in modules:
- try:
- from scrape_aliyun_models import (
- extract_price_items_from_html,
- extract_price_block_html,
- parse_prices_from_text,
- _ensure_tiered_pricing,
- _get_tier_options,
- _select_tier_option,
- _normalize_tier_option,
- )
- import time as _time
- _ensure_tiered_pricing(page)
- tier_options = _get_tier_options(page)
- tiered_items = []
- if tier_options:
- for opt in tier_options:
- if not _select_tier_option(page, opt):
- continue
- html = page.content()
- try:
- tier_items = extract_price_items_from_html(html)
- except Exception:
- tier_items = []
- for it in tier_items:
- it["tier"] = opt
- tiered_items.extend(tier_items)
- if tiered_items:
- items = tiered_items
- else:
- html = page.content()
- items = extract_price_items_from_html(html)
- if not items:
- text_block = extract_price_block_html(html)
- items = parse_prices_from_text(text_block) if text_block else []
- # 构建 price_map(复用 scrape_model_price 里的逻辑)
- def _build_price_map(parsed_items):
- price_map = {}
- for it in parsed_items:
- if isinstance(it, dict) and it.get("tiers") and isinstance(it.get("tiers"), dict):
- for tier_key, tier_val in it["tiers"].items():
- k = _normalize_tier_option(tier_key)
- price_map.setdefault(k, {})
- sub_label = tier_val.get("label") or tier_val.get("raw") or k
- price_map[k][sub_label] = {kk: v for kk, v in tier_val.items() if kk not in ("tier", "tiers", "label")}
- continue
- if it.get("tier"):
- tk = _normalize_tier_option(it.get("tier"))
- price_map.setdefault(tk, {})
- sub_label = it.get("label") or it.get("raw") or tk
- price_map[tk][sub_label] = {kk: v for kk, v in it.items() if kk not in ("tier", "label")}
- continue
- lbl = it.get("label") or it.get("raw") or "price"
- if lbl in price_map and not isinstance(price_map[lbl], list):
- price_map[lbl] = [price_map[lbl]]
- if isinstance(price_map.get(lbl), list):
- price_map[lbl].append({kk: v for kk, v in it.items() if kk != "label"})
- else:
- price_map[lbl] = {kk: v for kk, v in it.items() if kk != "label"}
- return price_map
- result["prices"] = _build_price_map(items)
- except Exception as e:
- import traceback as _tb
- print(f"[ERROR] 价格模块异常: {e}\n{_tb.format_exc()}")
- result["prices"] = {}
- result["price_error"] = str(e)
- browser.close()
- # ── price 模块回退:若 shared_modules 为空(不含 info/rate/tool),独立启动浏览器 ──
- if "price" in modules and not shared_modules:
- print(f"[INFO] 运行价格模块(独立浏览器)...")
- try:
- price_result = scrape_model_price(
- url,
- headless=headless,
- timeout=timeout,
- executable_path=executable_path,
- api_key=api_key,
- cookies_str=aliyun_cookies_str,
- )
- result["prices"] = price_result.get("prices", {})
- if price_result.get("error"):
- result["price_error"] = price_result["error"]
- print(f"[WARN] 价格模块错误: {price_result['error']}")
- except Exception as e:
- import traceback as _tb
- print(f"[ERROR] 价格模块异常: {e}\n{_tb.format_exc()}")
- result["prices"] = {}
- result["price_error"] = str(e)
- return result
- def main():
- ap = argparse.ArgumentParser(
- description="阿里云百炼模型完整信息抓取(整合所有模块)",
- formatter_class=argparse.RawDescriptionHelpFormatter,
- epilog="""
- 模块说明:
- info - 模型基本信息、能力、模态
- rate - 限流与上下文(RPM、context window 等)
- tool - 工具调用价格
- price - 模型 token 价格(含阶梯计费)
- 示例:
- python main.py --url "https://..." --browser-path "D:\\chrome.exe"
- python main.py --file urls.txt --headful
- python main.py --url "https://..." --modules info,rate
- """,
- )
- group = ap.add_mutually_exclusive_group(required=True)
- group.add_argument("--url", help="单个模型页面 URL")
- group.add_argument("--file", help="URL 列表文件(每行一个)")
- ap.add_argument("--headful", action="store_true", help="有头模式(方便调试)")
- ap.add_argument("--timeout", type=int, default=20000, help="导航超时毫秒,默认 20000")
- ap.add_argument("--browser-path", help="浏览器可执行文件路径")
- ap.add_argument(
- "--modules",
- default="info,rate,tool,price",
- help="要运行的模块,逗号分隔,可选: info,rate,tool,price(默认全部)",
- )
- ap.add_argument("--output-dir", default="output", help="结果保存目录,默认 output/")
- args = ap.parse_args()
- urls: List[str] = []
- if args.url:
- urls = [args.url]
- else:
- with open(args.file, "r", encoding="utf-8") as f:
- urls = [ln.strip() for ln in f if ln.strip()]
- exec_path = args.browser_path or os.environ.get("PLAYWRIGHT_EXECUTABLE")
- headless = not args.headful
- if os.environ.get("PLAYWRIGHT_HEADLESS", "").lower() == "false":
- headless = False
- modules = [m.strip() for m in args.modules.split(",") if m.strip()]
- print(f"[INFO] 运行模块: {modules}")
- os.makedirs(args.output_dir, exist_ok=True)
- all_results = []
- for u in urls:
- print(f"\n{'='*60}\n[INFO] 抓取: {u}", flush=True)
- res = scrape_all(u, headless=headless, timeout=args.timeout,
- executable_path=exec_path, modules=modules)
- all_results.append(res)
- # 保存单个结果
- model_id = res.get("model_id", "unknown")
- safe_id = re.sub(r"[^\w\-.]", "_", model_id)
- out_path = os.path.join(args.output_dir, f"{safe_id}.json")
- with open(out_path, "w", encoding="utf-8") as f:
- json.dump(res, f, ensure_ascii=False, indent=2)
- print(f"[INFO] 已保存: {out_path}")
- # 输出到 stdout
- print(json.dumps(all_results, ensure_ascii=False, indent=2))
- if __name__ == "__main__":
- main()
|