""" price_parser.py 统一价格结构,所有模型类型输出相同字段: tier_min : 档位下限(token数 或 秒数),0 表示从0开始,None 表示无档位 tier_max : 档位上限(token数 或 秒数),None 表示无上限 tier_unit : 档位单位,"tokens" 或 "seconds",None 表示无档位 input_price : 输入价格(元/百万tokens 或 0),视频/图像为 0 output_price : 输出价格(元/百万tokens)或视频/图像的生成价格 currency : "CNY" unit : 计费单位原始字符串 label : 原始 key 视频规格 -> 秒数映射: 480P -> 0 ~ 480 720P -> 0 ~ 720 (或 481 ~ 720) 1080P -> 0 ~ 1080 (或 721 ~ 1080) 4K -> 0 ~ 2160 """ from __future__ import annotations import re from typing import Any, Dict, List, Optional # ── 视频规格 -> 最大秒数 ──────────────────────────────────────────────────────── _VIDEO_SPEC_MAX: Dict[str, int] = { "480P": 480, "480p": 480, "720P": 720, "720p": 720, "1080P": 1080, "1080p": 1080, "2K": 1440, "4K": 2160, } # 非 token 计费单位 _NON_TOKEN_UNITS = re.compile(r"每秒|每张|每次|每帧|/秒|/张|/次", re.I) # token 阶梯 key 正则 # 情况1:input<=128k / 32k Optional[int]: """把 '32k'/'128K'/'1M' 转成 token 整数。""" s = str(val).strip().upper().replace(",", "") m = re.match(r"^([\d.]+)\s*([KMG]?)$", s) if not m: return None num = float(m.group(1)) suffix = m.group(2) if suffix == "K": return int(num * 1_000) if suffix == "M": return int(num * 1_000_000) return int(num) def _parse_price(obj: Any) -> Optional[float]: if isinstance(obj, (int, float)): return float(obj) if isinstance(obj, dict): v = obj.get("price") if v is not None: try: return float(v) except (TypeError, ValueError): pass return None def _parse_unit(obj: Any) -> Optional[str]: if isinstance(obj, dict): return obj.get("unit") return None def _parse_tier_key(key: str): """解析 token 阶梯 key,返回 (min_tokens, max_tokens) 或 None。""" k = key.strip().lower().replace(" ", "") m = _TIER_RE.match(k) if m: lo_str, hi_str = m.group(1), m.group(2) lo = _to_tokens(lo_str) if lo_str else 0 hi = _to_tokens(hi_str) if hi_str else None return (lo, hi) # 只有下限:256k Optional[str]: """从 label 中提取视频规格,如 '视频生成(720P)' -> '720P'。""" m = re.search(r"[((]([^))]+)[))]", label) if m: spec = m.group(1).strip() if spec.upper() in {k.upper() for k in _VIDEO_SPEC_MAX}: return spec.upper() # 直接在 label 里找 for spec in _VIDEO_SPEC_MAX: if spec.upper() in label.upper(): return spec.upper() return None def _build_video_tiers(items: List[Dict]) -> List[Dict]: """ 把多个视频规格条目转成连续区间: 720P(0.6) + 1080P(1.0) -> [0, 720, input=0, output=0.6] [721, 1080, input=0, output=1.0] """ # 按 tier_max 排序 sorted_items = sorted(items, key=lambda x: x["_spec_max"]) result = [] prev_max = 0 for item in sorted_items: spec_max = item["_spec_max"] result.append({ "label": item["label"], "tier_min": prev_max + (1 if prev_max > 0 else 0), "tier_max": spec_max, "tier_unit": "seconds", "input_price": 0.0, "output_price": item["price"], "currency": item["currency"], "unit": item["unit"], }) prev_max = spec_max return result def parse_prices(prices: Dict[str, Any]) -> List[Dict]: result: List[Dict] = [] video_items: List[Dict] = [] input_entry: Optional[Dict] = None for key, val in prices.items(): # ── token 阶梯 ── tier = _parse_tier_key(key) if tier is not None and isinstance(val, dict): entry: Dict = { "label": key, "tier_min": tier[0], "tier_max": tier[1], "tier_unit": "tokens", "input_price": None, "output_price": None, "currency": "CNY", "unit": None, } for sub_key, sub_val in val.items(): sk = sub_key.strip() price = _parse_price(sub_val) unit = _parse_unit(sub_val) if unit: entry["unit"] = unit if re.match(r"^输入$|^input$", sk, re.I): entry["input_price"] = price elif re.match(r"^输出$|^output$", sk, re.I): entry["output_price"] = price result.append(entry) continue if not isinstance(val, dict): continue price = _parse_price(val) unit = _parse_unit(val) # ── 视频/图像按单位计费 ── if _NON_TOKEN_UNITS.search(unit or ""): spec = _extract_video_spec(key) if spec and spec in _VIDEO_SPEC_MAX: video_items.append({ "label": key, "_spec_max": _VIDEO_SPEC_MAX[spec], "price": price, "currency": "CNY", "unit": unit, }) else: # 未知规格,直接输出 result.append({ "label": key, "tier_min": None, "tier_max": None, "tier_unit": None, "input_price": 0.0, "output_price": price, "currency": "CNY", "unit": unit, }) continue # ── 简单非阶梯(输入/输出) ── if re.match(r"^输入$|^input$", key.strip(), re.I): input_entry = {"price": price, "unit": unit} continue if re.match(r"^输出$|^output$", key.strip(), re.I): result.append({ "label": "input/output", "tier_min": None, "tier_max": None, "tier_unit": None, "input_price": input_entry["price"] if input_entry else None, "output_price": price, "currency": "CNY", "unit": unit or (input_entry["unit"] if input_entry else None), }) input_entry = None continue # 其他普通标签 result.append({ "label": key, "tier_min": None, "tier_max": None, "tier_unit": None, "input_price": price, "output_price": None, "currency": "CNY", "unit": unit, }) # 处理只有输入没有输出的情况 if input_entry: result.append({ "label": "input", "tier_min": None, "tier_max": None, "tier_unit": None, "input_price": input_entry["price"], "output_price": None, "currency": "CNY", "unit": input_entry["unit"], }) # 把视频条目转成连续区间 if video_items: result.extend(_build_video_tiers(video_items)) return result