- gpu_specs: H100 新增 compute_pass_thresholds_tflops 字段 (fp32:54 / tf32:444 / fp16:734 / bf16:745 / fp8:1400), 与 marketing peak 解耦,作为绝对 TFLOPS PASS 门槛 - benchmark: compute 结果中透出 pass_thresholds_tflops 供 report 使用 - report: compute 判定改用绝对 TFLOPS (PASS ≥门槛 / WARN ≥门槛×90% / FAIL <门槛×90%);表头切换为 Threshold 列;Memory D2D verdict 由 50/30 收紧至 80/60;无阈值配置的 GPU 保留旧 % 效率逻辑 - nccl: _OP_BW_FRACTIONS 收紧至 AllReduce/AllGather/ReduceScatter 0.45、Broadcast/SendRecv 0.40、AllToAll 0.35,与验收文档 §5 一致 - configs: benchmark 默认 matrix_size 4096→8192、warmup 10→50、 iterations 100→500、use_compile 改 true;health temp_warning 80→75、temp_critical 90→85,匹配生产验收稳态温度要求 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
567 lines
26 KiB
Python
567 lines
26 KiB
Python
"""GPU benchmark module — nvbandwidth + PyTorch compute throughput."""
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Optional, List
|
|
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
|
|
|
|
from modules.gpu_specs import detect_gpu_type, get_gpu_specs, get_gpu_label, resolve_tools_dir
|
|
|
|
# Optional-dependency probe. TORCH_AVAILABLE is True only when PyTorch can be
# imported AND it reports a usable CUDA device; the benchmark methods check
# this flag before touching any torch API.
try:
    import torch

    TORCH_AVAILABLE = bool(torch.cuda.is_available())
except ImportError:
    TORCH_AVAILABLE = False
|
|
|
|
|
|
class Benchmark:
|
|
|
|
def __init__(self, config: dict):
    """Store the configuration and resolve the GPU-specific settings.

    Args:
        config: Tool configuration mapping. The ``benchmark`` section and
            the ``gpu_type`` key are read here; the mapping is also handed
            to ``resolve_tools_dir``.
    """
    self.config = config
    self.console = Console()
    self.bench_cfg = config.get("benchmark", {})
    self.tools_dir = resolve_tools_dir(config)

    # An explicit gpu_type wins; "auto" (also the default when the key is
    # absent) triggers detection.
    requested_type = config.get("gpu_type", "auto")
    if requested_type == "auto":
        requested_type = detect_gpu_type()
    self.gpu_type = requested_type

    self.specs = get_gpu_specs(self.gpu_type)
    self.gpu_label = get_gpu_label(self.gpu_type)
|
|
|
|
def run(self) -> dict:
    """Run the memory benchmark, then the compute benchmark.

    Returns:
        One dict holding the top-level keys of both result dicts. Should a
        key appear in both, the compute value replaces the memory value.
    """
    memory_results = self.run_memory_benchmark()
    compute_results = self.run_compute_benchmark()
    return {**memory_results, **compute_results}
|
|
|
|
def _find_nvbandwidth(self) -> Optional[str]:
    """Locate an nvbandwidth executable.

    Search order: the system ``PATH``, then
    ``<tools_dir>/nvbandwidth/nvbandwidth``, then a fixed list of DCGM /
    system install locations.

    Returns:
        Path of the first executable found, or ``None`` if there is none.
    """
    on_path = shutil.which("nvbandwidth")
    if on_path:
        return on_path

    # Candidates are probed in order; the first regular, executable file wins.
    fallback_locations = (
        os.path.join(self.tools_dir, "nvbandwidth", "nvbandwidth"),
        "/usr/libexec/datacenter-gpu-manager-4/plugins/cuda12/nvbandwidth",
        "/usr/libexec/datacenter-gpu-manager/plugins/cuda12/nvbandwidth",
        "/usr/local/bin/nvbandwidth",
        "/opt/nvidia/nvbandwidth/nvbandwidth",
    )
    return next(
        (
            location
            for location in fallback_locations
            if os.path.isfile(location) and os.access(location, os.X_OK)
        ),
        None,
    )
|
|
|
|
def run_memory_benchmark(self) -> dict:
    """Measure memory bandwidth, preferring nvbandwidth over PyTorch.

    Returns:
        The result of ``_run_nvbandwidth`` when an nvbandwidth binary is
        found, otherwise the result of ``_run_memory_pytorch``.
    """
    nvbw_binary = self._find_nvbandwidth()
    if not nvbw_binary:
        self.console.print("[yellow]nvbandwidth not found, falling back to PyTorch[/yellow]")
        return self._run_memory_pytorch()
    return self._run_nvbandwidth(nvbw_binary)
|
|
|
|
def _run_nvbandwidth(self, nvbw_path: str) -> dict:
    """Run the nvbandwidth copy tests and summarise the bandwidths.

    Each logical test (h2d, d2h, d2d_write, d2d_read, d2d_bidir) is run as
    a separate ``nvbandwidth -t <name> -j`` invocation. A test that fails,
    times out or prints nothing is recorded as 0. If no test yields data
    the PyTorch fallback is used instead.

    Args:
        nvbw_path: Path of the nvbandwidth executable.

    Returns:
        ``{"memory": {...}}`` with achieved bandwidths, the peaks they are
        compared against, efficiencies in percent (``None`` when either the
        measurement or the peak is missing) and the raw per-test numbers.
    """
    memory_cfg = self.bench_cfg.get("memory", {})
    buffer_mb = memory_cfg.get("nvbandwidth_buffer_mb", 512)
    samples = memory_cfg.get("nvbandwidth_samples", 3)

    self.console.print(f"[cyan]Memory Benchmark via nvbandwidth ({nvbw_path})[/cyan]")

    # (result key, candidate testcase names). Several names are listed where
    # both an old and a new spelling are tried.
    testcases = [
        ("h2d", ["host_to_device_memcpy_ce", "host_to_device_memcpy_read_ce"]),
        ("d2h", ["device_to_host_memcpy_ce", "device_to_host_memcpy_write_ce"]),
        ("d2d_write", ["device_to_device_memcpy_write_ce"]),
        ("d2d_read", ["device_to_device_memcpy_read_ce"]),
        ("d2d_bidir", ["device_to_device_bidirectional_memcpy_write_sm",
                       "device_to_device_bidirectional_sm"]),
    ]

    # Ask the binary which testcases it knows. Rows are expected to look
    # like "<index>, <name>:"; anything else is ignored.
    available_names: list[str] = []
    try:
        listing = subprocess.run(
            [nvbw_path, "-l"], capture_output=True, text=True, timeout=15,
        )
    except (subprocess.TimeoutExpired, FileNotFoundError):
        listing = None
    if listing is not None and listing.returncode == 0:
        for raw_line in listing.stdout.splitlines():
            entry = raw_line.strip()
            if not entry or ", " not in entry or not entry[0].isdigit():
                continue
            name = entry.split(", ", 1)[1].rstrip(":").strip()
            if name:
                available_names.append(name)

    results_by_test = {}
    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("nvbandwidth tests...", total=len(testcases))

        for key, name_candidates in testcases:
            # With an empty listing the first candidate is used blindly;
            # otherwise the first candidate the binary advertises.
            selected = next(
                (
                    candidate
                    for candidate in name_candidates
                    if not available_names or candidate in available_names
                ),
                None,
            )
            if selected is not None:
                # --disableAffinity skips nvbandwidth's CPU affinity setup, which
                # calls nvmlDeviceGetHandleByUUID() -- that lookup fails on hosts
                # whose fabricmanager build doesn't expose the UUID format nvml
                # expects (seen on H20-3e with custom 570.172.08-1 fabricmanager).
                cmd = [nvbw_path, "--disableAffinity", "-t", selected,
                       "-b", str(buffer_mb), "-i", str(samples), "-j"]
                bandwidth = 0
                try:
                    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                    if proc.returncode == 0 and proc.stdout.strip():
                        bandwidth = round(self._parse_nvbandwidth_json(proc.stdout), 1)
                except (subprocess.TimeoutExpired, FileNotFoundError):
                    bandwidth = 0
                results_by_test[key] = bandwidth

            progress.advance(task)

    # If every subtest returned 0 the nvbandwidth binary is broken on this host
    # (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch). Fall back to PyTorch.
    if not any(results_by_test.values()):
        self.console.print(
            "[yellow]nvbandwidth returned no usable data — "
            "falling back to PyTorch memory benchmark[/yellow]"
        )
        return self._run_memory_pytorch()

    def pct_of(achieved, peak):
        """Efficiency in percent, or None when either side is missing/zero."""
        if achieved and peak:
            return round((achieved / peak) * 100, 1)
        return None

    # The best of the three D2D variants represents device-to-device bandwidth.
    d2d_bw = max(results_by_test.get(k, 0) for k in ("d2d_write", "d2d_read", "d2d_bidir"))
    h2d_bw = results_by_test.get("h2d", 0)
    d2h_bw = results_by_test.get("d2h", 0)

    # D2D is compared to half of the spec's nvlink_bandwidth_gbps, which the
    # spec table treats as a bidirectional figure (per-direction = /2).
    nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
    d2d_peak = nvlink_bw / 2 if nvlink_bw else 0

    # H2D/D2H are compared to a PCIe peak estimated from the generation
    # (GB/s for x16); unknown positive generations default to 32.
    pcie_gen = self.specs.get("pcie_gen", 0)
    if pcie_gen > 0:
        pcie_peak = {3: 16, 4: 32, 5: 64, 6: 128}.get(pcie_gen, 32)
    else:
        pcie_peak = 0

    d2d_efficiency = pct_of(d2d_bw, d2d_peak)
    reported_pcie_peak = pcie_peak if pcie_peak else None

    summary = {
        "source": "nvbandwidth",
        "h2d_bandwidth_gbps": round(h2d_bw, 1),
        "d2h_bandwidth_gbps": round(d2h_bw, 1),
        "d2d_bandwidth_gbps": round(d2d_bw, 1),
        "h2d_peak_gbps": reported_pcie_peak,
        "d2h_peak_gbps": reported_pcie_peak,
        "d2d_peak_gbps": round(d2d_peak, 1) if d2d_peak else None,
        "h2d_efficiency_pct": pct_of(h2d_bw, pcie_peak),
        "d2h_efficiency_pct": pct_of(d2h_bw, pcie_peak),
        "d2d_efficiency_pct": d2d_efficiency,
        "peak_bandwidth_gbps": self.specs["memory_bandwidth_gbps"],
        "efficiency_pct": d2d_efficiency,
        "results_by_test": results_by_test,
        "per_gpu": [],
    }
    return {"memory": summary}
|
|
|
|
@staticmethod
def _parse_nvbandwidth_json(raw: str) -> float:
    """Return the mean bandwidth reported in nvbandwidth ``-j`` output.

    Two layouts are understood:

    * ``{"nvbandwidth": {"testcases": [{"bandwidth_matrix": [[...]]}]}}``:
      the mean of the strictly positive cells of the first testcase that
      has any such cell. Cells that are not numeric (e.g. ``"N/A"``) and
      cells ``<= 0`` are ignored so they do not drag the average down.
    * A dict, or a list of dicts, each carrying a ``"results"`` list whose
      items hold a numeric ``"value"``: the mean of all values. An item
      without ``"value"`` counts as 0.

    Anything that does not match these shapes (invalid JSON, null
    sections, rows of the wrong type) is treated as "no data" rather than
    raising, so a single odd payload cannot abort the benchmark run.

    Args:
        raw: JSON text printed by nvbandwidth.

    Returns:
        Mean bandwidth, or ``0.0`` when no usable value is present.
    """
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers a non-string payload such as None.
        return 0.0

    if isinstance(data, dict) and "nvbandwidth" in data:
        section = data["nvbandwidth"]
        testcases = section.get("testcases") if isinstance(section, dict) else None
        if not isinstance(testcases, list):
            return 0.0
        for testcase in testcases:
            matrix = testcase.get("bandwidth_matrix") if isinstance(testcase, dict) else None
            if not isinstance(matrix, list):
                continue
            values = []
            for row in matrix:
                if not isinstance(row, list):
                    continue
                for cell in row:
                    try:
                        value = float(cell)
                    except (ValueError, TypeError):
                        continue
                    if value > 0:
                        values.append(value)
            if values:
                return sum(values) / len(values)
        return 0.0

    # Older layout: dict(s) with a "results" array of {"value": ...} rows.
    entries = data if isinstance(data, list) else [data]
    bw_values = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        rows = entry.get("results")
        if not isinstance(rows, list):
            continue
        for row in rows:
            if not isinstance(row, dict):
                continue
            value = row.get("value", 0)
            if isinstance(value, (int, float)):
                bw_values.append(value)
    return sum(bw_values) / len(bw_values) if bw_values else 0.0
|
|
|
|
def _run_memory_pytorch(self) -> dict:
    """Measure H2D / D2H / D2D copy bandwidth with plain PyTorch copies.

    For each transfer size a float32 host tensor is copied to the GPU, back
    to the host, and GPU-to-GPU; every copy is timed with wall-clock time
    around a ``torch.cuda.synchronize()``. The median time per size gives
    that size's bandwidth, and the best value over all sizes is reported.

    Returns:
        ``{"memory": {...}}`` with the best bandwidths, the spec peak, the
        D2D efficiency in percent (``None`` without a peak) and the
        per-size breakdown, or ``{"memory": {"error":
        "pytorch_not_available"}}`` when PyTorch/CUDA is unusable.
    """
    mem_cfg = self.bench_cfg.get("memory", {})
    test_sizes_mb = [1, 4, 16, 64, 256, 1024, 4096]

    if not TORCH_AVAILABLE:
        self.console.print("[yellow]PyTorch not available - skipping memory benchmark[/yellow]")
        return {"memory": {"error": "pytorch_not_available"}}

    # At least one timed pass per size is required; with zero passes there
    # would be no sample to take a median of.
    iterations = max(1, int(mem_cfg.get("iterations", 10)))

    gpu_count = torch.cuda.device_count()
    self.console.print(f"[cyan]Memory Benchmark (PyTorch fallback) - {gpu_count} GPU(s)[/cyan]")

    def median(samples):
        """Upper median of a non-empty list."""
        ordered = sorted(samples)
        return ordered[len(ordered) // 2]

    def bw_gb(seconds, num_bytes):
        """Bandwidth in GB/s (1e9 bytes) for num_bytes moved in seconds."""
        return (num_bytes / seconds) / 1e9

    bandwidth_by_size = {}

    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("Testing sizes...", total=len(test_sizes_mb))

        for size_mb in test_sizes_mb:
            size_bytes = size_mb * 1024 * 1024
            h2d_times, d2h_times, d2d_times = [], [], []
            # float32 is 4 bytes per element, hence size_bytes // 4 elements.
            x_cpu = torch.randn(size_bytes // 4, dtype=torch.float32)

            for _ in range(iterations):
                t0 = time.perf_counter()
                x_gpu = x_cpu.cuda()
                torch.cuda.synchronize()
                h2d_times.append(time.perf_counter() - t0)

                t0 = time.perf_counter()
                x_gpu.cpu()
                torch.cuda.synchronize()
                d2h_times.append(time.perf_counter() - t0)

                x_gpu2 = torch.randn_like(x_gpu)
                # CUDA kernels are launched asynchronously: wait for the
                # random fill to finish, otherwise its run time would be
                # counted as part of the D2D copy below.
                torch.cuda.synchronize()
                t0 = time.perf_counter()
                x_gpu2.copy_(x_gpu)
                torch.cuda.synchronize()
                d2d_times.append(time.perf_counter() - t0)

            del x_gpu, x_gpu2
            torch.cuda.empty_cache()

            bandwidth_by_size[str(size_mb)] = {
                "h2d_gbps": round(bw_gb(median(h2d_times), size_bytes), 1),
                "d2h_gbps": round(bw_gb(median(d2h_times), size_bytes), 1),
                "d2d_gbps": round(bw_gb(median(d2d_times), size_bytes), 1),
            }
            progress.advance(task)

    best_h2d = max(v["h2d_gbps"] for v in bandwidth_by_size.values())
    best_d2h = max(v["d2h_gbps"] for v in bandwidth_by_size.values())
    best_d2d = max(v["d2d_gbps"] for v in bandwidth_by_size.values())
    peak_bw = self.specs["memory_bandwidth_gbps"]
    efficiency = round((best_d2d / peak_bw) * 100, 1) if peak_bw else None

    return {
        "memory": {
            "source": "pytorch",
            "h2d_bandwidth_gbps": round(best_h2d, 1),
            "d2h_bandwidth_gbps": round(best_d2h, 1),
            "d2d_bandwidth_gbps": round(best_d2d, 1),
            "peak_bandwidth_gbps": peak_bw,
            "efficiency_pct": efficiency,
            "test_sizes_mb": test_sizes_mb,
            "bandwidth_by_size": bandwidth_by_size,
            "per_gpu": [],
        }
    }
|
|
|
|
def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
    """Measure square-matmul throughput (TFLOPS) for each requested dtype.

    For every dtype a pool of operand matrices is allocated, a warmup loop
    is run, and ``iterations`` matmuls are timed with CUDA events. A dtype
    that fails is recorded as an ``"error: ..."`` string and the remaining
    dtypes still run; FP8 is recorded as skipped when the GPU spec lists
    no FP8 throughput.

    Args:
        dtypes: Dtype names to test (``fp32``, ``tf32``, ``fp16``, ``bf16``,
            ``fp8``). Defaults to the ``benchmark.compute.dtypes`` config
            value, or all five. Unknown names are ignored.

    Returns:
        ``{"compute": {...}}`` with achieved TFLOPS per dtype, spec peaks,
        efficiency in percent, the absolute pass thresholds from the GPU
        spec (empty dict when none are configured) and the run parameters,
        or ``{"compute": {"error": "pytorch_not_available"}}``.
    """
    # rich is already a dependency of this module; escape() is imported here
    # because it is only needed to print exception text safely.
    from rich.markup import escape

    comp_cfg = self.bench_cfg.get("compute", {})
    configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
    matrix_size = comp_cfg.get("matrix_size", 4096)
    warmup = comp_cfg.get("warmup", 10)
    iterations = comp_cfg.get("iterations", 100)
    use_compile = comp_cfg.get("use_compile", False)

    if not TORCH_AVAILABLE:
        self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
        return {"compute": {"error": "pytorch_not_available"}}

    gpu_count = torch.cuda.device_count()
    self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")

    # Optionally route matmul through torch.compile(mode="max-autotune").
    # The compiled path gets a longer warmup (at least 50 iterations) so
    # that JIT compilation and autotuning happen outside the timed loop.
    mm_fn = torch.matmul
    compile_warmup = warmup
    if use_compile:
        try:
            _compiled = torch.compile(torch.matmul, mode="max-autotune")
            # Trial call: trigger compilation now and verify it works before
            # committing to the compiled function for every dtype.
            _t = torch.randn(64, 64, device="cuda", dtype=torch.float32)
            _compiled(_t, _t)
            torch.cuda.synchronize()
            del _t
            mm_fn = _compiled
            compile_warmup = max(warmup, 50)
            self.console.print("[cyan] torch.compile(max-autotune) enabled[/cyan]")
        except Exception as e:
            self.console.print(f"[yellow] torch.compile unavailable ({type(e).__name__}), using eager[/yellow]")

    # float8 is looked up defensively: on a PyTorch build without it, only
    # the fp8 entry fails instead of the whole benchmark.
    fp8_dtype = getattr(torch, "float8_e4m3fn", None)
    dtype_map = {
        "fp32": (torch.float32, self.specs["fp32_tflops"]),
        "tf32": ("tf32", self.specs["tf32_tflops"]),
        "fp16": (torch.float16, self.specs["fp16_tflops"]),
        "bf16": (torch.bfloat16, self.specs["bf16_tflops"]),
        "fp8": (fp8_dtype, self.specs.get("fp8_tflops", 0)),
    }

    results_by_dtype = {}
    per_gpu_results = [{"index": i} for i in range(gpu_count)]

    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("Testing dtypes...", total=len(configured_dtypes))

        for dtype_name in configured_dtypes:
            if dtype_name not in dtype_map:
                progress.advance(task)
                continue

            # Skip FP8 if the GPU spec reports no FP8 throughput.
            if dtype_name == "fp8" and self.specs.get("fp8_tflops", 0) == 0:
                arch = self.specs.get("architecture", "unknown")
                results_by_dtype["fp8"] = f"skipped ({arch} does not support FP8)"
                self.console.print(f"[dim] fp8: skipped - {arch} architecture has no FP8 support[/dim]")
                progress.advance(task)
                continue

            dtype_val = dtype_map[dtype_name][0]
            pools_a = pools_b = indexed_a = indexed_b = c = None
            old_tf32 = None

            try:
                if dtype_name == "tf32":
                    # TF32 is float32 storage with the TF32 matmul path enabled.
                    old_tf32 = torch.backends.cuda.matmul.allow_tf32
                    torch.backends.cuda.matmul.allow_tf32 = True
                    dtype_val = torch.float32

                if dtype_name == "fp8":
                    # Fail fast, before any large allocation.
                    if dtype_val is None:
                        raise RuntimeError("torch.float8_e4m3fn unavailable — upgrade to PyTorch >= 2.1")
                    if not hasattr(torch, "_scaled_mm"):
                        raise RuntimeError("torch._scaled_mm unavailable — upgrade to PyTorch >= 2.1")

                M = N = K = matrix_size

                # Allocate enough matrix pairs so total memory exceeds GPU L2 cache
                # (H100/H200 L2 = 50 MB), preventing cross-iteration cache reuse.
                elem_bytes = 1 if dtype_name == "fp8" else torch.tensor([], dtype=dtype_val).element_size()
                pair_bytes = 2 * M * K * elem_bytes
                num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes))  # ceil(256MB / pair)

                if dtype_name == "fp8":
                    pools_a = [torch.randn(M, K, device="cuda", dtype=torch.float32).to(dtype_val) for _ in range(num_pools)]
                    pools_b = [torch.randn(N, K, device="cuda", dtype=torch.float32).to(dtype_val) for _ in range(num_pools)]
                    scale_a = torch.tensor(1.0, device="cuda")
                    scale_b = torch.tensor(1.0, device="cuda")

                    def _fp8_mm(i):
                        return torch._scaled_mm(pools_a[i], pools_b[i].T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16)

                    # Probe once so an unsupported GPU/driver combination
                    # surfaces as a clear error instead of failing mid-loop.
                    try:
                        _probe = _fp8_mm(0)
                        torch.cuda.synchronize()
                        del _probe
                    except Exception as probe_err:
                        raise RuntimeError(f"FP8 _scaled_mm probe failed: {probe_err}") from probe_err

                    for i in range(warmup):
                        _fp8_mm(i % num_pools)
                    torch.cuda.synchronize()
                    start_event = torch.cuda.Event(enable_timing=True)
                    end_event = torch.cuda.Event(enable_timing=True)
                    start_event.record()
                    for i in range(iterations):
                        c = _fp8_mm(i % num_pools)
                    end_event.record()
                    torch.cuda.synchronize()
                    elapsed_ms = start_event.elapsed_time(end_event)
                else:
                    pools_a = [torch.randn(M, K, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
                    pools_b = [torch.randn(K, N, device="cuda", dtype=dtype_val) for _ in range(num_pools)]

                    # Pre-resolve the operand for every step so the timed
                    # loop does no index arithmetic.
                    indexed_a = [pools_a[i % num_pools] for i in range(compile_warmup + iterations)]
                    indexed_b = [pools_b[i % num_pools] for i in range(compile_warmup + iterations)]

                    for i in range(compile_warmup):
                        mm_fn(indexed_a[i], indexed_b[i])
                    torch.cuda.synchronize()
                    start_event = torch.cuda.Event(enable_timing=True)
                    end_event = torch.cuda.Event(enable_timing=True)
                    start_event.record()
                    for i in range(compile_warmup, compile_warmup + iterations):
                        c = mm_fn(indexed_a[i], indexed_b[i])
                    end_event.record()
                    torch.cuda.synchronize()
                    elapsed_ms = start_event.elapsed_time(end_event)

                # One MxK @ KxN matmul is 2*M*N*K floating point operations.
                flops = 2 * M * N * K * iterations
                tflops = flops / (elapsed_ms / 1000) / 1e12
                results_by_dtype[dtype_name] = round(tflops, 1)

                # NOTE(review): the measurement above runs on the current
                # CUDA device only; the same figure is copied to every
                # per-GPU entry.
                for pg in per_gpu_results:
                    pg[dtype_name] = round(tflops, 1)

            except Exception as e:
                results_by_dtype[dtype_name] = f"error: {e}"
                # Exception text may contain square brackets; escape it so
                # Rich does not interpret them as markup tags.
                self.console.print(f"[yellow] {dtype_name}: {escape(str(e))}[/yellow]")
            finally:
                # Always restore the global TF32 switch, including on
                # failure, so later dtypes are not silently affected.
                if old_tf32 is not None:
                    torch.backends.cuda.matmul.allow_tf32 = old_tf32
                # Drop every reference to the operand pools (the indexed
                # lists hold the same tensors) so the memory really can be
                # released before the next dtype allocates.
                pools_a = pools_b = indexed_a = indexed_b = c = None
                try:
                    torch.cuda.empty_cache()
                except Exception:
                    # Best effort: a failing cache flush must not abort
                    # the remaining dtypes.
                    pass

            progress.advance(task)

    efficiency = {}
    for dt, achieved in results_by_dtype.items():
        if isinstance(achieved, (int, float)) and dt in dtype_map:
            peak_tp = dtype_map[dt][1]
            if peak_tp:
                efficiency[dt] = round((achieved / peak_tp) * 100, 1)

    return {
        "compute": {
            "per_dtype_tflops": results_by_dtype,
            "peak_tflops": {dt: dtype_map[dt][1] for dt in dtype_map},
            "efficiency_pct": efficiency,
            # Absolute TFLOPS PASS thresholds (decoupled from peak). When present,
            # report.py judges PASS/WARN/FAIL against these directly instead of
            # using % of peak. Empty dict => fall back to legacy 80% rule.
            "pass_thresholds_tflops": dict(
                self.specs.get("compute_pass_thresholds_tflops") or {}
            ),
            "per_gpu": per_gpu_results,
            "matrix_size": matrix_size,
            "warmup": warmup,
            "iterations": iterations,
        }
    }
|
|
|
|
@staticmethod
def print_results(results: dict, console: Console = None):
    """Render benchmark results as Rich tables.

    Sections whose result dict is missing or contains an ``"error"`` key
    are not printed. Efficiencies are coloured green at >= 80 %, yellow at
    >= 50 % and red below; a missing efficiency is shown as ``N/A``.

    Args:
        results: Dict with optional ``"memory"`` and ``"compute"`` entries
            as produced by the benchmark methods of this class.
        console: Console to print to; a new one is created when omitted.
    """
    # rich is already a dependency of this module; escape() keeps error
    # text containing square brackets from being parsed as markup.
    from rich.markup import escape

    c = console or Console()

    def colour_for(pct):
        return "green" if pct >= 80 else ("yellow" if pct >= 50 else "red")

    if "memory" in results and "error" not in results["memory"]:
        mem = results["memory"]
        source = mem.get("source", "unknown")
        c.print(f"\n[bold cyan]Memory Bandwidth Results (via {source})[/bold cyan]")

        table = Table(box=None, padding=(0, 1))
        table.add_column("Metric", style="bold")
        table.add_column("Value", justify="right")
        table.add_column("Peak", justify="right")
        table.add_column("Efficiency", justify="right")

        for label, value_key, peak_key, eff_key in [
            ("H2D (PCIe)", "h2d_bandwidth_gbps", "h2d_peak_gbps", "h2d_efficiency_pct"),
            ("D2H (PCIe)", "d2h_bandwidth_gbps", "d2h_peak_gbps", "d2h_efficiency_pct"),
            ("D2D (NVLink)", "d2d_bandwidth_gbps", "d2d_peak_gbps", "d2d_efficiency_pct"),
        ]:
            achieved = mem.get(value_key)
            val_str = f"{achieved:.1f} GB/s" if isinstance(achieved, (int, float)) else "N/A"
            peak = mem.get(peak_key, 0)
            peak_str = f"{peak:.0f} GB/s" if peak else "N/A"
            eff = mem.get(eff_key, 0)
            if eff:
                ec = colour_for(eff)
                eff_str = f"[{ec}]{eff:.1f}%[/{ec}]"
            else:
                eff_str = "N/A"
            table.add_row(label, val_str, peak_str, eff_str)

        c.print(table)

        by_test = mem.get("results_by_test", {})
        if by_test:
            c.print("\n [dim]nvbandwidth breakdown:[/dim]")
            for tc, bw in sorted(by_test.items()):
                c.print(f" {tc}: {bw} GB/s")

        by_size = mem.get("bandwidth_by_size", {})
        if by_size:
            t2 = Table(title="Bandwidth by Transfer Size", box=None, padding=(0, 1))
            t2.add_column("Size (MB)", style="bold", justify="right")
            t2.add_column("H2D (GB/s)", justify="right")
            t2.add_column("D2H (GB/s)", justify="right")
            t2.add_column("D2D (GB/s)", justify="right")
            # The peak is the same for every row; without one the D2D
            # column is printed uncoloured.
            size_peak = mem.get("peak_bandwidth_gbps")
            for sz, vals in sorted(by_size.items(), key=lambda item: int(item[0])):
                if size_peak:
                    ec = colour_for((vals["d2d_gbps"] / size_peak) * 100)
                    d2d_cell = f"[{ec}]{vals['d2d_gbps']:.1f}[/{ec}]"
                else:
                    d2d_cell = f"{vals['d2d_gbps']:.1f}"
                t2.add_row(sz, f"{vals['h2d_gbps']:.1f}", f"{vals['d2h_gbps']:.1f}", d2d_cell)
            c.print(t2)

    if "compute" in results and "error" not in results["compute"]:
        comp = results["compute"]
        c.print("\n[bold cyan]Compute Throughput Results[/bold cyan]")

        table = Table(box=None, padding=(0, 1))
        table.add_column("DType", style="bold")
        table.add_column("Achieved (TFLOPS)", justify="right")
        table.add_column("Peak", justify="right")
        table.add_column("Efficiency", justify="right")

        peak = comp.get("peak_tflops", {})
        per_dtype = comp.get("per_dtype_tflops", {})
        eff = comp.get("efficiency_pct", {})

        for dt, achieved in per_dtype.items():
            pk = peak.get(dt)
            pk_str = f"{pk:.0f}" if isinstance(pk, (int, float)) and pk else "N/A"

            if isinstance(achieved, str):
                # String results are either "skipped (...)" notices or
                # "error: ..." messages; only the latter are failures.
                tone = "dim" if achieved.startswith("skipped") else "red"
                table.add_row(dt.upper(), f"[{tone}]{escape(achieved)}[/{tone}]", pk_str, "N/A")
                continue

            ef = eff.get(dt)
            if isinstance(ef, (int, float)):
                ec = colour_for(ef)
                ef_str = f"[{ec}]{ef:.1f}%[/{ec}]"
            else:
                # No peak configured for this dtype, so no efficiency exists.
                ef_str = "N/A"
            table.add_row(dt.upper(), f"{achieved:.1f}", pk_str, ef_str)
        c.print(table)
|