| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- """
- price_parser.py
- 统一价格结构,所有模型类型输出相同字段:
- tier_min : 档位下限(token数 或 秒数),0 表示从0开始,None 表示无档位
- tier_max : 档位上限(token数 或 秒数),None 表示无上限
- tier_unit : 档位单位,"tokens" 或 "seconds",None 表示无档位
- input_price : 输入价格(元/百万tokens 或 0),视频/图像为 0
- output_price : 输出价格(元/百万tokens)或视频/图像的生成价格
- currency : "CNY"
- unit : 计费单位原始字符串
- label : 原始 key
- 视频规格 -> 秒数映射:
- 480P -> 0 ~ 480
- 720P -> 0 ~ 720 (或 481 ~ 720)
- 1080P -> 0 ~ 1080 (或 721 ~ 1080)
- 4K -> 0 ~ 2160
- """
- from __future__ import annotations
- import re
- from typing import Any, Dict, List, Optional
- # ── 视频规格 -> 最大秒数 ────────────────────────────────────────────────────────
- _VIDEO_SPEC_MAX: Dict[str, int] = {
- "480P": 480,
- "480p": 480,
- "720P": 720,
- "720p": 720,
- "1080P": 1080,
- "1080p": 1080,
- "2K": 1440,
- "4K": 2160,
- }
- # 非 token 计费单位
- _NON_TOKEN_UNITS = re.compile(r"每秒|每张|每次|每帧|/秒|/张|/次", re.I)
- # token 阶梯 key 正则
- # 情况1:input<=128k / 32k<input<=128k(有上限)
- _TIER_RE = re.compile(
- r"^(?:([\d.]+[KkMm]?)\s*<\s*)?(?:input|输入)\s*<=?\s*([\d.]+[KkMm]?)$",
- re.I,
- )
- # 情况2:256k<input(只有下限,无上限)
- _TIER_RE_LO_ONLY = re.compile(
- r"^([\d.]+[KkMm]?)\s*<\s*(?:input|输入)$",
- re.I,
- )
- def _to_tokens(val: str) -> Optional[int]:
- """把 '32k'/'128K'/'1M' 转成 token 整数。"""
- s = str(val).strip().upper().replace(",", "")
- m = re.match(r"^([\d.]+)\s*([KMG]?)$", s)
- if not m:
- return None
- num = float(m.group(1))
- suffix = m.group(2)
- if suffix == "K":
- return int(num * 1_000)
- if suffix == "M":
- return int(num * 1_000_000)
- return int(num)
- def _parse_price(obj: Any) -> Optional[float]:
- if isinstance(obj, (int, float)):
- return float(obj)
- if isinstance(obj, dict):
- v = obj.get("price")
- if v is not None:
- try:
- return float(v)
- except (TypeError, ValueError):
- pass
- return None
- def _parse_unit(obj: Any) -> Optional[str]:
- if isinstance(obj, dict):
- return obj.get("unit")
- return None
- def _parse_tier_key(key: str):
- """解析 token 阶梯 key,返回 (min_tokens, max_tokens) 或 None。"""
- k = key.strip().lower().replace(" ", "")
- m = _TIER_RE.match(k)
- if m:
- lo_str, hi_str = m.group(1), m.group(2)
- lo = _to_tokens(lo_str) if lo_str else 0
- hi = _to_tokens(hi_str) if hi_str else None
- return (lo, hi)
- # 只有下限:256k<input
- m2 = _TIER_RE_LO_ONLY.match(k)
- if m2:
- lo = _to_tokens(m2.group(1))
- return (lo, None)
- return None
- def _extract_video_spec(label: str) -> Optional[str]:
- """从 label 中提取视频规格,如 '视频生成(720P)' -> '720P'。"""
- m = re.search(r"[((]([^))]+)[))]", label)
- if m:
- spec = m.group(1).strip()
- if spec.upper() in {k.upper() for k in _VIDEO_SPEC_MAX}:
- return spec.upper()
- # 直接在 label 里找
- for spec in _VIDEO_SPEC_MAX:
- if spec.upper() in label.upper():
- return spec.upper()
- return None
- def _build_video_tiers(items: List[Dict]) -> List[Dict]:
- """
- 把多个视频规格条目转成连续区间:
- 720P(0.6) + 1080P(1.0) ->
- [0, 720, input=0, output=0.6]
- [721, 1080, input=0, output=1.0]
- """
- # 按 tier_max 排序
- sorted_items = sorted(items, key=lambda x: x["_spec_max"])
- result = []
- prev_max = 0
- for item in sorted_items:
- spec_max = item["_spec_max"]
- result.append({
- "label": item["label"],
- "tier_min": prev_max + (1 if prev_max > 0 else 0),
- "tier_max": spec_max,
- "tier_unit": "seconds",
- "input_price": 0.0,
- "output_price": item["price"],
- "currency": item["currency"],
- "unit": item["unit"],
- })
- prev_max = spec_max
- return result
- def parse_prices(prices: Dict[str, Any]) -> List[Dict]:
- result: List[Dict] = []
- video_items: List[Dict] = []
- input_entry: Optional[Dict] = None
- for key, val in prices.items():
- # ── token 阶梯 ──
- tier = _parse_tier_key(key)
- if tier is not None and isinstance(val, dict):
- entry: Dict = {
- "label": key,
- "tier_min": tier[0],
- "tier_max": tier[1],
- "tier_unit": "tokens",
- "input_price": None,
- "output_price": None,
- "currency": "CNY",
- "unit": None,
- }
- for sub_key, sub_val in val.items():
- sk = sub_key.strip()
- price = _parse_price(sub_val)
- unit = _parse_unit(sub_val)
- if unit:
- entry["unit"] = unit
- if re.match(r"^输入$|^input$", sk, re.I):
- entry["input_price"] = price
- elif re.match(r"^输出$|^output$", sk, re.I):
- entry["output_price"] = price
- result.append(entry)
- continue
- if not isinstance(val, dict):
- continue
- price = _parse_price(val)
- unit = _parse_unit(val)
- # ── 视频/图像按单位计费 ──
- if _NON_TOKEN_UNITS.search(unit or ""):
- spec = _extract_video_spec(key)
- if spec and spec in _VIDEO_SPEC_MAX:
- video_items.append({
- "label": key,
- "_spec_max": _VIDEO_SPEC_MAX[spec],
- "price": price,
- "currency": "CNY",
- "unit": unit,
- })
- else:
- # 未知规格,直接输出
- result.append({
- "label": key,
- "tier_min": None,
- "tier_max": None,
- "tier_unit": None,
- "input_price": 0.0,
- "output_price": price,
- "currency": "CNY",
- "unit": unit,
- })
- continue
- # ── 简单非阶梯(输入/输出) ──
- if re.match(r"^输入$|^input$", key.strip(), re.I):
- input_entry = {"price": price, "unit": unit}
- continue
- if re.match(r"^输出$|^output$", key.strip(), re.I):
- result.append({
- "label": "input/output",
- "tier_min": None,
- "tier_max": None,
- "tier_unit": None,
- "input_price": input_entry["price"] if input_entry else None,
- "output_price": price,
- "currency": "CNY",
- "unit": unit or (input_entry["unit"] if input_entry else None),
- })
- input_entry = None
- continue
- # 其他普通标签
- result.append({
- "label": key,
- "tier_min": None,
- "tier_max": None,
- "tier_unit": None,
- "input_price": price,
- "output_price": None,
- "currency": "CNY",
- "unit": unit,
- })
- # 处理只有输入没有输出的情况
- if input_entry:
- result.append({
- "label": "input",
- "tier_min": None,
- "tier_max": None,
- "tier_unit": None,
- "input_price": input_entry["price"],
- "output_price": None,
- "currency": "CNY",
- "unit": input_entry["unit"],
- })
- # 把视频条目转成连续区间
- if video_items:
- result.extend(_build_video_tiers(video_items))
- return result
|