#!/usr/bin/env python3
"""
Generate a latency optimization markdown document from benchmark result JSON files.
Example:
python3 hack/perf/generate_latency_optimization_doc.py \
--title "Optimizing Qwen3-8B Latency" \
--model Qwen/Qwen3-8B-FP8 \
--baseline-file ./output/baseline.json \
--optimized-file ./output/optimized.json \
--group "Inference Engine=./output/vllm.json,./output/sglang.json" \
--group "Inference Engine=vLLM::./output/vllm.json,SGLang::./output/sglang.json" \
--group "Speculative Decoding=./output/speculative.json" \
--other-group "ShareGPT BS=1=./output/sharegpt_bs1_baseline.json,./output/sharegpt_bs1_optimized.json" \
--other-group "ShareGPT BS=2=./output/sharegpt_bs2_baseline.json,./output/sharegpt_bs2_optimized.json" \
--output ./output/latency_doc.md
"""
from __future__ import annotations
from perf_doc_common import (
BenchmarkRecord,
build_arg_parser,
collect_experimental_setup,
collect_profiles_from_results,
fmt_number,
load_record,
load_grouped_records,
request_success_rate,
render_benchmark_method_intro,
render_benchmark_result_block,
render_backend_parameters,
render_profile_config,
render_recommended_configuration,
render_record_section,
render_result_image,
render_setup_list,
render_open_source_replacement,
render_standard_note_block,
)
def fmt_latency_compare(baseline_value: float, candidate_value: float) -> str:
if baseline_value <= 0 or candidate_value <= 0:
return "N/A"
ratio = baseline_value / candidate_value
if ratio >= 1:
return (
f'({ratio:.2f}x faster)'
)
return f'({(1 / ratio):.2f}x slower)'
def is_successful_record(record: BenchmarkRecord) -> bool:
success_rate = request_success_rate(record)
return success_rate is None or success_rate >= 1
def latency_annotation(
baseline_latency: float,
candidate_latency: float,
candidate_record: BenchmarkRecord,
) -> str:
success_rate = request_success_rate(candidate_record)
if success_rate is not None and success_rate < 1:
return (
''
f'(Success rate: {success_rate * 100:.1f}%, optimization skipped)'
""
)
return fmt_latency_compare(baseline_latency, candidate_latency)
def format_latency_delta_summary(
metric_name: str, faster_value: float, slower_value: float
) -> str:
delta = slower_value - faster_value
if faster_value <= 0 or slower_value <= 0:
return f"{metric_name} = {fmt_number(faster_value)} ms vs {fmt_number(slower_value)} ms"
if abs(delta) < 1e-9:
return f"{metric_name} = {fmt_number(faster_value)} ms vs {fmt_number(slower_value)} ms, unchanged"
if delta > 0:
ratio = slower_value / faster_value
return (
f"{metric_name} = {fmt_number(faster_value)} ms vs {fmt_number(slower_value)} ms, "
f"reduced by {fmt_number(delta)} ms ({fmt_number(ratio)}x faster)"
)
increase = abs(delta)
ratio = faster_value / slower_value
return (
f"{metric_name} = {fmt_number(faster_value)} ms vs {fmt_number(slower_value)} ms, "
f"increased by {fmt_number(increase)} ms ({fmt_number(ratio)}x slower)"
)
def build_summary_rows(
baseline: BenchmarkRecord, records: list[BenchmarkRecord]
) -> list[str]:
baseline_latency = float(baseline.payload.get("request_latency_mean") or 0)
best_by_group: dict[str, BenchmarkRecord] = {}
for record in records:
if not is_successful_record(record):
continue
current_best = best_by_group.get(record.group_name)
record_latency = float(
record.payload.get("request_latency_mean") or float("inf")
)
if current_best is None:
best_by_group[record.group_name] = record
continue
current_best_latency = float(
current_best.payload.get("request_latency_mean") or float("inf")
)
if record_latency < current_best_latency:
best_by_group[record.group_name] = record
rows = [
"| Benchmark Case | Group | Optimized | Baseline | Comparison |",
"|---|---|---:|---:|---|",
]
for record in best_by_group.values():
candidate_latency = float(record.payload.get("request_latency_mean") or 0)
benchmark_case = record.profile
if record.request_rate is not None:
benchmark_case = f"{benchmark_case} (r={record.request_rate})"
rows.append(
"| "
+ f"{benchmark_case} | {record.group_name} | "
+ f"{candidate_latency:.2f}s | {baseline_latency:.2f}s | "
+ f"{latency_annotation(baseline_latency, candidate_latency, record)} |"
)
return rows
def build_profile_comparison_rows(
other_grouped_records: list[tuple[str, list[BenchmarkRecord]]],
) -> list[str]:
rows = [
"| Benchmark Case | Baseline (vLLM without any optimizations) | Optimized |",
"|----------|-------------------------------------------|-----------|",
]
for profile_name, records in other_grouped_records:
if len(records) < 2:
continue
baseline, optimized = records[0], records[1]
baseline_latency = float(baseline.payload.get("request_latency_mean") or 0)
optimized_latency = float(optimized.payload.get("request_latency_mean") or 0)
rows.append(
"| "
+ f"**{profile_name}** | "
+ f"Mean latency: {baseline_latency:.2f}s/req | "
+ f"Mean latency: {optimized_latency:.2f}s/req "
+ f"{latency_annotation(baseline_latency, optimized_latency, optimized)} |"
)
return rows
def record_display_name(record: BenchmarkRecord) -> str:
return record.custom_title or record.name
def render_group_latency_summary(records: list[BenchmarkRecord]) -> str:
successful_records = [record for record in records if is_successful_record(record)]
if len(successful_records) < 2:
return ""
ranked_records = sorted(
successful_records,
key=lambda record: float(
record.payload.get("request_latency_mean") or float("inf")
),
)
fastest = ranked_records[0]
slowest = ranked_records[-1]
fastest_latency = float(fastest.payload.get("request_latency_mean") or 0)
slowest_latency = float(slowest.payload.get("request_latency_mean") or 0)
fastest_ttft = float(fastest.payload.get("time_to_first_token_mean") or 0)
slowest_ttft = float(slowest.payload.get("time_to_first_token_mean") or 0)
fastest_tpot = float(fastest.payload.get("time_per_output_token_mean") or 0)
slowest_tpot = float(slowest.payload.get("time_per_output_token_mean") or 0)
if fastest_latency <= 0 or slowest_latency <= 0:
return ""
latency_delta = slowest_latency - fastest_latency
latency_ratio = slowest_latency / fastest_latency
summary = (
f"- Summary: `{record_display_name(fastest)}` Mean Latency = {fmt_number(fastest_latency)}s, "
f"`{record_display_name(slowest)}` Mean Latency = {fmt_number(slowest_latency)}s. "
f"`{record_display_name(fastest)}` is faster by {fmt_number(latency_delta)}s "
f"({fmt_number(latency_ratio)}x faster)."
)
extra_metrics: list[str] = []
if slowest_ttft > 0 and fastest_ttft > 0:
extra_metrics.append(
format_latency_delta_summary("TTFT", fastest_ttft, slowest_ttft)
)
if slowest_tpot > 0 and fastest_tpot > 0:
extra_metrics.append(
format_latency_delta_summary("TPOT", fastest_tpot, slowest_tpot)
)
if extra_metrics:
summary += " " + "; ".join(extra_metrics) + "."
return summary
def render_other_group_pair_section(
group_name: str, records: list[BenchmarkRecord]
) -> str:
if not records:
return ""
lines = [f"#### {group_name}", ""]
if len(records) >= 1:
baseline = records[0]
baseline_instance = baseline.payload.get("snapshot", {}).get("instances", {})
if isinstance(baseline_instance, dict) and baseline_instance:
baseline_instance = next(iter(baseline_instance.values()))
else:
baseline_instance = {}
lines.extend(
[
f"- Baseline Backend Parameters:{render_backend_parameters(baseline_instance)}",
"",
render_benchmark_result_block(baseline.payload).replace(
'??? info "Benchmark result"',
'??? info "Baseline benchmark result"',
1,
),
"",
]
)
if len(records) >= 2:
optimized = records[1]
optimized_instance = optimized.payload.get("snapshot", {}).get("instances", {})
if isinstance(optimized_instance, dict) and optimized_instance:
optimized_instance = next(iter(optimized_instance.values()))
else:
optimized_instance = {}
lines.extend(
[
f"- Optimized Backend Parameters:{render_backend_parameters(optimized_instance)}",
"",
render_benchmark_result_block(optimized.payload).replace(
'??? info "Benchmark result"',
'??? info "Optimized benchmark result"',
1,
),
"",
]
)
return "\n".join(lines)
def generate_latency_markdown(
title: str,
baseline: BenchmarkRecord,
grouped_records: list[tuple[str, list[BenchmarkRecord]]],
other_grouped_records: list[tuple[str, list[BenchmarkRecord]]],
declared_models: list[str] | None = None,
optimized_record: BenchmarkRecord | None = None,
image_name: str = "replace-this-image.png",
) -> str:
summary_records = [record for _, records in grouped_records for record in records]
all_records = summary_records + [
record for _, records in other_grouped_records for record in records
]
summary_rows = build_profile_comparison_rows(other_grouped_records)
optimization_rows = build_summary_rows(baseline, summary_records)
models, hardware, engine_versions = collect_experimental_setup(
all_records, declared_models=declared_models
)
profile_configs = collect_profiles_from_results(all_records)
used_profiles = list(profile_configs.keys())
sections = [
f"# {title}",
"",
"## Conclusion",
"",
render_result_image("Latency Optimization Result", image_name),
"",
*render_recommended_configuration(
"latency", optimized_record, declared_models=declared_models
),
"Comparison of benchmark results before and after optimization:",
"",
*summary_rows,
"",
*render_standard_note_block(),
"## Experimental Setup",
"",
*render_setup_list("Model", models),
*render_setup_list("Hardware", hardware),
*render_setup_list("Engine Version", engine_versions),
*render_benchmark_method_intro(),
]
for profile_name in used_profiles:
sections.extend(
render_profile_config(profile_name, profile_configs.get(profile_name))
)
sections.extend(render_open_source_replacement(profile_configs))
sections.extend(
[
"## Experiment Results",
"",
]
)
for group_name, records in grouped_records:
sections.append(f"### {group_name}")
sections.append("")
include_subheading = len(records) > 1
for record in records:
sections.append(
render_record_section(record, include_heading=include_subheading)
)
group_summary = render_group_latency_summary(records)
if group_summary:
sections.extend([group_summary, ""])
sections.extend(
[
"### Summary of Optimization Options",
"",
"| Benchmark Case | Group | Optimized | Baseline | Comparison |",
"|---|---|---:|---:|---|",
*optimization_rows[2:],
"",
]
)
if other_grouped_records:
sections.extend(
[
"### Other Benchmark Cases",
"",
]
)
for group_name, records in other_grouped_records:
rendered = render_other_group_pair_section(group_name, records)
if rendered:
sections.append(rendered)
return "\n".join(sections).rstrip() + "\n"
def main() -> None:
args = build_arg_parser("latency").parse_args()
baseline_record, grouped_records, other_grouped_records = load_grouped_records(
args.baseline_file, args.group, args.other_group
)
optimized_record = load_record(args.optimized_file) if args.optimized_file else None
output_path = args.output
from pathlib import Path
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text(
generate_latency_markdown(
args.title,
baseline_record,
grouped_records,
other_grouped_records,
declared_models=args.model,
optimized_record=optimized_record,
image_name=args.image_name,
),
encoding="utf-8",
)
print(f"Generated markdown document at {output_file}")
if __name__ == "__main__":
main()