"""
price_parser.py
统一价格结构,所有模型类型输出相同字段:
tier_min : 档位下限(token数 或 秒数),0 表示从0开始,None 表示无档位
tier_max : 档位上限(token数 或 秒数),None 表示无上限
tier_unit : 档位单位,"tokens" 或 "seconds",None 表示无档位
input_price : 输入价格(元/百万tokens 或 0),视频/图像为 0
output_price : 输出价格(元/百万tokens)或视频/图像的生成价格
currency : "CNY"
unit : 计费单位原始字符串
label : 原始 key
视频规格 -> 秒数映射:
480P -> 0 ~ 480
720P -> 0 ~ 720 (或 481 ~ 720)
1080P -> 0 ~ 1080 (或 721 ~ 1080)
4K -> 0 ~ 2160
"""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional
# ── Video spec -> max seconds ───────────────────────────────────────────────────
# Maps a resolution label to the integer used as that spec's tier upper bound.
# Both "P" and "p" spellings are listed for 480/720/1080; "2K" and "4K" are
# listed in upper case only.
_VIDEO_SPEC_MAX: Dict[str, int] = {
    "480P": 480,
    "480p": 480,
    "720P": 720,
    "720p": 720,
    "1080P": 1080,
    "1080p": 1080,
    "2K": 1440,
    "4K": 2160,
}
# Non-token billing units: matches unit strings containing
# 每秒 (per second), 每张 (per image), 每次 (per call), 每帧 (per frame),
# or the "/秒", "/张", "/次" shorthand forms.
_NON_TOKEN_UNITS = re.compile(r"每秒|每张|每次|每帧|/秒|/张|/次", re.I)
# Regex for token-tier keys.
# Case 1: input<=128k / 32k ... (the remainder of this comment is missing).
# NOTE(review): the text that originally sat between the comment above and the
# `_to_tokens` header appears to have been lost. In particular `_TIER_RE`,
# which `_parse_tier_key` uses, is not defined anywhere in the visible source —
# restore it from the original file.
def _to_tokens(val: Any) -> Optional[int]:
    """Convert a size string such as '32k', '128K' or '1M' to an integer token count.

    The value is stringified, stripped, upper-cased and has thousands
    separators (",") removed before parsing.

    Args:
        val: The value to convert; anything accepted by ``str()``.

    Returns:
        The token count as an int (fractions are truncated toward zero), or
        None when the text is not a plain number with an optional K/M/G suffix.
    """
    # Local import keeps this block self-contained; Decimal gives exact
    # arithmetic so that e.g. "1.001k" is 1001 rather than a float-truncated 1000.
    from decimal import Decimal

    text = str(val).strip().upper().replace(",", "")
    # Accept "12", "12.", "12.5" and ".5", but not malformed input such as
    # "1.2.3" or "." (those used to reach float() and raise ValueError).
    m = re.match(r"^(\d+\.?\d*|\.\d+)\s*([KMG]?)$", text)
    if not m:
        return None
    # "G" is matched by the pattern, so it must scale the value as well.
    multipliers = {"": 1, "K": 1_000, "M": 1_000_000, "G": 1_000_000_000}
    return int(Decimal(m.group(1)) * multipliers[m.group(2)])
def _parse_price(obj: Any) -> Optional[float]:
    """Extract a numeric price from a raw value.

    Args:
        obj: Either the price itself (a number or numeric string) or a dict
            carrying it under the "price" key.

    Returns:
        The price as a float, or None when no usable number is present
        (missing key, None, a boolean, or a value ``float()`` cannot convert).
    """
    raw = obj.get("price") if isinstance(obj, dict) else obj
    # bool is a subclass of int; True/False are flags, not prices.
    if raw is None or isinstance(raw, bool):
        return None
    try:
        return float(raw)
    except (TypeError, ValueError, OverflowError):
        # Best-effort parser: unparseable prices are reported as "unknown".
        return None
def _parse_unit(obj: Any) -> Optional[str]:
    """Return the billing-unit string stored under "unit" in *obj*.

    Args:
        obj: A raw price value; only dicts can carry a unit.

    Returns:
        The unit exactly as stored when it is a string; None when *obj* is not
        a dict, has no "unit" key, or the stored unit is not a string (so that
        callers can safely run regex searches on the result).
    """
    if not isinstance(obj, dict):
        return None
    unit = obj.get("unit")
    return unit if isinstance(unit, str) else None
def _parse_tier_key(key: str):
    """Parse a token-tier key; return (min_tokens, max_tokens) or None.

    The key is stripped, lower-cased and has spaces removed before it is
    matched against `_TIER_RE`. Group 1 of the match is the lower bound
    (0 when absent) and group 2 the upper bound (None when absent); both are
    converted with `_to_tokens`.
    """
    normalized = key.strip().lower().replace(" ", "")
    match = _TIER_RE.match(normalized)
    if match:
        lower_text = match.group(1)
        upper_text = match.group(2)
        lower = _to_tokens(lower_text) if lower_text else 0
        upper = _to_tokens(upper_text) if upper_text else None
        return (lower, upper)
    # NOTE(review): the source hints that a further "lower bound only" key form
    # (e.g. "256k<...") was handled at this point; that code is not present in
    # the visible text, so such keys currently fall through — confirm against
    # the original file.
    return None
# NOTE(review): the original line at this position was garbled. It began with a
# comment about a "lower bound only" tier key (e.g. "256k<...") that belongs to
# `_parse_tier_key`, and ended with the tail of this function's header; whatever
# code sat in between is missing from the visible source — restore it from the
# original file.
def _extract_video_spec(label: str) -> Optional[str]:
    """Extract a known video spec from *label*, e.g. '视频生成(720P)' -> '720P'.

    Lookup order:
      1. The first parenthesised group (ASCII or full-width parentheses) when
         its content is exactly a known spec.
      2. Otherwise the first known spec, in `_VIDEO_SPEC_MAX` order, that occurs
         anywhere in the label.

    Args:
        label: The raw price key.

    Returns:
        The upper-cased spec name (e.g. "720P", "4K"), or None if no known spec
        is found.
    """
    # Upper-cased spec names, de-duplicated while keeping table order.
    known = list(dict.fromkeys(name.upper() for name in _VIDEO_SPEC_MAX))

    # \uff08 / \uff09 are the full-width parentheses common in Chinese labels.
    m = re.search(r"[(\uff08]([^)\uff09]+)[)\uff09]", label)
    if m:
        candidate = m.group(1).strip().upper()
        if candidate in known:
            return candidate

    # Fallback: look for the spec anywhere in the label. The look-behind stops a
    # spec from matching inside a larger number ("32K" must not yield "2K",
    # "1480P" must not yield "480P").
    upper_label = label.upper()
    for spec in known:
        if re.search(r"(?<!\d)" + re.escape(spec), upper_label):
            return spec
    return None
def _build_video_tiers(items: List[Dict]) -> List[Dict]:
    """Turn video-spec price items into consecutive tier rows.

    Example: 720P (0.6) + 1080P (1.0) ->
        [0,   720,  input=0, output=0.6]
        [721, 1080, input=0, output=1.0]

    Items that share the same ``_spec_max`` (e.g. the same resolution priced in
    two different units) get the same tier range instead of an inverted one.

    Args:
        items: Dicts with the keys "label", "_spec_max", "price", "currency"
            and "unit".

    Returns:
        One unified price row per item, ordered by ascending ``_spec_max``,
        with ``tier_unit`` "seconds" and ``input_price`` 0.0.
    """
    tiers: List[Dict] = []
    lower = 0
    prev_max: Optional[int] = None
    for item in sorted(items, key=lambda entry: entry["_spec_max"]):
        spec_max = item["_spec_max"]
        # Only advance the lower bound when the upper bound actually changes;
        # otherwise a duplicate spec would produce tier_min > tier_max.
        if prev_max is not None and spec_max != prev_max:
            lower = prev_max + (1 if prev_max > 0 else 0)
        tiers.append({
            "label": item["label"],
            "tier_min": lower,
            "tier_max": spec_max,
            "tier_unit": "seconds",
            "input_price": 0.0,
            "output_price": item["price"],
            "currency": item["currency"],
            "unit": item["unit"],
        })
        prev_max = spec_max
    return tiers
# Keys that name the plain (non-tiered) input / output price of a model.
_INPUT_KEY_RE = re.compile(r"^输入$|^input$", re.I)
_OUTPUT_KEY_RE = re.compile(r"^输出$|^output$", re.I)


def _flat_entry(
    label: str,
    input_price: Optional[float],
    output_price: Optional[float],
    unit: Optional[str],
) -> Dict:
    """Build a non-tiered unified price row (all tier fields are None)."""
    return {
        "label": label,
        "tier_min": None,
        "tier_max": None,
        "tier_unit": None,
        "input_price": input_price,
        "output_price": output_price,
        "currency": "CNY",
        "unit": unit,
    }


def parse_prices(prices: Dict[str, Any]) -> List[Dict]:
    """Convert a raw price mapping into a flat list of unified price rows.

    Each key of *prices* is classified in this order:

    1. Token tier: the key is recognised by `_parse_tier_key` and the value is
       a dict. One row with ``tier_unit`` "tokens" is emitted; its input/output
       prices come from the sub-keys named 输入/input and 输出/output.
    2. Any other non-dict value is skipped.
    3. Non-token billing (unit matches `_NON_TOKEN_UNITS`): entries with a
       known video spec are collected and appended at the end as consecutive
       tiers via `_build_video_tiers`; the rest are emitted immediately with
       ``input_price`` 0.0 and the price as ``output_price``.
    4. Plain 输入/input and 输出/output keys are merged into one
       "input/output" row, whichever of the two appears first. An input
       without any output is emitted as an "input" row after the loop.
    5. Anything else becomes a flat row with the price in ``input_price``.

    Args:
        prices: Mapping of raw price label -> raw price value.

    Returns:
        List of row dicts with the keys label, tier_min, tier_max, tier_unit,
        input_price, output_price, currency and unit. Empty for empty/None input.
    """
    if not prices:
        return []

    result: List[Dict] = []
    video_items: List[Dict] = []
    # Input price seen but not yet paired with an output row.
    input_entry: Optional[Dict] = None
    # Output row already emitted without an input price (output key came first).
    pending_output: Optional[Dict] = None

    for key, val in prices.items():
        # ── token tiers ──
        tier = _parse_tier_key(key)
        if tier is not None and isinstance(val, dict):
            entry: Dict = {
                "label": key,
                "tier_min": tier[0],
                "tier_max": tier[1],
                "tier_unit": "tokens",
                "input_price": None,
                "output_price": None,
                "currency": "CNY",
                "unit": None,
            }
            for sub_key, sub_val in val.items():
                sub_name = sub_key.strip()
                sub_unit = _parse_unit(sub_val)
                if sub_unit:
                    entry["unit"] = sub_unit
                if _INPUT_KEY_RE.match(sub_name):
                    entry["input_price"] = _parse_price(sub_val)
                elif _OUTPUT_KEY_RE.match(sub_name):
                    entry["output_price"] = _parse_price(sub_val)
            result.append(entry)
            continue

        if not isinstance(val, dict):
            continue

        price = _parse_price(val)
        unit = _parse_unit(val)

        # ── video / image: billed per unit rather than per token ──
        # Guard against a non-string unit so the regex search cannot raise.
        if isinstance(unit, str) and _NON_TOKEN_UNITS.search(unit):
            spec = _extract_video_spec(key)
            if spec and spec in _VIDEO_SPEC_MAX:
                video_items.append({
                    "label": key,
                    "_spec_max": _VIDEO_SPEC_MAX[spec],
                    "price": price,
                    "currency": "CNY",
                    "unit": unit,
                })
            else:
                # Unknown spec: emit as-is, without a tier.
                result.append(_flat_entry(key, 0.0, price, unit))
            continue

        # ── plain, non-tiered input / output ──
        name = key.strip()
        if _INPUT_KEY_RE.match(name):
            if pending_output is not None:
                # The output row was emitted first; complete it in place so the
                # pair is not split into two half-filled rows.
                pending_output["input_price"] = price
                if not pending_output["unit"]:
                    pending_output["unit"] = unit
                pending_output = None
            else:
                input_entry = {"price": price, "unit": unit}
            continue
        if _OUTPUT_KEY_RE.match(name):
            row = _flat_entry(
                "input/output",
                input_entry["price"] if input_entry else None,
                price,
                unit or (input_entry["unit"] if input_entry else None),
            )
            result.append(row)
            # Remember the row only if it still lacks its input price.
            pending_output = None if input_entry else row
            input_entry = None
            continue

        # Any other plain label.
        result.append(_flat_entry(key, price, None, unit))

    # Input price that never got a matching output.
    if input_entry:
        result.append(
            _flat_entry("input", input_entry["price"], None, input_entry["unit"])
        )

    # Convert the collected video entries into consecutive tiers.
    if video_items:
        result.extend(_build_video_tiers(video_items))
    return result