price_parser.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. """
price_parser.py

统一价格结构,所有模型类型输出相同字段:
    tier_min     : 档位下限(token数 或 秒数),0 表示从0开始,None 表示无档位
    tier_max     : 档位上限(token数 或 秒数),None 表示无上限
    tier_unit    : 档位单位,"tokens" 或 "seconds",None 表示无档位
    input_price  : 输入价格(元/百万tokens 或 0),视频/图像为 0
    output_price : 输出价格(元/百万tokens)或视频/图像的生成价格
    currency     : "CNY"
    unit         : 计费单位原始字符串
    label        : 原始 key

视频规格 -> 秒数映射(当存在更低规格的档位时,下限为上一档上限 + 1):
    480P  -> 0 ~ 480
    720P  -> 0 ~ 720  (存在 480P 档位时为 481 ~ 720)
    1080P -> 0 ~ 1080 (存在 720P 档位时为 721 ~ 1080)
    2K    -> 0 ~ 1440
    4K    -> 0 ~ 2160
  17. """
  18. from __future__ import annotations
  19. import re
  20. from typing import Any, Dict, List, Optional
  21. # ── 视频规格 -> 最大秒数 ────────────────────────────────────────────────────────
  22. _VIDEO_SPEC_MAX: Dict[str, int] = {
  23. "480P": 480,
  24. "480p": 480,
  25. "720P": 720,
  26. "720p": 720,
  27. "1080P": 1080,
  28. "1080p": 1080,
  29. "2K": 1440,
  30. "4K": 2160,
  31. }
  32. # 非 token 计费单位
  33. _NON_TOKEN_UNITS = re.compile(r"每秒|每张|每次|每帧|/秒|/张|/次", re.I)
  34. # token 阶梯 key 正则
  35. # 情况1:input<=128k / 32k<input<=128k(有上限)
  36. _TIER_RE = re.compile(
  37. r"^(?:([\d.]+[KkMm]?)\s*<\s*)?(?:input|输入)\s*<=?\s*([\d.]+[KkMm]?)$",
  38. re.I,
  39. )
  40. # 情况2:256k<input(只有下限,无上限)
  41. _TIER_RE_LO_ONLY = re.compile(
  42. r"^([\d.]+[KkMm]?)\s*<\s*(?:input|输入)$",
  43. re.I,
  44. )
  45. def _to_tokens(val: str) -> Optional[int]:
  46. """把 '32k'/'128K'/'1M' 转成 token 整数。"""
  47. s = str(val).strip().upper().replace(",", "")
  48. m = re.match(r"^([\d.]+)\s*([KMG]?)$", s)
  49. if not m:
  50. return None
  51. num = float(m.group(1))
  52. suffix = m.group(2)
  53. if suffix == "K":
  54. return int(num * 1_000)
  55. if suffix == "M":
  56. return int(num * 1_000_000)
  57. return int(num)
  58. def _parse_price(obj: Any) -> Optional[float]:
  59. if isinstance(obj, (int, float)):
  60. return float(obj)
  61. if isinstance(obj, dict):
  62. v = obj.get("price")
  63. if v is not None:
  64. try:
  65. return float(v)
  66. except (TypeError, ValueError):
  67. pass
  68. return None
  69. def _parse_unit(obj: Any) -> Optional[str]:
  70. if isinstance(obj, dict):
  71. return obj.get("unit")
  72. return None
  73. def _parse_tier_key(key: str):
  74. """解析 token 阶梯 key,返回 (min_tokens, max_tokens) 或 None。"""
  75. k = key.strip().lower().replace(" ", "")
  76. m = _TIER_RE.match(k)
  77. if m:
  78. lo_str, hi_str = m.group(1), m.group(2)
  79. lo = _to_tokens(lo_str) if lo_str else 0
  80. hi = _to_tokens(hi_str) if hi_str else None
  81. return (lo, hi)
  82. # 只有下限:256k<input
  83. m2 = _TIER_RE_LO_ONLY.match(k)
  84. if m2:
  85. lo = _to_tokens(m2.group(1))
  86. return (lo, None)
  87. return None
  88. def _extract_video_spec(label: str) -> Optional[str]:
  89. """从 label 中提取视频规格,如 '视频生成(720P)' -> '720P'。"""
  90. m = re.search(r"[((]([^))]+)[))]", label)
  91. if m:
  92. spec = m.group(1).strip()
  93. if spec.upper() in {k.upper() for k in _VIDEO_SPEC_MAX}:
  94. return spec.upper()
  95. # 直接在 label 里找
  96. for spec in _VIDEO_SPEC_MAX:
  97. if spec.upper() in label.upper():
  98. return spec.upper()
  99. return None
  100. def _build_video_tiers(items: List[Dict]) -> List[Dict]:
  101. """
  102. 把多个视频规格条目转成连续区间:
  103. 720P(0.6) + 1080P(1.0) ->
  104. [0, 720, input=0, output=0.6]
  105. [721, 1080, input=0, output=1.0]
  106. """
  107. # 按 tier_max 排序
  108. sorted_items = sorted(items, key=lambda x: x["_spec_max"])
  109. result = []
  110. prev_max = 0
  111. for item in sorted_items:
  112. spec_max = item["_spec_max"]
  113. result.append({
  114. "label": item["label"],
  115. "tier_min": prev_max + (1 if prev_max > 0 else 0),
  116. "tier_max": spec_max,
  117. "tier_unit": "seconds",
  118. "input_price": 0.0,
  119. "output_price": item["price"],
  120. "currency": item["currency"],
  121. "unit": item["unit"],
  122. })
  123. prev_max = spec_max
  124. return result
  125. def parse_prices(prices: Dict[str, Any]) -> List[Dict]:
  126. result: List[Dict] = []
  127. video_items: List[Dict] = []
  128. input_entry: Optional[Dict] = None
  129. for key, val in prices.items():
  130. # ── token 阶梯 ──
  131. tier = _parse_tier_key(key)
  132. if tier is not None and isinstance(val, dict):
  133. entry: Dict = {
  134. "label": key,
  135. "tier_min": tier[0],
  136. "tier_max": tier[1],
  137. "tier_unit": "tokens",
  138. "input_price": None,
  139. "output_price": None,
  140. "currency": "CNY",
  141. "unit": None,
  142. }
  143. for sub_key, sub_val in val.items():
  144. sk = sub_key.strip()
  145. price = _parse_price(sub_val)
  146. unit = _parse_unit(sub_val)
  147. if unit:
  148. entry["unit"] = unit
  149. if re.match(r"^输入$|^input$", sk, re.I):
  150. entry["input_price"] = price
  151. elif re.match(r"^输出$|^output$", sk, re.I):
  152. entry["output_price"] = price
  153. result.append(entry)
  154. continue
  155. if not isinstance(val, dict):
  156. continue
  157. price = _parse_price(val)
  158. unit = _parse_unit(val)
  159. # ── 视频/图像按单位计费 ──
  160. if _NON_TOKEN_UNITS.search(unit or ""):
  161. spec = _extract_video_spec(key)
  162. if spec and spec in _VIDEO_SPEC_MAX:
  163. video_items.append({
  164. "label": key,
  165. "_spec_max": _VIDEO_SPEC_MAX[spec],
  166. "price": price,
  167. "currency": "CNY",
  168. "unit": unit,
  169. })
  170. else:
  171. # 未知规格,直接输出
  172. result.append({
  173. "label": key,
  174. "tier_min": None,
  175. "tier_max": None,
  176. "tier_unit": None,
  177. "input_price": 0.0,
  178. "output_price": price,
  179. "currency": "CNY",
  180. "unit": unit,
  181. })
  182. continue
  183. # ── 简单非阶梯(输入/输出) ──
  184. if re.match(r"^输入$|^input$", key.strip(), re.I):
  185. input_entry = {"price": price, "unit": unit}
  186. continue
  187. if re.match(r"^输出$|^output$", key.strip(), re.I):
  188. result.append({
  189. "label": "input/output",
  190. "tier_min": None,
  191. "tier_max": None,
  192. "tier_unit": None,
  193. "input_price": input_entry["price"] if input_entry else None,
  194. "output_price": price,
  195. "currency": "CNY",
  196. "unit": unit or (input_entry["unit"] if input_entry else None),
  197. })
  198. input_entry = None
  199. continue
  200. # 其他普通标签
  201. result.append({
  202. "label": key,
  203. "tier_min": None,
  204. "tier_max": None,
  205. "tier_unit": None,
  206. "input_price": price,
  207. "output_price": None,
  208. "currency": "CNY",
  209. "unit": unit,
  210. })
  211. # 处理只有输入没有输出的情况
  212. if input_entry:
  213. result.append({
  214. "label": "input",
  215. "tier_min": None,
  216. "tier_max": None,
  217. "tier_unit": None,
  218. "input_price": input_entry["price"],
  219. "output_price": None,
  220. "currency": "CNY",
  221. "unit": input_entry["unit"],
  222. })
  223. # 把视频条目转成连续区间
  224. if video_items:
  225. result.extend(_build_video_tiers(video_items))
  226. return result