test_gpu_scripts/modules/benchmark.py

628 lines
28 KiB
Python

"""GPU benchmark module — nvbandwidth + PyTorch compute throughput."""
import json
import os
import shutil
import subprocess
import time
from datetime import datetime
from typing import Optional, List
from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
from modules.gpu_specs import detect_gpu_type, get_gpu_specs, get_gpu_label, resolve_tools_dir
# PyTorch is optional. The flag is True only when torch imports cleanly AND
# reports a usable CUDA runtime; the benchmarks consult it before touching torch.
TORCH_AVAILABLE = False
try:
    import torch

    TORCH_AVAILABLE = bool(torch.cuda.is_available())
except ImportError:
    # torch is not installed: keep the flag False.
    pass
class Benchmark:
def __init__(self, config: dict):
    """Bind the benchmark to a configuration and resolve the target GPU.

    Args:
        config: Tool configuration. The optional ``"benchmark"`` section is
            kept as ``self.bench_cfg``; the optional ``"gpu_type"`` key
            (default ``"auto"``) selects the GPU model.
    """
    self.config = config
    self.console = Console()
    self.bench_cfg = config.get("benchmark", {})
    self.tools_dir = resolve_tools_dir(config)

    # An explicit gpu_type wins; "auto" (or a missing key) triggers detection.
    requested_type = config.get("gpu_type", "auto")
    if requested_type == "auto":
        self.gpu_type = detect_gpu_type()
    else:
        self.gpu_type = requested_type

    self.specs = get_gpu_specs(self.gpu_type)
    self.gpu_label = get_gpu_label(self.gpu_type)
def run(self) -> dict:
results = {}
results.update(self.run_memory_benchmark())
results.update(self.run_compute_benchmark())
return results
def _find_nvbandwidth(self) -> Optional[str]:
# 1. System PATH
p = shutil.which("nvbandwidth")
if p:
return p
# 2. tools_dir
local = os.path.join(self.tools_dir, "nvbandwidth", "nvbandwidth")
if os.path.isfile(local) and os.access(local, os.X_OK):
return local
# 3. Common DCGM / system locations
extra_paths = [
"/usr/libexec/datacenter-gpu-manager-4/plugins/cuda12/nvbandwidth",
"/usr/libexec/datacenter-gpu-manager/plugins/cuda12/nvbandwidth",
"/usr/local/bin/nvbandwidth",
"/opt/nvidia/nvbandwidth/nvbandwidth",
]
for ep in extra_paths:
if os.path.isfile(ep) and os.access(ep, os.X_OK):
return ep
return None
def run_memory_benchmark(self) -> dict:
nvbw = self._find_nvbandwidth()
if nvbw:
return self._run_nvbandwidth(nvbw)
self.console.print("[yellow]nvbandwidth not found, falling back to PyTorch[/yellow]")
return self._run_memory_pytorch()
def _run_nvbandwidth(self, nvbw_path: str) -> dict:
    """Measure H2D, D2H and D2D copy bandwidth with the nvbandwidth binary.

    Every logical test is a separate ``nvbandwidth -t <name> -j`` invocation
    whose JSON output is reduced to one average by
    ``_parse_nvbandwidth_json``. When no test yields a positive bandwidth the
    PyTorch memory benchmark is used instead.

    Args:
        nvbw_path: Path of the nvbandwidth executable.

    Returns:
        ``{"memory": {...}}`` holding the measured bandwidths, the peaks
        derived from ``self.specs`` and the resulting efficiency percentages
        (``None`` where a peak or a measurement is missing), or the result of
        ``_run_memory_pytorch()`` on fallback.

    Raises:
        KeyError: If ``self.specs`` has no ``"memory_bandwidth_gbps"`` entry.
    """
    mem_cfg = self.bench_cfg.get("memory", {})
    buffer_mb = mem_cfg.get("nvbandwidth_buffer_mb", 512)
    samples = mem_cfg.get("nvbandwidth_samples", 3)
    self.console.print(f"[cyan]Memory Benchmark via nvbandwidth ({nvbw_path})[/cyan]")

    # Result key -> testcase names to try, in order of preference (the
    # testcases were renamed between nvbandwidth releases).
    testcases = [
        ("h2d", ["host_to_device_memcpy_ce", "host_to_device_memcpy_read_ce"]),
        ("d2h", ["device_to_host_memcpy_ce", "device_to_host_memcpy_write_ce"]),
        ("d2d_write", ["device_to_device_memcpy_write_ce"]),
        ("d2d_read", ["device_to_device_memcpy_read_ce"]),
        ("d2d_bidir", ["device_to_device_bidirectional_memcpy_write_sm",
                       "device_to_device_bidirectional_sm"]),
    ]
    available_names = self._list_nvbandwidth_tests(nvbw_path)

    results_by_test = {}
    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("nvbandwidth tests...", total=len(testcases))
        for key, name_candidates in testcases:
            if available_names:
                runnable = [n for n in name_candidates if n in available_names]
            else:
                # Discovery produced nothing, so we cannot tell which spelling
                # this binary uses: try every candidate in turn.
                runnable = list(name_candidates)
            # A test with no runnable name is left out of results_by_test.
            if runnable:
                results_by_test[key] = self._measure_nvbandwidth_test(
                    nvbw_path, runnable, buffer_mb, samples,
                )
            progress.advance(task)

    # If every subtest returned 0 (or none could run) the nvbandwidth binary is
    # unusable on this host (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch).
    if all(v == 0 for v in results_by_test.values()):
        self.console.print(
            "[yellow]nvbandwidth returned no usable data — "
            "falling back to PyTorch memory benchmark[/yellow]"
        )
        return self._run_memory_pytorch()

    d2d_bw = max(
        results_by_test.get("d2d_write", 0),
        results_by_test.get("d2d_read", 0),
        results_by_test.get("d2d_bidir", 0),
    )
    h2d_bw = results_by_test.get("h2d", 0)
    d2h_bw = results_by_test.get("d2h", 0)

    # D2D goes through NVLink — compare to NVLink per-direction bandwidth
    # (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2).
    nvlink_bw = self.specs.get("nvlink_bandwidth_gbps") or 0
    d2d_peak = nvlink_bw / 2 if nvlink_bw else 0
    d2d_efficiency = round((d2d_bw / d2d_peak) * 100, 1) if (d2d_bw and d2d_peak) else None

    # H2D/D2H goes through PCIe — estimate the x16 peak (GB/s) from the PCIe
    # generation; unknown positive generations are treated as Gen4. A missing
    # or None pcie_gen means "no peak known".
    pcie_gen = self.specs.get("pcie_gen") or 0
    pcie_peak = {3: 16, 4: 32, 5: 64, 6: 128}.get(pcie_gen, 32) if pcie_gen > 0 else 0
    h2d_efficiency = round((h2d_bw / pcie_peak) * 100, 1) if (h2d_bw and pcie_peak) else None
    d2h_efficiency = round((d2h_bw / pcie_peak) * 100, 1) if (d2h_bw and pcie_peak) else None

    return {
        "memory": {
            "source": "nvbandwidth",
            "h2d_bandwidth_gbps": round(h2d_bw, 1),
            "d2h_bandwidth_gbps": round(d2h_bw, 1),
            "d2d_bandwidth_gbps": round(d2d_bw, 1),
            "h2d_peak_gbps": pcie_peak if pcie_peak else None,
            "d2h_peak_gbps": pcie_peak if pcie_peak else None,
            "d2d_peak_gbps": round(d2d_peak, 1) if d2d_peak else None,
            "h2d_efficiency_pct": h2d_efficiency,
            "d2h_efficiency_pct": d2h_efficiency,
            "d2d_efficiency_pct": d2d_efficiency,
            "peak_bandwidth_gbps": self.specs["memory_bandwidth_gbps"],
            "efficiency_pct": d2d_efficiency,
            "results_by_test": results_by_test,
            "per_gpu": [],
        }
    }

def _list_nvbandwidth_tests(self, nvbw_path: str) -> set:
    """Return the testcase names printed by ``nvbandwidth -l`` (empty set on failure)."""
    try:
        listing = subprocess.run(
            [nvbw_path, "-l"], capture_output=True, text=True, timeout=15,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError covers a missing, non-executable or wrong-architecture binary.
        return set()
    if listing.returncode != 0:
        return set()

    names = set()
    for line in listing.stdout.splitlines():
        line = line.strip()
        # Only lines shaped like "<index>, <name>:" are testcase entries.
        if line and ", " in line and line[0].isdigit():
            name = line.split(", ", 1)[1].rstrip(":").strip()
            if name:
                names.add(name)
    return names

def _measure_nvbandwidth_test(self, nvbw_path: str, test_names: List[str],
                              buffer_mb, samples):
    """Run ``test_names`` in order; return the first positive average bandwidth, else 0."""
    for test_name in test_names:
        # --disableAffinity skips nvbandwidth's CPU affinity setup, which
        # calls nvmlDeviceGetHandleByUUID() — that lookup fails on hosts
        # whose fabricmanager build doesn't expose the UUID format nvml
        # expects (seen on H20-3e with custom 570.172.08-1 fabricmanager).
        cmd = [nvbw_path, "--disableAffinity", "-t", test_name,
               "-b", str(buffer_mb), "-i", str(samples), "-j"]
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except (subprocess.TimeoutExpired, OSError):
            continue
        if proc.returncode == 0 and proc.stdout.strip():
            avg_bw = round(self._parse_nvbandwidth_json(proc.stdout), 1)
            if avg_bw > 0:
                return avg_bw
    return 0
@staticmethod
def _parse_nvbandwidth_json(raw: str) -> float:
"""Parse nvbandwidth JSON output (supports v0.5+ and v0.8+ formats)."""
try:
data = json.loads(raw)
except json.JSONDecodeError:
return 0.0
# v0.8+ format: {"nvbandwidth": {"testcases": [{"bandwidth_matrix": [...], "sum": N}]}}
if isinstance(data, dict) and "nvbandwidth" in data:
testcases = data["nvbandwidth"].get("testcases", [])
for tc in testcases:
matrix = tc.get("bandwidth_matrix", [])
values = []
for row in matrix:
for cell in row:
try:
v = float(cell)
except (ValueError, TypeError):
continue
# Exclude diagonal entries (intra-device, reported as 0 or
# N/A) so they don't drag the off-diagonal average down.
if v > 0:
values.append(v)
if values:
return sum(values) / len(values)
return 0.0
# v0.5 format: list of dicts with "results" array
entries = data if isinstance(data, list) else [data]
bw_values = []
for entry in entries:
if isinstance(entry, dict):
for row in entry.get("results", []):
val = row.get("value", 0)
if isinstance(val, (int, float)):
bw_values.append(val)
return sum(bw_values) / len(bw_values) if bw_values else 0.0
def _run_memory_pytorch(self) -> dict:
    """Measure H2D, D2H and D2D copy bandwidth with plain PyTorch copies.

    For each transfer size a float32 CPU tensor is copied to the current CUDA
    device, back to the host, and device-to-device. Each step is timed with
    ``time.perf_counter`` around a ``torch.cuda.synchronize()``; the median of
    the per-iteration times gives the bandwidth for that size, and the best
    size is reported per direction.

    Only the current CUDA device is exercised (``per_gpu`` is always empty).
    NOTE(review): the host tensor is not pinned, so H2D/D2H probably
    understate what the PCIe link can do — confirm whether that is intended.

    Returns:
        ``{"memory": {...}}`` with the best bandwidth per direction, the
        per-size breakdown and the D2D efficiency against
        ``self.specs["memory_bandwidth_gbps"]``; or
        ``{"memory": {"error": "pytorch_not_available"}}`` without torch/CUDA.
    """
    mem_cfg = self.bench_cfg.get("memory", {})
    test_sizes_mb = [1, 4, 16, 64, 256, 1024, 4096]
    # At least one timed pass per size, otherwise there is no median to take.
    iterations = max(1, int(mem_cfg.get("iterations", 10)))
    if not TORCH_AVAILABLE:
        self.console.print("[yellow]PyTorch not available - skipping memory benchmark[/yellow]")
        return {"memory": {"error": "pytorch_not_available"}}

    gpu_count = torch.cuda.device_count()
    self.console.print(f"[cyan]Memory Benchmark (PyTorch fallback) - {gpu_count} GPU(s)[/cyan]")

    def median(samples):
        ordered = sorted(samples)
        return ordered[len(ordered) // 2]

    def to_gbps(seconds, n_bytes):
        return (n_bytes / seconds) / 1e9

    bandwidth_by_size = {}
    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("Testing sizes...", total=len(test_sizes_mb))
        for size_mb in test_sizes_mb:
            size_bytes = size_mb * 1024 * 1024
            h2d_times, d2h_times, d2d_times = [], [], []
            x_cpu = torch.randn(size_bytes // 4, dtype=torch.float32)
            for _ in range(iterations):
                t0 = time.perf_counter()
                x_gpu = x_cpu.cuda()
                torch.cuda.synchronize()
                h2d_times.append(time.perf_counter() - t0)

                t0 = time.perf_counter()
                x_gpu.cpu()
                torch.cuda.synchronize()
                d2h_times.append(time.perf_counter() - t0)

                # CUDA work is queued asynchronously. Allocate the destination
                # without a fill kernel and drain the stream before starting
                # the clock, so only the copy itself is timed.
                x_dst = torch.empty_like(x_gpu)
                torch.cuda.synchronize()
                t0 = time.perf_counter()
                x_dst.copy_(x_gpu)
                torch.cuda.synchronize()
                d2d_times.append(time.perf_counter() - t0)

                # Drop both device buffers before the next pass allocates.
                del x_gpu, x_dst
            torch.cuda.empty_cache()

            bandwidth_by_size[str(size_mb)] = {
                "h2d_gbps": round(to_gbps(median(h2d_times), size_bytes), 1),
                "d2h_gbps": round(to_gbps(median(d2h_times), size_bytes), 1),
                "d2d_gbps": round(to_gbps(median(d2d_times), size_bytes), 1),
            }
            progress.advance(task)

    best_d2d = max(v["d2d_gbps"] for v in bandwidth_by_size.values())
    peak_bw = self.specs["memory_bandwidth_gbps"]
    efficiency = round((best_d2d / peak_bw) * 100, 1) if peak_bw else None
    return {
        "memory": {
            "source": "pytorch",
            "h2d_bandwidth_gbps": round(max(v["h2d_gbps"] for v in bandwidth_by_size.values()), 1),
            "d2h_bandwidth_gbps": round(max(v["d2h_gbps"] for v in bandwidth_by_size.values()), 1),
            "d2d_bandwidth_gbps": round(best_d2d, 1),
            "peak_bandwidth_gbps": peak_bw,
            "efficiency_pct": efficiency,
            "test_sizes_mb": test_sizes_mb,
            "bandwidth_by_size": bandwidth_by_size,
            "per_gpu": [],
        }
    }
def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
    """Measure matmul throughput per dtype on every visible GPU.

    Args:
        dtypes: Dtype names to test. When omitted or empty, the ``dtypes``
            entry of the ``compute`` config section is used (default
            fp32, tf32, fp16, bf16, fp8). Unknown names are skipped.

    Returns:
        ``{"compute": {...}}`` with the mean TFLOPS per dtype (or an
        error/skip string), the peaks from ``self.specs``, efficiency
        percentages, per-GPU values, a per-dtype consistency report and an
        overall ``passed`` flag; or
        ``{"compute": {"error": "pytorch_not_available"}}`` without torch/CUDA.
    """
    comp_cfg = self.bench_cfg.get("compute", {})
    configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
    matrix_size = comp_cfg.get("matrix_size", 4096)
    warmup = comp_cfg.get("warmup", 10)
    iterations = comp_cfg.get("iterations", 100)
    use_compile = comp_cfg.get("use_compile", False)

    if not TORCH_AVAILABLE:
        self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
        return {"compute": {"error": "pytorch_not_available"}}

    gpu_count = torch.cuda.device_count()
    self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")

    # Optional torch.compile(max-autotune) path. The compiled callable is only
    # adopted after a trial call succeeds, and it gets a longer warm-up
    # (at least 50 iterations) to absorb JIT and autotuning time.
    mm_fn = torch.matmul
    compile_warmup = warmup
    if use_compile:
        try:
            compiled_mm = torch.compile(torch.matmul, mode="max-autotune")
            probe = torch.randn(64, 64, device="cuda", dtype=torch.float32)
            compiled_mm(probe, probe)
            torch.cuda.synchronize()
            del probe
            mm_fn = compiled_mm
            compile_warmup = max(warmup, 50)
            self.console.print("[cyan] torch.compile(max-autotune) enabled[/cyan]")
        except Exception as e:
            self.console.print(f"[yellow] torch.compile unavailable ({type(e).__name__}), using eager[/yellow]")

    # "tf32" has no torch dtype of its own; the marker string is resolved by
    # _benchmark_dtype_on_gpu. fp8 maps to None on torch builds without it.
    torch_dtypes = {
        "fp32": torch.float32,
        "tf32": "tf32",
        "fp16": torch.float16,
        "bf16": torch.bfloat16,
        "fp8": getattr(torch, "float8_e4m3fn", None),
        "fp64": torch.float64,
        "int8": torch.int8,
    }
    peak_by_dtype = {name: self.specs.get(f"{name}_tflops", 0) for name in torch_dtypes}

    results_by_dtype = {}
    per_gpu_results = [{"index": i} for i in range(gpu_count)]
    with Progress(
        SpinnerColumn(), TextColumn("[progress.description]{task.description}"),
        BarColumn(), TextColumn("{task.completed}/{task.total}"),
        TimeElapsedColumn(), console=self.console,
    ) as progress:
        task = progress.add_task("Testing dtypes...", total=len(configured_dtypes))
        for dtype_name in configured_dtypes:
            if dtype_name not in torch_dtypes:
                progress.advance(task)
                continue

            # A zero FP8 peak in the specs marks an architecture without FP8.
            if dtype_name == "fp8" and self.specs.get("fp8_tflops", 0) == 0:
                arch = self.specs.get("architecture", "unknown")
                results_by_dtype["fp8"] = f"skipped ({arch} does not support FP8)"
                self.console.print(f"[dim] fp8: skipped - {arch} architecture has no FP8 support[/dim]")
                progress.advance(task)
                continue

            measured = []
            failures = []
            for gpu_idx, gpu_row in enumerate(per_gpu_results):
                try:
                    tflops = self._benchmark_dtype_on_gpu(
                        dtype_name, torch_dtypes[dtype_name], matrix_size,
                        warmup, compile_warmup, iterations, mm_fn, gpu_idx,
                    )
                    measured.append(tflops)
                    gpu_row[dtype_name] = round(tflops, 1)
                except Exception as e:
                    failures.append(f"gpu{gpu_idx}: {e}")
                    gpu_row[dtype_name] = f"error: {e}"
                finally:
                    torch.cuda.empty_cache()

            if not measured:
                results_by_dtype[dtype_name] = "error: " + "; ".join(failures[:3])
                self.console.print(f"[yellow] {dtype_name}: {results_by_dtype[dtype_name]}[/yellow]")
            else:
                results_by_dtype[dtype_name] = round(sum(measured) / len(measured), 1)
            progress.advance(task)

    # Efficiency only for dtypes with a numeric result and a non-zero peak.
    efficiency = {
        dt: round((achieved / peak_by_dtype[dt]) * 100, 1)
        for dt, achieved in results_by_dtype.items()
        if isinstance(achieved, (int, float)) and dt in peak_by_dtype and peak_by_dtype[dt]
    }

    # GPU-to-GPU spread, reported once at least two GPUs produced a number.
    consistency = {}
    for dt in results_by_dtype:
        nums = [
            value
            for value in (gpu_row.get(dt) for gpu_row in per_gpu_results)
            if isinstance(value, (int, float))
        ]
        if len(nums) < 2:
            continue
        lowest, highest = min(nums), max(nums)
        mean = sum(nums) / len(nums)
        spread_pct = ((highest - lowest) / mean * 100) if mean else 0
        consistency[dt] = {
            "mean_tflops": round(mean, 1),
            "min_tflops": round(lowest, 1),
            "max_tflops": round(highest, 1),
            "spread_pct": round(spread_pct, 2),
            "max_allowed_pct": 3,
            "passed": spread_pct <= 3,
        }

    # Every dtype with an absolute threshold needs a numeric result that is
    # not below it.
    pass_thresholds = dict(self.specs.get("compute_pass_thresholds_tflops") or {})
    threshold_passed = all(
        isinstance(results_by_dtype.get(dt), (int, float))
        and not results_by_dtype.get(dt) < threshold
        for dt, threshold in pass_thresholds.items()
    )
    consistency_passed = all(row.get("passed", False) for row in consistency.values())

    return {
        "compute": {
            "passed": threshold_passed and consistency_passed,
            "per_dtype_tflops": results_by_dtype,
            "peak_tflops": dict(peak_by_dtype),
            "efficiency_pct": efficiency,
            # Absolute TFLOPS PASS thresholds (decoupled from peak). When present,
            # report.py judges PASS/WARN/FAIL against these directly instead of
            # using % of peak. Empty dict => fall back to legacy 80% rule.
            "pass_thresholds_tflops": pass_thresholds,
            "per_gpu": per_gpu_results,
            "consistency": consistency,
            "matrix_size": matrix_size,
            "warmup": warmup,
            "iterations": iterations,
        }
    }
def _benchmark_dtype_on_gpu(self, dtype_name: str, dtype_val, matrix_size: int,
warmup: int, compile_warmup: int, iterations: int,
mm_fn, gpu_idx: int) -> float:
if dtype_name == "fp8" and dtype_val is None:
raise RuntimeError("torch.float8_e4m3fn unavailable")
device = f"cuda:{gpu_idx}"
old_tf32 = torch.backends.cuda.matmul.allow_tf32
try:
with torch.cuda.device(gpu_idx):
if dtype_name == "tf32":
torch.backends.cuda.matmul.allow_tf32 = True
dtype_val = torch.float32
M = N = K = matrix_size
if dtype_name == "int8" and M > 4096:
# torch._int_mm on 8192 can be extremely memory hungry because the
# output is int32. Keep it production-visible, but bounded.
M = N = K = 4096
elem_bytes = 1 if dtype_name in ("fp8", "int8") else torch.tensor([], dtype=dtype_val).element_size()
pair_bytes = 2 * M * K * elem_bytes
num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes))
if dtype_name == "fp8":
if not hasattr(torch, "_scaled_mm"):
raise RuntimeError("torch._scaled_mm unavailable")
pools_a = [torch.randn(M, K, device=device, dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
pools_b = [torch.randn(N, K, device=device, dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
scale_a = torch.tensor(1.0, device=device)
scale_b = torch.tensor(1.0, device=device)
def run(i):
return torch._scaled_mm(pools_a[i], pools_b[i].T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16)
effective_warmup = warmup
elif dtype_name == "int8":
if not hasattr(torch, "_int_mm"):
raise RuntimeError("torch._int_mm unavailable")
pools_a = [torch.randint(-128, 127, (M, K), device=device, dtype=torch.int8) for _ in range(num_pools)]
pools_b = [torch.randint(-128, 127, (K, N), device=device, dtype=torch.int8) for _ in range(num_pools)]
def run(i):
return torch._int_mm(pools_a[i], pools_b[i])
effective_warmup = warmup
else:
pools_a = [torch.randn(M, K, device=device, dtype=dtype_val) for _ in range(num_pools)]
pools_b = [torch.randn(K, N, device=device, dtype=dtype_val) for _ in range(num_pools)]
def run(i):
return mm_fn(pools_a[i], pools_b[i])
effective_warmup = compile_warmup
for i in range(effective_warmup):
run(i % num_pools)
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
for i in range(iterations):
c = run(i % num_pools)
end_event.record()
torch.cuda.synchronize()
elapsed_ms = start_event.elapsed_time(end_event)
del pools_a, pools_b, c
flops = 2 * M * N * K * iterations
return flops / (elapsed_ms / 1000) / 1e12
finally:
torch.backends.cuda.matmul.allow_tf32 = old_tf32
@staticmethod
def print_results(results: dict, console: Console = None):
    """Render benchmark results as rich tables.

    Args:
        results: Dict shaped like the return value of ``run()``. The
            ``"memory"`` and ``"compute"`` sections are optional, and a
            section carrying an ``"error"`` key is skipped.
        console: Console to print to; a new ``Console`` is created when
            omitted.

    Missing or unknown values (no peak in the specs, no efficiency computed)
    are shown as "N/A" rather than as zero.
    """
    c = console or Console()

    def eff_color(pct):
        # >= 80% green, >= 50% yellow, otherwise red.
        return "green" if pct >= 80 else ("yellow" if pct >= 50 else "red")

    def is_number(value):
        return isinstance(value, (int, float))

    if "memory" in results and "error" not in results["memory"]:
        mem = results["memory"]
        source = mem.get("source", "unknown")
        c.print(f"\n[bold cyan]Memory Bandwidth Results (via {source})[/bold cyan]")
        table = Table(box=None, padding=(0, 1))
        table.add_column("Metric", style="bold")
        table.add_column("Value", justify="right")
        table.add_column("Peak", justify="right")
        table.add_column("Efficiency", justify="right")
        for label, value_key, peak_key, eff_key in [
            ("H2D (PCIe)", "h2d_bandwidth_gbps", "h2d_peak_gbps", "h2d_efficiency_pct"),
            ("D2H (PCIe)", "d2h_bandwidth_gbps", "d2h_peak_gbps", "d2h_efficiency_pct"),
            ("D2D (NVLink)", "d2d_bandwidth_gbps", "d2d_peak_gbps", "d2d_efficiency_pct"),
        ]:
            achieved = mem.get(value_key)
            val_str = f"{achieved:.1f} GB/s" if is_number(achieved) else "N/A"
            # The PyTorch fallback reports no per-direction peak/efficiency.
            peak = mem.get(peak_key)
            peak_str = f"{peak:.0f} GB/s" if peak else "N/A"
            eff = mem.get(eff_key)
            if eff:
                ec = eff_color(eff)
                eff_str = f"[{ec}]{eff:.1f}%[/{ec}]"
            else:
                eff_str = "N/A"
            table.add_row(label, val_str, peak_str, eff_str)
        c.print(table)

        by_test = mem.get("results_by_test", {})
        if by_test:
            c.print("\n [dim]nvbandwidth breakdown:[/dim]")
            for tc, bw in sorted(by_test.items()):
                c.print(f" {tc}: {bw} GB/s")

        by_size = mem.get("bandwidth_by_size", {})
        if by_size:
            t2 = Table(title="Bandwidth by Transfer Size", box=None, padding=(0, 1))
            t2.add_column("Size (MB)", style="bold", justify="right")
            t2.add_column("H2D (GB/s)", justify="right")
            t2.add_column("D2H (GB/s)", justify="right")
            t2.add_column("D2D (GB/s)", justify="right")
            mem_peak = mem.get("peak_bandwidth_gbps")
            for sz, vals in sorted(by_size.items(), key=lambda item: int(item[0])):
                if mem_peak:
                    # Color the D2D cell by its share of peak memory bandwidth.
                    ec = eff_color((vals["d2d_gbps"] / mem_peak) * 100)
                    d2d_cell = f"[{ec}]{vals['d2d_gbps']:.1f}[/{ec}]"
                else:
                    d2d_cell = f"{vals['d2d_gbps']:.1f}"
                t2.add_row(sz, f"{vals['h2d_gbps']:.1f}", f"{vals['d2h_gbps']:.1f}", d2d_cell)
            c.print(t2)

    if "compute" in results and "error" not in results["compute"]:
        comp = results["compute"]
        c.print(f"\n[bold cyan]Compute Throughput Results[/bold cyan]")
        table = Table(box=None, padding=(0, 1))
        table.add_column("DType", style="bold")
        table.add_column("Achieved (TFLOPS)", justify="right")
        table.add_column("Peak", justify="right")
        table.add_column("Efficiency", justify="right")
        peak = comp.get("peak_tflops", {})
        per_dtype = comp.get("per_dtype_tflops", {})
        eff = comp.get("efficiency_pct", {})
        for dt, achieved in per_dtype.items():
            pk = peak.get(dt)
            # A zero or missing peak means "unknown", not "0 TFLOPS".
            peak_str = f"{pk:.0f}" if (is_number(pk) and pk) else "N/A"
            if isinstance(achieved, str):
                # Error or skip message in place of a measurement.
                table.add_row(dt.upper(), f"[red]{achieved}[/red]", peak_str, "N/A")
                continue
            ef = eff.get(dt)
            if is_number(ef):
                ec = eff_color(ef)
                eff_str = f"[{ec}]{ef:.1f}%[/{ec}]"
            else:
                # No efficiency was computed (unknown peak): do not show a
                # misleading red 0.0%.
                eff_str = "N/A"
            table.add_row(dt.upper(), f"{achieved:.1f}", peak_str, eff_str)
        c.print(table)

        consistency = comp.get("consistency", {})
        if consistency:
            t_cons = Table(title="Per-GPU Consistency", box=None, padding=(0, 1))
            t_cons.add_column("DType", style="bold")
            t_cons.add_column("Min", justify="right")
            t_cons.add_column("Mean", justify="right")
            t_cons.add_column("Max", justify="right")
            t_cons.add_column("Spread", justify="right")
            t_cons.add_column("Status", justify="right")
            for dt, row in consistency.items():
                passed = row.get("passed")
                status = "PASS" if passed else "FAIL"
                color = "green" if passed else "red"
                t_cons.add_row(
                    dt.upper(),
                    f"{row.get('min_tflops', 0):.1f}",
                    f"{row.get('mean_tflops', 0):.1f}",
                    f"{row.get('max_tflops', 0):.1f}",
                    f"{row.get('spread_pct', 0):.2f}%",
                    f"[{color}]{status}[/{color}]",
                )
            c.print(t_cons)