Compare commits

..

1 Commits

Author SHA1 Message Date
1db1313d50 fix(health): mark PCIe link downgrade as FAIL instead of WARN
The PCIe link health check was producing inconsistent verdicts: when the
negotiated link did not meet the GPU's expected Gen/Width (e.g. an H200
running at Gen4 instead of Gen5, or any GPU dropping below x16), the code
correctly flipped overall_pass to False — but recorded the per-GPU status
as "WARN" rather than "FAIL".

This mismatch broke the convention used by every other check in the
module (temperature, ECC, throttling), where FAIL is the only status
that drives overall_pass=False, and WARN is purely informational. As a
result the rendered Markdown / table output would show a yellow WARN
badge for the affected GPU while the overall Health Check verdict came
back red FAIL, leaving operators to wonder which signal to trust.

A PCIe link downgrade is not a soft warning — it halves H2D/D2H
bandwidth (Gen5 x16 ~64 GB/s -> Gen4 x16 ~32 GB/s), directly impacting
data loading, checkpoint I/O, and ZeRO/offload throughput. For an
acceptance-test tool this should be a hard failure, consistent with how
overall_pass already treats it.

Change: in modules/health_check.py, set status to "FAIL" (not "WARN")
when pcie_ok is False. This applies to both the known-GPU path
(Gen >= expected and Width >= 16) and the unknown-GPU fallback path
(Width >= 8). No behavioral change to overall_pass — only the per-GPU
status string is corrected so the table view, Markdown report, and the
overall verdict now agree.
2026-05-10 17:23:51 +08:00
10 changed files with 105 additions and 1300 deletions

View File

@ -1,4 +1,4 @@
# GPU type: auto-detect or override to a100/a800/h100/h800/h200/h20/b200/b300
# GPU type: auto-detect or override to a100/a800/h100/h200/b200/b300
gpu_type: auto
benchmark:
@ -14,29 +14,13 @@ benchmark:
- fp16
- bf16
- fp8
# MAMF-style shape sweep: measure each dtype at every shape below and keep the max
# TFLOPS (the realistic achievable peak). A single fixed shape under-reports by
# ~7-12% and can't meet the MAMF-calibrated thresholds in gpu_specs.py.
# Each entry is either N (square N×N×N) or [M, N, K]. K-heavy non-square shapes
# (e.g. 2048×2048×13312) hit the true Hopper MAMF — bf16 ~790 vs ~755 square.
# Empty list => single matrix_size shape (legacy behaviour).
sweep_sizes:
- 3584
- 4608
- 5376
- 8192
- 11520
- [2048, 2048, 13312]
- [2048, 2048, 16384]
matrix_size: 8192 # fallback shape when sweep_sizes is empty
warmup: 20
iterations: 80
# NOTE: torch.compile was dropped — on H100 eager cuBLAS beats Triton for plain
# GEMM, and compiling would re-autotune per shape and make the sweep very slow.
matrix_size: 4096
warmup: 10
iterations: 100
health:
temp_warning: 75
temp_critical: 85
temp_warning: 80
temp_critical: 90
power_limit: null # null = auto-detect from GPU TDP per gpu_specs.py
nccl:
@ -49,7 +33,7 @@ nccl:
test_sendrecv: false
stress:
duration_sec: 600 # 10 min — reaches thermal steady state, validates throttle/jitter beyond warmup
duration_sec: 60
use_doubles: false
use_tensor_cores: true
memory_pct: 90
@ -62,24 +46,6 @@ rdma:
msg_size: 65536
ib_device: null
ib_port: 1
# Cross-node (two-host) RDMA via perftest, orchestrated over SSH from the CLIENT
# node. Replaces the old scripts/rdma_cross_node.sh. Run on the client; it starts
# ib_write_bw/ib_write_lat servers on `server` over SSH (passwordless required),
# then drives the local client per device.
cross_node:
enabled: false # set true on the client node to run cross-node RDMA
server: null # peer ssh address, e.g. 172.72.8.12 (server node)
server_addr: null # OOB addr client connects to (default: = server)
ssh_user: root
devices: [] # e.g. [mlx5_0, mlx5_1, mlx5_6, mlx5_7]; [] = auto-detect active IB
ib_port: 1
gid_index: null # -x <n> for RoCE; null for pure InfiniBand
msg_size: 1048576 # 1 MiB — large enough to reach NDR400 peak
iters: 5000
base_oob_port: 18515 # per-device OOB port = base + device index
server_warmup_sec: 2.0
min_bandwidth_gbps: 350 # per-port PASS floor (NDR400 ≈ 0.9 × 400)
max_latency_us: 5
training:
model: gpt2

View File

@ -30,8 +30,7 @@ class Benchmark:
self.console = Console()
self.bench_cfg = config.get("benchmark", {})
self.tools_dir = resolve_tools_dir(config)
cfg_gpu_type = config.get("gpu_type", "auto")
self.gpu_type = cfg_gpu_type if cfg_gpu_type != "auto" else detect_gpu_type()
self.gpu_type = detect_gpu_type()
self.specs = get_gpu_specs(self.gpu_type)
self.gpu_label = get_gpu_label(self.gpu_type)
@ -126,12 +125,8 @@ class Benchmark:
continue
try:
# --disableAffinity skips nvbandwidth's CPU affinity setup, which
# calls nvmlDeviceGetHandleByUUID() — that lookup fails on hosts
# whose fabricmanager build doesn't expose the UUID format nvml
# expects (seen on H20-3e with custom 570.172.08-1 fabricmanager).
cmd = [nvbw_path, "--disableAffinity", "-t", tc,
"-b", str(buffer_mb), "-i", str(samples), "-j"]
cmd = [nvbw_path, "-t", tc, "-b", str(buffer_mb),
"-i", str(samples), "-j"]
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if r.returncode == 0 and r.stdout.strip():
@ -152,15 +147,6 @@ class Benchmark:
h2d_bw = results_by_test.get("h2d", 0)
d2h_bw = results_by_test.get("d2h", 0)
# If every subtest returned 0 the nvbandwidth binary is broken on this host
# (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch). Fall back to PyTorch.
if all(v == 0 for v in results_by_test.values()):
self.console.print(
"[yellow]nvbandwidth returned no usable data — "
"falling back to PyTorch memory benchmark[/yellow]"
)
return self._run_memory_pytorch()
# D2D goes through NVLink — compare to NVLink per-direction bandwidth
# (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2)
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
@ -210,12 +196,9 @@ class Benchmark:
for cell in row:
try:
v = float(cell)
values.append(v)
except (ValueError, TypeError):
continue
# Exclude diagonal entries (intra-device, reported as 0 or
# N/A) so they don't drag the off-diagonal average down.
if v > 0:
values.append(v)
if values:
return sum(values) / len(values)
return 0.0
@ -312,31 +295,9 @@ class Benchmark:
def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
comp_cfg = self.bench_cfg.get("compute", {})
configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
# MAMF-style shape sweep (à la stas00's mamf-finder): a single fixed matmul
# shape under-reports the achievable peak by ~7-12% and therefore can't meet
# the MAMF-calibrated PASS thresholds in gpu_specs.compute_pass_thresholds_tflops.
# So for each dtype we time several matmul shapes and keep the MAXIMUM TFLOPS
# (the realistic peak). matrix_size is the fallback when sweep_sizes is empty.
matrix_size = comp_cfg.get("matrix_size", 8192)
sweep_sizes = comp_cfg.get("sweep_sizes") or [matrix_size]
warmup = comp_cfg.get("warmup", 20)
iterations = comp_cfg.get("iterations", 80)
# Each sweep entry is either an int N (square N×N×N) or an [M, N, K] triple.
# Non-square / K-heavy shapes (e.g. 2048×2048×13312) reach the true MAMF peak
# on Hopper — square-only tops out ~5% lower — so the default set mixes both.
def _to_shape(entry):
if isinstance(entry, (list, tuple)):
if len(entry) == 3:
return tuple(int(x) for x in entry)
if len(entry) == 1:
n = int(entry[0])
return (n, n, n)
raise ValueError(f"sweep size {entry!r} must be an int or [M, N, K]")
n = int(entry)
return (n, n, n)
shapes = [_to_shape(e) for e in sweep_sizes]
matrix_size = comp_cfg.get("matrix_size", 4096)
warmup = comp_cfg.get("warmup", 10)
iterations = comp_cfg.get("iterations", 100)
if not TORCH_AVAILABLE:
self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
@ -344,11 +305,6 @@ class Benchmark:
gpu_count = torch.cuda.device_count()
self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")
if len(sweep_sizes) > 1:
self.console.print(
f"[cyan] MAMF shape sweep over {len(sweep_sizes)} sizes: "
f"{', '.join(str(s) for s in sweep_sizes)}[/cyan]"
)
dtype_map = {
"fp32": (torch.float32, self.specs["fp32_tflops"]),
@ -359,7 +315,6 @@ class Benchmark:
}
results_by_dtype = {}
best_shapes = {}
per_gpu_results = [{"index": i} for i in range(gpu_count)]
with Progress(
@ -384,39 +339,64 @@ class Benchmark:
dtype_val, peak_tflops = dtype_map[dtype_name]
# allow_tf32 only affects float32 matmuls: ON for the TF32 run, OFF for
# the true-FP32 run so the two stay distinct.
old_tf32 = torch.backends.cuda.matmul.allow_tf32
if dtype_name == "tf32":
torch.backends.cuda.matmul.allow_tf32 = True
dtype_val = torch.float32
elif dtype_name == "fp32":
torch.backends.cuda.matmul.allow_tf32 = False
try:
if dtype_name == "tf32":
old_tf32 = torch.backends.cuda.matmul.allow_tf32
torch.backends.cuda.matmul.allow_tf32 = True
dtype_val = torch.float32
best_tflops, best_shape, last_err = 0.0, None, None
for (M, N, K) in shapes:
try:
t = self._bench_matmul_once(dtype_name, dtype_val, M, N, K, warmup, iterations)
if t > best_tflops:
best_tflops, best_shape = t, (M, N, K)
except Exception as e: # noqa: BLE001 - record and try the next shape
last_err = e
M = N = K = matrix_size
torch.backends.cuda.matmul.allow_tf32 = old_tf32
if dtype_name == "fp8":
a = torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
b = torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
scale_a = torch.tensor(1.0, device="cuda")
scale_b = torch.tensor(1.0, device="cuda")
def _fp8_mm():
return torch._scaled_mm(a, b.T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16)
else:
a = torch.randn(M, K, device="cuda", dtype=dtype_val)
b = torch.randn(K, N, device="cuda", dtype=dtype_val)
if dtype_name == "fp8":
for _ in range(warmup):
_fp8_mm()
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
for _ in range(iterations):
c = _fp8_mm()
end_event.record()
else:
for _ in range(warmup):
torch.matmul(a, b)
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
for _ in range(iterations):
c = torch.matmul(a, b)
end_event.record()
torch.cuda.synchronize()
elapsed_ms = start_event.elapsed_time(end_event)
flops = 2 * M * N * K * iterations
tflops = flops / (elapsed_ms / 1000) / 1e12
results_by_dtype[dtype_name] = round(tflops, 1)
if best_shape is None:
results_by_dtype[dtype_name] = f"error: {last_err}"
self.console.print(f"[yellow] {dtype_name}: {last_err}[/yellow]")
else:
shape_str = "x".join(str(d) for d in best_shape)
results_by_dtype[dtype_name] = round(best_tflops, 1)
best_shapes[dtype_name] = shape_str
for pg in per_gpu_results:
pg[dtype_name] = round(best_tflops, 1)
if len(shapes) > 1:
self.console.print(
f"[dim] {dtype_name}: {best_tflops:.1f} TFLOPS @ {shape_str}[/dim]"
)
pg[dtype_name] = round(tflops, 1)
if dtype_name == "tf32":
torch.backends.cuda.matmul.allow_tf32 = old_tf32
del a, b, c
torch.cuda.empty_cache()
except Exception as e:
results_by_dtype[dtype_name] = f"error: {e}"
self.console.print(f"[yellow] {dtype_name}: {e}[/yellow]")
progress.advance(task)
@ -432,74 +412,13 @@ class Benchmark:
"per_dtype_tflops": results_by_dtype,
"peak_tflops": {dt: dtype_map[dt][1] for dt in dtype_map},
"efficiency_pct": efficiency,
# Absolute TFLOPS PASS thresholds (decoupled from peak). When present,
# report.py judges PASS/WARN/FAIL against these directly instead of
# using % of peak. Empty dict => fall back to legacy 80% rule.
"pass_thresholds_tflops": dict(
self.specs.get("compute_pass_thresholds_tflops") or {}
),
"per_gpu": per_gpu_results,
"sweep_sizes": list(sweep_sizes),
"best_shapes": best_shapes,
"matrix_size": matrix_size,
"warmup": warmup,
"iterations": iterations,
}
}
def _bench_matmul_once(self, dtype_name: str, dtype_val, M: int, N: int, K: int,
warmup: int, iterations: int) -> float:
"""Time one (M×K)·(K×N) matmul for a dtype and return achieved TFLOPS.
Uses an L2-cache-busting pool of matrix pairs (total > 256 MB) so operands
can't be served from L2 across iterations, and CUDA events for timing. FP8
goes through torch._scaled_mm (e4m3); all others through torch.matmul eager
cuBLAS, which on H100 beats torch.compile/Triton for plain GEMM and avoids the
per-shape recompile cost that would make a sweep pathologically slow.
"""
elem_bytes = 1 if dtype_name == "fp8" else torch.tensor([], dtype=dtype_val).element_size()
pair_bytes = (M * K + K * N) * elem_bytes
num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes)) # ceil(256MB / pair)
if dtype_name == "fp8":
if not hasattr(torch, "_scaled_mm"):
raise RuntimeError("torch._scaled_mm unavailable — upgrade to PyTorch >= 2.1")
pools_a = [torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
pools_b = [torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
scale_a = torch.tensor(1.0, device="cuda")
scale_b = torch.tensor(1.0, device="cuda")
def op(i):
return torch._scaled_mm(pools_a[i], pools_b[i].T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16)
else:
pools_a = [torch.randn(M, K, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
pools_b = [torch.randn(K, N, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
def op(i):
return torch.matmul(pools_a[i], pools_b[i])
try:
# Probe once so a broken/unsupported kernel raises before the timed loop.
_probe = op(0)
torch.cuda.synchronize()
del _probe
for i in range(warmup):
op(i % num_pools)
torch.cuda.synchronize()
start_event = torch.cuda.Event(enable_timing=True)
end_event = torch.cuda.Event(enable_timing=True)
start_event.record()
for i in range(iterations):
op(i % num_pools)
end_event.record()
torch.cuda.synchronize()
elapsed_ms = start_event.elapsed_time(end_event)
finally:
del pools_a, pools_b
torch.cuda.empty_cache()
return (2 * M * N * K * iterations) / (elapsed_ms / 1000) / 1e12
@staticmethod
def print_results(results: dict, console: Console = None):
c = console or Console()
@ -582,78 +501,3 @@ class Benchmark:
table.add_row(dt.upper(), f"{achieved:.1f}", f"{pk:.0f}",
f"[{ec}]{ef:.1f}%[/{ec}]")
c.print(table)
@staticmethod
def judge_compute(results: dict) -> dict:
"""Judge compute results against pass_thresholds_tflops.
Single source of truth for the PASS/WARN/FAIL rule (same one report.py uses):
achieved >= thr -> PASS; >= 0.9*thr -> WARN; else FAIL. A string achieved value
(skipped/error) -> SKIP. A dtype without a threshold falls back to efficiency
(>=80 PASS / >=50 WARN / else FAIL).
Returns {"rows": [(dtype, achieved, threshold, status), ...], "verdict": str}.
"""
comp = results.get("compute", results)
per_dtype = comp.get("per_dtype_tflops", {})
thresholds = comp.get("pass_thresholds_tflops", {}) or {}
eff = comp.get("efficiency_pct", {})
rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0}
rows, verdict = [], "PASS"
for dt, val in per_dtype.items():
thr = thresholds.get(dt)
if isinstance(val, str):
status = "SKIP"
elif thr:
status = "PASS" if val >= thr else ("WARN" if val >= thr * 0.9 else "FAIL")
else:
e = eff.get(dt, 0)
status = "PASS" if e >= 80 else ("WARN" if e >= 50 else "FAIL")
rows.append((dt, val, thr, status))
if rank[status] > rank[verdict]:
verdict = status
return {"rows": rows, "verdict": verdict}
@staticmethod
def print_compute_verdict(results: dict, console: Console = None) -> str:
"""Print the PASS/WARN/FAIL table for compute results; return the verdict."""
c = console or Console()
judged = Benchmark.judge_compute(results)
color = {"PASS": "green", "WARN": "yellow", "FAIL": "red", "SKIP": "dim"}
c.print("\n[bold cyan]Compute Verdict (vs thresholds)[/bold cyan]")
for dt, val, thr, status in judged["rows"]:
val_s = f"{val:.1f}" if isinstance(val, (int, float)) else str(val)
thr_s = f">= {thr}" if thr else "(efficiency)"
c.print(f" {dt.upper():>4}: {val_s:>8} {thr_s:<12} [{color[status]}]{status}[/{color[status]}]")
v = judged["verdict"]
c.print(f" [bold]VERDICT: [{color[v]}]{v}[/{color[v]}][/bold]")
return v
def _run_cli() -> None:
"""`python -m modules.benchmark` — run ONLY the compute-throughput benchmark."""
import argparse
from pathlib import Path
import yaml
repo_root = Path(__file__).resolve().parent.parent
parser = argparse.ArgumentParser(description="Run the compute-throughput benchmark only.")
parser.add_argument("--config", default=str(repo_root / "configs" / "default.yaml"),
help="path to config YAML (default: configs/default.yaml)")
parser.add_argument("--json", action="store_true", help="also print raw JSON of the compute results")
args = parser.parse_args()
with open(args.config) as f:
config = yaml.safe_load(f) or {}
results = Benchmark(config).run_compute_benchmark()
Benchmark.print_results(results)
Benchmark.print_compute_verdict(results)
if args.json:
print("JSON_RESULT:" + json.dumps(results["compute"]))
if __name__ == "__main__":
_run_cli()

View File

@ -6,14 +6,11 @@ import subprocess
from typing import List, Optional
# GPU name patterns -> internal key mapping
# Order matters: longer/more-specific patterns must come before shorter ones.
GPU_NAME_PATTERNS = {
"A100": "a100",
"A800": "a800",
"H100": "h100",
"H800": "h800", # H800 = H100 SXM with NVLink halved (400 GB/s) and FP64 restricted
"H200": "h200",
"H20": "h20", # H20 / H20-3e is the China-compliance export variant, REDUCED peaks
"B200": "b200",
"B300": "b300",
}
@ -21,10 +18,6 @@ GPU_NAME_PATTERNS = {
# Specs database — ALL values are DENSE (non-sparse) TFLOPS
GPU_SPECS = {
"h100": {
# Peaks below are NVIDIA marketing dense peaks (theoretical Tensor Core max).
# `compute_pass_thresholds_tflops` carries the absolute PASS thresholds used
# by report.py — decoupled from peaks so marketing-spec changes (dense vs
# sparse vs FP8-sparsity) don't shift the validation bar.
"full_name": "NVIDIA H100 SXM5",
"architecture": "Hopper",
"compute_capability": 9.0,
@ -36,18 +29,6 @@ GPU_SPECS = {
"fp16_tflops": 990, # dense (1979 sparse w/ 2:4)
"bf16_tflops": 990, # dense
"fp8_tflops": 1979, # dense
"compute_pass_thresholds_tflops": {
# Recalibrated 2026-05-25 to the H100 eager-cuBLAS achievable floor (each
# threshold ~2-4% below the sustained value measured across 16 GPUs via the
# MAMF shape sweep: fp32 ~52 / tf32 ~405 / fp16 ~732-748 / bf16 ~747-758 /
# fp8 ~1248-1271). The old marketing/MAMF-derived values (fp32 54, tf32 444,
# fp16 734, bf16 745, fp8 1400) sat ON or ABOVE what PyTorch cuBLAS reaches
# on H100, so healthy cards flaked to WARN/FAIL. fp8 1400 in particular was
# an H200/rowwise-scaling figure; H100 tensorwise _scaled_mm tops out ~1310.
"fp32": 50, "tf32": 385, "fp16": 720, "bf16": 730, "fp8": 1200,
# FP64 63 / INT8 1536 — listed for documentation; benchmark module
# doesn't currently exercise these dtypes.
},
"tdp_watts": 700,
"nvlink_gen": 4,
"nvlink_bandwidth_gbps": 900, # bidirectional
@ -67,70 +48,6 @@ GPU_SPECS = {
"fp16_tflops": 990, # dense
"bf16_tflops": 990, # dense
"fp8_tflops": 1979, # dense
# PASS thresholds aligned with H200_production_acceptance.md v2 (2026-05-21):
# calibrated against Semianalysis & stas00 MAMF — H200 shares H100 SMs so
# achievable TFLOPS in PyTorch is in the same band.
"compute_pass_thresholds_tflops": {
"fp32": 50, "tf32": 400, "fp16": 720, "bf16": 720, "fp8": 1400,
},
"tdp_watts": 700,
"nvlink_gen": 4,
"nvlink_bandwidth_gbps": 900,
"pcie_gen": 5,
"min_driver_version": "545",
"min_cuda_version": "12.4",
},
"h800": {
# H800 = China-compliance export variant of H100 SXM5. SAME chip / SMs /
# clocks / HBM as H100 SXM5 — Tensor Core peaks (FP16 / BF16 / FP8 / TF32 /
# FP32) are identical to H100. Two restrictions vs H100:
# 1. NVLink bandwidth halved: 400 GB/s bidirectional (vs H100 900 GB/s)
# 2. FP64 throughput severely cut to ~1 TFLOPS (vs H100 34/67 TFLOPS)
# All other interfaces (PCIe Gen5, NVSwitch, HBM3 80GB @ 3.35 TB/s) match H100.
# NCCL multi-GPU thresholds MUST be downscaled because NVLink BW is halved.
"full_name": "NVIDIA H800 SXM5",
"architecture": "Hopper",
"compute_capability": 9.0,
"hbm_capacity_gb": 80,
"hbm_type": "HBM3",
"memory_bandwidth_gbps": 3350, # GB/s (3.35 TB/s) — same as H100 SXM
"fp32_tflops": 67,
"tf32_tflops": 495, # dense (same as H100)
"fp16_tflops": 990, # dense (same as H100)
"bf16_tflops": 990, # dense (same as H100)
"fp8_tflops": 1979, # dense (same as H100)
# Tensor Core peaks identical to H100, so PASS thresholds reuse the H100
# eager-cuBLAS calibration (2026-05-25). Measured on 8×H800: fp32 ~52 /
# tf32 ~420 / fp16 ~741 / bf16 ~745 / fp8 ~1249 — all clear these. fp8 was
# 1400 (an H200/rowwise-scaling figure) which PyTorch tensorwise _scaled_mm
# can't reach on H100-class silicon (~1310 ceiling); lowered to 1200 to match
# h100. FP64 deliberately NOT listed — H800 is restricted to ~1 TFLOPS FP64.
"compute_pass_thresholds_tflops": {
"fp32": 50, "tf32": 385, "fp16": 720, "bf16": 730, "fp8": 1200,
},
"tdp_watts": 700,
"nvlink_gen": 4,
"nvlink_bandwidth_gbps": 400, # bidirectional — HALF of H100 (export restriction)
"pcie_gen": 5,
"min_driver_version": "535",
"min_cuda_version": "12.1",
},
"h20": {
# China-compliance export variant of H200 (reported as "H20" / "H20-3e" by nvidia-smi).
# Same silicon family / HBM as H200, but Tensor Core peaks are throttled.
# Peaks below are sourced from supplier / NVIDIA China and confirmed against
# measured throughput on 8x H20-3e (FP16 ~741, BF16 ~770, FP8 ~1328 TFLOPS).
"full_name": "NVIDIA H20 / H20-3e",
"architecture": "Hopper",
"compute_capability": 9.0,
"hbm_capacity_gb": 141,
"hbm_type": "HBM3e",
"memory_bandwidth_gbps": 4800,
"fp32_tflops": 54, # China spec (matches measured ~51-52)
"tf32_tflops": 372, # ~75% of H200 (matches measured ~362)
"fp16_tflops": 744, # dense, China spec
"bf16_tflops": 739, # dense, China spec
"fp8_tflops": 1420, # dense, China spec
"tdp_watts": 700,
"nvlink_gen": 4,
"nvlink_bandwidth_gbps": 900,
@ -229,7 +146,6 @@ _UNKNOWN_SPECS = {
"fp16_tflops": 0,
"bf16_tflops": 0,
"fp8_tflops": 0,
"compute_pass_thresholds_tflops": {}, # empty => report.py falls back to 80% of peak
"tdp_watts": 700,
"nvlink_gen": 0,
"nvlink_bandwidth_gbps": 0,
@ -256,10 +172,9 @@ def detect_gpu_type() -> str:
if r.returncode != 0:
return "unknown"
first_line = r.stdout.strip().splitlines()[0].strip().upper()
# Iterate longest-pattern-first so "H200" doesn't get matched by "H20".
for pattern, key in sorted(GPU_NAME_PATTERNS.items(), key=lambda kv: -len(kv[0])):
if pattern in first_line:
first_line = r.stdout.strip().splitlines()[0].strip()
for pattern, key in GPU_NAME_PATTERNS.items():
if pattern in first_line.upper():
return key
return "unknown"
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):

View File

@ -122,7 +122,7 @@ class HealthCheck:
pcie_ok = pw >= 8 # unknown GPU: just check width
if not pcie_ok:
overall_pass = False
checks["pcie_link"] = {"gen": pg, "width": pw, "status": "PASS" if pcie_ok else "WARN"}
checks["pcie_link"] = {"gen": pg, "width": pw, "status": "PASS" if pcie_ok else "FAIL"}
sm = self._safe_int(clock_sms[i] if i < len(clock_sms) else 0)
mm = self._safe_int(clock_mems[i] if i < len(clock_mems) else 0)

View File

@ -1,533 +0,0 @@
"""Multi-node NCCL benchmark wrapper for nccl-tests via mpirun."""
import json
import os
import re
import shutil
import subprocess
from datetime import datetime
from typing import Optional
from rich.console import Console
from rich.table import Table
from modules.gpu_specs import resolve_tools_dir
_TEST_ALIASES = {
"allreduce": "all_reduce_perf",
"all_reduce": "all_reduce_perf",
"all_reduce_perf": "all_reduce_perf",
"allgather": "all_gather_perf",
"all_gather": "all_gather_perf",
"all_gather_perf": "all_gather_perf",
"alltoall": "alltoall_perf",
"all_to_all": "alltoall_perf",
"alltoall_perf": "alltoall_perf",
"broadcast": "broadcast_perf",
"broadcast_perf": "broadcast_perf",
"reducescatter": "reduce_scatter_perf",
"reduce_scatter": "reduce_scatter_perf",
"reduce_scatter_perf": "reduce_scatter_perf",
"sendrecv": "sendrecv_perf",
"send_recv": "sendrecv_perf",
"sendrecv_perf": "sendrecv_perf",
}
_OP_LABELS = {
"all_reduce_perf": "allreduce",
"all_gather_perf": "allgather",
"alltoall_perf": "alltoall",
"broadcast_perf": "broadcast",
"reduce_scatter_perf": "reducescatter",
"sendrecv_perf": "sendrecv",
}
class MultiNodeNCCLTest:
"""Run cross-node NCCL tests with a PDF-style message-size sweep."""
def __init__(self, config: dict):
self.config = config
self.cfg = config.get("multinode_nccl", {}) or {}
self.tools_dir = resolve_tools_dir(config)
self.console = Console()
self.artifact_dir = os.environ.get("MULTINODE_NCCL_ARTIFACT_DIR") or self.cfg.get("artifact_dir")
def _find_mpirun(self) -> Optional[str]:
configured = self.cfg.get("mpirun_path")
if configured and os.path.isfile(str(configured)) and os.access(str(configured), os.X_OK):
return str(configured)
for cmd in ["mpirun", "mpiexec", os.path.join(self.tools_dir, "mpi", "bin", "mpirun")]:
found = shutil.which(cmd)
if found:
return found
return None
def _find_nccl_test(self, binary_name: str) -> Optional[str]:
configured = self.cfg.get("nccl_tests_dir")
candidates = []
if configured:
candidates.append(os.path.join(configured, binary_name))
candidates.append(os.path.join(self.tools_dir, "nccl-tests", "build", binary_name))
found = shutil.which(binary_name)
if found:
candidates.insert(0, found)
for path in candidates:
if path and os.path.isfile(path) and os.access(path, os.X_OK):
return path
return None
def _tests(self) -> list[str]:
configured = self.cfg.get("tests") or ["all_reduce_perf", "alltoall_perf"]
tests = []
for name in configured:
binary = _TEST_ALIASES.get(str(name).lower())
if binary and binary not in tests:
tests.append(binary)
return tests
def _hosts(self) -> list[dict]:
hosts = self.cfg.get("hosts") or []
normalized = []
for host in hosts:
if isinstance(host, str):
normalized.append({"addr": host, "slots": 8})
elif isinstance(host, dict):
normalized.append({
"name": host.get("name") or host.get("addr"),
"addr": host.get("addr") or host.get("host") or host.get("ip"),
"slots": int(host.get("slots", 8)),
})
return [h for h in normalized if h.get("addr")]
def _topologies(self) -> list[dict]:
topologies = self.cfg.get("topologies") or [{"nodes": 2, "gpus_per_node": 8}]
normalized = []
for topo in topologies:
nodes = int(topo.get("nodes", 2))
gpus_per_node = int(topo.get("gpus_per_node", topo.get("gpn", 8)))
normalized.append({
"nodes": nodes,
"gpus_per_node": gpus_per_node,
"label": topo.get("label") or f"{nodes} nodes x {gpus_per_node} GPUs",
"cuda_visible_devices": topo.get("cuda_visible_devices"),
"env": topo.get("env") or {},
"op_env": topo.get("op_env") or topo.get("test_env") or {},
"min_peak_busbw_gbps": topo.get("min_peak_busbw_gbps"),
})
return normalized
def _env_exports(self, topo: dict = None, label: str = None, binary: str = None) -> list[tuple[str, str]]:
env_cfg = {
"NCCL_DEBUG": self.cfg.get("debug", "WARN"),
"NCCL_SOCKET_IFNAME": self.cfg.get("socket_ifname"),
"NCCL_IB_GID_INDEX": self.cfg.get("ib_gid_index"),
"NCCL_IB_SL": self.cfg.get("ib_sl"),
"NCCL_IB_TC": self.cfg.get("ib_tc"),
"NCCL_IB_HCA": self.cfg.get("ib_hca"),
"NCCL_IB_TIMEOUT": self.cfg.get("ib_timeout"),
"NCCL_IB_QPS_PER_CONNECTION": self.cfg.get("qps_per_connection"),
"NCCL_MIN_NCHANNELS": self.cfg.get("min_nchannels"),
"NCCL_NET_PLUGIN": self.cfg.get("net_plugin"),
"NCCL_NVLS_ENABLE": self.cfg.get("nvls_enable"),
"NCCL_IB_SPLIT_DATA_ON_QPS": self.cfg.get("split_data_on_qps"),
}
mpi_ld_preload = self._mpi_ld_preload()
if mpi_ld_preload:
env_cfg["LD_PRELOAD"] = mpi_ld_preload
extra_ld_library_path = self._extra_ld_library_path()
if extra_ld_library_path:
existing = os.environ.get("LD_LIBRARY_PATH", "")
env_cfg["LD_LIBRARY_PATH"] = ":".join(
[extra_ld_library_path] + ([existing] if existing else [])
)
extra_env = self.cfg.get("extra_env") or {}
if isinstance(extra_env, dict):
self._merge_env(env_cfg, extra_env)
if topo:
if topo.get("cuda_visible_devices"):
env_cfg["CUDA_VISIBLE_DEVICES"] = str(topo["cuda_visible_devices"])
if isinstance(topo.get("env"), dict):
self._merge_env(env_cfg, topo["env"])
op_env = topo.get("op_env")
if isinstance(op_env, dict):
for key in (label, binary):
overrides = op_env.get(key)
if isinstance(overrides, dict):
self._merge_env(env_cfg, overrides)
return [(k, str(v)) for k, v in env_cfg.items() if v is not None]
@staticmethod
def _merge_env(env_cfg: dict, overrides: dict):
for key, value in overrides.items():
key = str(key)
if value is None:
env_cfg.pop(key, None)
else:
env_cfg[key] = str(value)
def _mpi_ld_preload(self) -> str:
preload = self.cfg.get("mpi_ld_preload")
if isinstance(preload, list):
return " ".join(str(p) for p in preload if p)
return str(preload) if preload else ""
def _runtime_env(self) -> dict:
env = os.environ.copy()
mpi_ld_preload = self._mpi_ld_preload()
if mpi_ld_preload:
env["LD_PRELOAD"] = mpi_ld_preload
extra_ld_library_path = self._extra_ld_library_path()
if extra_ld_library_path:
existing = env.get("LD_LIBRARY_PATH", "")
env["LD_LIBRARY_PATH"] = ":".join(
[extra_ld_library_path] + ([existing] if existing else [])
)
return env
def _extra_ld_library_path(self) -> str:
paths = self.cfg.get("extra_ld_library_path")
if isinstance(paths, list):
return ":".join(str(p) for p in paths if p)
return str(paths) if paths else ""
def _preflight(self, mpirun: Optional[str], tests: list[str], hosts: list[dict]) -> dict:
checks = []
checks.append({"name": "mpirun", "status": "PASS" if mpirun else "FAIL", "detail": mpirun or "not found"})
checks.append({"name": "hosts", "status": "PASS" if len(hosts) >= 2 else "FAIL", "detail": f"{len(hosts)} configured"})
for binary in tests:
path = self._find_nccl_test(binary)
checks.append({"name": binary, "status": "PASS" if path else "FAIL", "detail": path or "not found"})
if self.cfg.get("ssh_preflight", True):
user = self.cfg.get("ssh_user", "root")
for host in hosts:
target = f"{user}@{host['addr']}"
cmd = [
"ssh",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=5",
"-o", "StrictHostKeyChecking=accept-new",
target,
"hostname",
]
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=8, env=self._runtime_env())
detail = r.stdout.strip() or r.stderr.strip()[:120]
checks.append({
"name": f"ssh {host['addr']}",
"status": "PASS" if r.returncode == 0 else "WARN",
"detail": detail,
})
except Exception as e:
checks.append({"name": f"ssh {host['addr']}", "status": "WARN", "detail": str(e)})
return {
"checks": checks,
"passed": all(c["status"] == "PASS" for c in checks if not c["name"].startswith("ssh ")),
}
def run(self) -> dict:
mpirun = self._find_mpirun()
tests = self._tests()
hosts = self._hosts()
topologies = self._topologies()
preflight = self._preflight(mpirun, tests, hosts)
if not preflight["passed"]:
return {
"passed": False,
"source": "nccl-tests-mpirun",
"mode": self.cfg.get("mode", "sweep"),
"hosts": hosts,
"preflight": preflight,
"tests": {},
"error": "multinode NCCL preflight failed",
"timestamp": datetime.now().isoformat(),
}
results = {}
for binary in tests:
label = _OP_LABELS[binary]
binary_path = self._find_nccl_test(binary)
op_results = []
for topo in topologies:
op_results.append(self._run_topology(mpirun, binary_path, label, hosts, topo))
results[label] = {"binary": binary_path, "topologies": op_results}
passed = all(
topo.get("status") == "PASS"
for op in results.values()
for topo in op.get("topologies", [])
)
return {
"passed": passed,
"source": "nccl-tests-mpirun",
"mode": self.cfg.get("mode", "sweep"),
"hosts": hosts,
"preflight": preflight,
"tests": results,
"artifact_dir": self.artifact_dir,
"timestamp": datetime.now().isoformat(),
}
def _run_topology(self, mpirun: str, binary: str, label: str, hosts: list[dict], topo: dict) -> dict:
nodes = topo["nodes"]
gpus_per_node = topo["gpus_per_node"]
selected_hosts = hosts[:nodes]
host_arg = ",".join(f"{h['addr']}:{gpus_per_node}" for h in selected_hosts)
ranks = nodes * gpus_per_node
cmd = [
mpirun,
"--allow-run-as-root",
"--mca", "btl_openib_warn_no_device_params_found", "0",
"--mca", "btl_tcp_if_include", str(self.cfg.get("socket_ifname", "bond0")),
"--mca", "oob_tcp_if_include", str(self.cfg.get("oob_tcp_ifname", self.cfg.get("socket_ifname", "bond0"))),
"-H", host_arg,
"--map-by", f"ppr:{gpus_per_node}:node",
"-np", str(ranks),
]
plm_rsh_args = self.cfg.get("plm_rsh_args")
if plm_rsh_args:
cmd.extend(["--mca", "plm_rsh_args", str(plm_rsh_args)])
for key, value in self._env_exports(topo=topo, label=label, binary=os.path.basename(binary)):
cmd.extend(["-x", f"{key}={value}"])
cmd.extend([
binary,
"-b", str(self.cfg.get("begin_size", "1k")),
"-e", str(self.cfg.get("end_size", "16g")),
"-g", str(self.cfg.get("gpus_per_rank", 1)),
"-f", str(self.cfg.get("step_factor", 2)),
"-w", str(self.cfg.get("warmup_iters", 10)),
])
if self.cfg.get("iters") is not None:
cmd.extend(["-n", str(self.cfg["iters"])])
timeout = int(self.cfg.get("timeout_sec", 1800))
started = datetime.now().isoformat()
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, env=self._runtime_env())
except subprocess.TimeoutExpired:
result = {
"label": topo["label"],
"nodes": nodes,
"gpus_per_node": gpus_per_node,
"ranks": ranks,
"hosts": selected_hosts,
"command": " ".join(cmd),
"status": "FAIL",
"error": f"timeout after {timeout}s",
"started_at": started,
}
self._write_artifacts(label, topo, result, "", "")
return result
parsed = self._parse_nccl_output(r.stdout)
net_diag = self._parse_network_diagnostics(r.stdout + "\n" + r.stderr)
threshold = self._threshold_for(label, topo)
wrong = sum(row.get("wrong", 0) for row in parsed["by_size"])
has_bw = parsed["peak_busbw_gbps"] > 0
status = "PASS" if r.returncode == 0 and has_bw and wrong == 0 and parsed["peak_busbw_gbps"] >= threshold else "FAIL"
result = {
"label": topo["label"],
"nodes": nodes,
"gpus_per_node": gpus_per_node,
"ranks": ranks,
"hosts": selected_hosts,
"cuda_visible_devices": topo.get("cuda_visible_devices"),
"command": " ".join(cmd),
"returncode": r.returncode,
"status": status,
"peak_busbw_gbps": parsed["peak_busbw_gbps"],
"peak_algbw_gbps": parsed["peak_algbw_gbps"],
"peak_size": parsed["peak_size"],
"avg_busbw_gbps": parsed["avg_busbw_gbps"],
"min_required_gbps": threshold,
"wrong_count": wrong,
"network": net_diag,
"by_size": parsed["by_size"],
"stderr_tail": r.stderr[-1200:],
"stdout_tail": r.stdout[-1200:],
"started_at": started,
"finished_at": datetime.now().isoformat(),
}
self._write_artifacts(label, topo, result, r.stdout, r.stderr)
return result
def _write_artifacts(self, label: str, topo: dict, result: dict, stdout: str, stderr: str):
if not self.artifact_dir:
return
os.makedirs(self.artifact_dir, exist_ok=True)
prefix = _safe_name(f"{label}_{topo.get('nodes')}x{topo.get('gpus_per_node')}_{topo.get('label')}")
base = os.path.join(self.artifact_dir, prefix)
with open(base + ".cmd.txt", "w") as f:
f.write(result.get("command", ""))
f.write("\n")
with open(base + ".stdout.txt", "w") as f:
f.write(stdout)
with open(base + ".stderr.txt", "w") as f:
f.write(stderr)
artifact_result = {k: v for k, v in result.items() if k not in ("stdout_tail", "stderr_tail")}
with open(base + ".json", "w") as f:
json.dump(artifact_result, f, indent=2, default=str)
result["artifact_prefix"] = base
def _threshold_for(self, label: str, topo: dict = None) -> float:
if topo and topo.get("min_peak_busbw_gbps") is not None:
topo_thresholds = topo.get("min_peak_busbw_gbps")
if isinstance(topo_thresholds, dict):
return float(topo_thresholds.get(label, 0) or 0)
return float(topo_thresholds or 0)
thresholds = self.cfg.get("min_peak_busbw_gbps") or {}
if isinstance(thresholds, dict):
op_threshold = thresholds.get(label, 0)
if isinstance(op_threshold, dict):
keys = []
if topo:
keys.extend([
topo.get("label"),
f"{topo.get('nodes')}x{topo.get('gpus_per_node')}",
f"{topo.get('nodes')} nodes x {topo.get('gpus_per_node')} GPUs",
str(topo.get("gpus_per_node")),
])
keys.append("default")
for key in keys:
if key in op_threshold:
return float(op_threshold.get(key) or 0)
return 0.0
return float(op_threshold or 0)
return float(thresholds or 0)
@staticmethod
def _parse_nccl_output(stdout: str) -> dict:
rows = []
avg_bus = 0.0
for line in stdout.splitlines():
stripped = line.strip()
if not stripped:
continue
avg_match = re.search(r"Avg bus bandwidth\s*:\s*([0-9.]+)", stripped)
if avg_match:
avg_bus = float(avg_match.group(1))
continue
if stripped.startswith("#"):
continue
parts = stripped.split()
if len(parts) < 9:
continue
try:
size_bytes = int(parts[0])
time_us = float(parts[5])
algbw = float(parts[6])
busbw = float(parts[7])
wrong = int(parts[8])
except (ValueError, IndexError):
continue
rows.append({
"size_bytes": size_bytes,
"size": _format_size(size_bytes),
"time_us": time_us,
"algbw_gbps": algbw,
"busbw_gbps": busbw,
"wrong": wrong,
})
peak_row = max(rows, key=lambda r: r["busbw_gbps"], default={})
return {
"peak_busbw_gbps": round(float(peak_row.get("busbw_gbps", 0)), 2),
"peak_algbw_gbps": round(float(peak_row.get("algbw_gbps", 0)), 2),
"peak_size": peak_row.get("size", ""),
"avg_busbw_gbps": round(avg_bus, 2),
"by_size": rows,
}
@staticmethod
def _parse_network_diagnostics(output: str) -> dict:
networks = sorted(set(re.findall(r"Using network (\S+)", output)))
gdr_enabled = sorted(set(re.findall(r"GPU Direct RDMA Enabled for HCA \d+ '([^']+)'", output)))
gdr_disabled = sorted(set(re.findall(r"GPU Direct RDMA Disabled for HCA \d+ '([^']+)'", output)))
ib_using = []
for line in output.splitlines():
if "NET/IB : Using" in line:
text = line.split("NET/IB : ", 1)[-1].strip()
if text not in ib_using:
ib_using.append(text)
if gdr_disabled:
gdr_state = "DISABLED"
elif gdr_enabled or "/GDRDMA" in output:
gdr_state = "ENABLED"
elif networks:
gdr_state = "NOT_DISABLED_IN_LOG"
else:
gdr_state = "UNKNOWN"
return {
"networks": networks,
"ib_using": ib_using[:8],
"gdr_enabled_hcas": gdr_enabled,
"gdr_disabled_hcas": gdr_disabled,
"gpu_direct_rdma": gdr_state,
}
@staticmethod
def print_results(results: dict, console: Console = None):
c = console or Console()
if results.get("error"):
c.print(f"[bold red]Multi-node NCCL failed: {results['error']}[/bold red]")
else:
c.print("[bold green]Multi-node NCCL complete[/bold green]" if results.get("passed") else "[bold red]Multi-node NCCL failed[/bold red]")
preflight = results.get("preflight", {})
if preflight.get("checks"):
table = Table(title="Preflight")
table.add_column("Check")
table.add_column("Status")
table.add_column("Detail")
for check in preflight["checks"]:
table.add_row(check["name"], check["status"], str(check.get("detail", "")))
c.print(table)
for op, data in (results.get("tests") or {}).items():
table = Table(title=f"Multi-node NCCL {op}")
table.add_column("Topology")
table.add_column("Peak Bus BW")
table.add_column("Peak Size")
table.add_column("Threshold")
table.add_column("Status")
for topo in data.get("topologies", []):
table.add_row(
topo.get("label", ""),
f"{topo.get('peak_busbw_gbps', 0):.2f} GB/s",
str(topo.get("peak_size", "")),
f">= {_format_gbps(topo.get('min_required_gbps', 0))} GB/s" if topo.get("min_required_gbps") else "-",
topo.get("status", "?"),
)
c.print(table)
def _format_size(size_bytes: int) -> str:
units = [("G", 1024 ** 3), ("M", 1024 ** 2), ("K", 1024)]
for suffix, factor in units:
if size_bytes >= factor and size_bytes % factor == 0:
return f"{size_bytes // factor}{suffix}"
return str(size_bytes)
def _format_gbps(value) -> str:
try:
numeric = float(value)
except (TypeError, ValueError):
return str(value)
if numeric.is_integer():
return f"{numeric:.0f}"
return f"{numeric:.2f}"
def _safe_name(value: str) -> str:
text = re.sub(r"[^A-Za-z0-9_.-]+", "_", value.strip())
text = re.sub(r"_+", "_", text).strip("_")
return text[:160] or "case"

View File

@ -23,20 +23,6 @@ except ImportError:
pass
# Per-operation bandwidth thresholds, as a fraction of NVLink bidirectional BW.
# Values aligned with the H100 production acceptance criteria (acceptance doc §5).
# AllToAll runs ~10-20% lower than AllReduce on 8-GPU NVSwitch, so its fraction is
# set lower; broadcast/sendrecv sit between.
_OP_BW_FRACTIONS = {
"allreduce": 0.45,
"allgather": 0.45,
"reducescatter": 0.45,
"broadcast": 0.40,
"sendrecv": 0.40,
"alltoall": 0.35,
}
class NCCLTest:
def __init__(self, config: dict):
@ -94,17 +80,12 @@ class NCCLTest:
tests.append(("sendrecv_perf", "SendRecv"))
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
# User-provided override applies uniformly across all ops; otherwise
# each op gets its own threshold from _OP_BW_FRACTIONS.
user_override = self.nccl_cfg.get("min_bandwidth_gbps")
def threshold_for(label: str) -> float:
if user_override:
return float(user_override)
if nvlink_bw <= 0:
return 10.0 # conservative floor
frac = _OP_BW_FRACTIONS.get(label.lower(), 0.45)
return round(nvlink_bw * frac)
if nvlink_bw > 0:
default_min_bw = nvlink_bw * 0.4
else:
# Conservative floor: any working NVLink should exceed 10 GB/s
default_min_bw = 10
min_bw = self.nccl_cfg.get("min_bandwidth_gbps") or round(default_min_bw)
if self.gpu_type == "unknown":
self.console.print("[yellow]Unknown GPU — using conservative bandwidth thresholds[/yellow]")
@ -122,9 +103,8 @@ class NCCLTest:
for binary, label in tests:
progress.update(task, description=f"NCCL {label}...")
op_min_bw = threshold_for(label)
result = self._run_one_nccl_test_direct(
binary, label, gpu_count, op_min_bw
binary, label, gpu_count, min_bw
)
if result.get("status") not in ("SKIP", None) and "error" not in result:
any_binary_worked = True
@ -134,7 +114,7 @@ class NCCLTest:
mpirun = self._find_mpirun()
if mpirun:
result = self._run_one_nccl_test_mpirun(
binary, label, gpu_count, mpirun, op_min_bw
binary, label, gpu_count, mpirun, min_bw
)
if result.get("status") not in ("SKIP", None) and "error" not in result:
any_binary_worked = True
@ -154,9 +134,7 @@ class NCCLTest:
return {
"passed": all_passed,
"source": "nccl-tests",
"min_bandwidth_gbps": {
lbl.lower(): threshold_for(lbl) for _, lbl in tests
},
"min_bandwidth_gbps": min_bw,
"tests": results,
"gpu_count": gpu_count,
"timestamp": datetime.now().isoformat(),

View File

@ -3,7 +3,6 @@
import os
import shutil
import subprocess
import time
from datetime import datetime
from typing import Optional, List
@ -38,69 +37,15 @@ class RDMATest:
ports = sorted(os.listdir(ports_dir))
return ports
@staticmethod
def _read_sys(path: str) -> str:
try:
with open(path) as f:
return f.read().strip()
except (FileNotFoundError, PermissionError, OSError):
return ""
def run(self) -> dict:
devices = self._get_ib_devices()
if not devices:
self.console.print(
"[yellow]No InfiniBand devices found — skipping RDMA test[/yellow]"
)
return {
"status": "SKIP", "skipped": True,
"reason": "no IB hardware detected",
"timestamp": datetime.now().isoformat(),
}
# Only consider ports whose link_layer is InfiniBand — Ethernet
# bond/management interfaces (e.g. mlx5_bond_0) can show ACTIVE state
# without actually providing IB fabric connectivity.
ib_devices = []
active_ib_port = False
for dev in devices:
for port in self._get_ib_ports(dev):
link_layer = self._read_sys(
f"/sys/class/infiniband/{dev}/ports/{port}/link_layer")
if link_layer != "InfiniBand":
continue
ib_devices.append((dev, port))
state = self._read_sys(
f"/sys/class/infiniband/{dev}/ports/{port}/state")
if "ACTIVE" in state.upper():
active_ib_port = True
device_info = self._collect_device_info(devices)
if not ib_devices:
self.console.print(
"[yellow]No InfiniBand-link_layer ports present — "
"skipping RDMA benchmarks[/yellow]"
)
return {
"status": "SKIP", "skipped": True,
"reason": "no InfiniBand link_layer ports (only Ethernet/RoCE)",
"devices": device_info,
"timestamp": datetime.now().isoformat(),
}
if not active_ib_port:
self.console.print(
f"[yellow]{len(ib_devices)} IB port(s) detected but all DOWN — "
f"fabric not wired, skipping RDMA benchmarks[/yellow]"
)
return {
"status": "SKIP", "skipped": True,
"reason": f"{len(ib_devices)} IB port(s) found but all DOWN (fabric not wired)",
"devices": device_info,
"timestamp": datetime.now().isoformat(),
}
self.console.print("[yellow]No InfiniBand devices found[/yellow]")
return {"error": "no_ib_devices", "passed": False}
self.console.print(f"[cyan]RDMA Test - Devices: {', '.join(devices)}[/cyan]")
device_info = self._collect_device_info(devices)
bw_results = self._run_bandwidth_tests(devices)
latency_results = self._run_latency_tests(devices)
@ -110,17 +55,13 @@ class RDMATest:
if isinstance(r, dict)
)
result = {
return {
"passed": all_passed,
"devices": device_info,
"bandwidth_tests": bw_results,
"latency_tests": latency_results,
"timestamp": datetime.now().isoformat(),
}
# Cross-node (two-host) RDMA, run only when a peer is configured.
if (self.rdma_cfg.get("cross_node", {}) or {}).get("enabled"):
result["cross_node"] = self.run_cross_node()
return result
def _collect_device_info(self, devices: List[str]) -> List[dict]:
info = []
@ -257,207 +198,9 @@ class RDMATest:
return results
# ------------------------------------------------------------------
# Cross-node (two-host) RDMA over perftest, orchestrated via SSH.
# Runs FROM the client host: for each IB device it launches the matching
# perftest server on the peer over SSH (held open in a live ssh channel),
# then runs the local client against the peer's OOB address and parses the
# result. Replaces the old standalone scripts/rdma_cross_node.sh.
# ------------------------------------------------------------------
def _active_ib_devices(self) -> List[str]:
"""IB devices whose port 1 is InfiniBand link_layer and ACTIVE."""
out = []
for dev in self._get_ib_devices():
for port in self._get_ib_ports(dev):
ll = self._read_sys(f"/sys/class/infiniband/{dev}/ports/{port}/link_layer")
st = self._read_sys(f"/sys/class/infiniband/{dev}/ports/{port}/state")
if ll == "InfiniBand" and "ACTIVE" in st.upper():
out.append(dev)
break
return out
def run_cross_node(self) -> dict:
cn = self.rdma_cfg.get("cross_node", {}) or {}
if not cn.get("enabled"):
return {"status": "SKIP", "skipped": True,
"reason": "rdma.cross_node.enabled is false"}
server = cn.get("server")
if not server:
return {"status": "SKIP", "skipped": True,
"reason": "rdma.cross_node.server (peer ssh address) not set"}
ssh_user = cn.get("ssh_user", "root")
server_target = server if "@" in server else f"{ssh_user}@{server}"
# OOB address the client's perftest connects to (defaults to the ssh host).
server_addr = cn.get("server_addr") or server.split("@")[-1]
ib_port = cn.get("ib_port", 1)
gid_index = cn.get("gid_index")
msg_size = cn.get("msg_size", 1048576)
iters = cn.get("iters", 5000)
base_port = cn.get("base_oob_port", 18515)
warmup = cn.get("server_warmup_sec", 2.0)
min_bw = cn.get("min_bandwidth_gbps", 350)
max_lat = cn.get("max_latency_us", 5)
devices = cn.get("devices") or self._active_ib_devices()
if not devices:
return {"status": "SKIP", "skipped": True,
"reason": "no active InfiniBand devices to test"}
has_bw = self._find_tool("ib_write_bw") is not None
has_lat = self._find_tool("ib_write_lat") is not None
if not has_bw and not has_lat:
return {"status": "SKIP", "skipped": True,
"reason": "perftest (ib_write_bw / ib_write_lat) not installed"}
self.console.print(
f"[cyan]Cross-node RDMA — client → {server_addr}, "
f"devices: {', '.join(devices)}[/cyan]")
per_device = []
for idx, dev in enumerate(devices):
oob = base_port + idx
entry = {"device": dev}
if has_bw:
bw = self._cross_node_perftest(
"ib_write_bw", dev, server_target, server_addr, ib_port,
oob, gid_index, warmup,
extra=["--report_gbits", "-s", str(msg_size), "-n", str(iters)],
parse="bw")
entry["bandwidth_gbps"] = bw
if isinstance(bw, (int, float)):
entry["bw_status"] = "PASS" if bw >= min_bw else "WARN"
else:
entry["bw_status"] = "FAIL"
if has_lat:
lat = self._cross_node_perftest(
"ib_write_lat", dev, server_target, server_addr, ib_port,
oob, gid_index, warmup, extra=[], parse="lat")
if isinstance(lat, dict):
entry["latency_us"] = lat.get("typical")
entry["latency_p99_us"] = lat.get("p99")
t = lat.get("typical")
entry["lat_status"] = ("PASS" if isinstance(t, (int, float)) and 0 < t <= max_lat
else ("WARN" if isinstance(t, (int, float)) else "FAIL"))
else:
entry["latency_us"] = lat
entry["lat_status"] = "FAIL"
per_device.append(entry)
statuses = [e.get(k) for e in per_device for k in ("bw_status", "lat_status") if e.get(k)]
verdict = "PASS"
for s in statuses:
if s == "FAIL":
verdict = "FAIL"
break
if s == "WARN" and verdict == "PASS":
verdict = "WARN"
return {
"status": verdict,
"server": server_addr,
"min_bandwidth_gbps": min_bw,
"max_latency_us": max_lat,
"per_device": per_device,
"timestamp": datetime.now().isoformat(),
}
def _cross_node_perftest(self, tool: str, dev: str, server_target: str,
server_addr: str, ib_port: int, oob_port: int,
gid_index, warmup: float, extra: List[str], parse: str):
"""Start `tool` server on the peer via SSH, run the local client, parse output.
Returns a float (bw, Gb/s), a dict {typical, p99} (lat, µs), or an error string.
"""
tool_path = self._find_tool(tool)
if not tool_path:
return f"{tool} not installed"
flags = ["-d", dev, "-i", str(ib_port), "-p", str(oob_port), "-F"]
if gid_index is not None:
flags += ["-x", str(gid_index)]
flags += extra
server_cmd = " ".join([tool] + flags) # server: no host argument
server_proc = None
try:
server_proc = subprocess.Popen(
["ssh", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no",
server_target, server_cmd],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
time.sleep(warmup) # let the remote server bind before the client connects
client = subprocess.run([tool_path] + flags + [server_addr],
capture_output=True, text=True, timeout=120)
out = client.stdout + "\n" + (client.stderr or "")
return self._parse_perftest_lat(out) if parse == "lat" else self._parse_perftest_bw(out)
except subprocess.TimeoutExpired:
return "timeout"
except Exception as e: # noqa: BLE001
return f"error: {e}"
finally:
if server_proc and server_proc.poll() is None:
server_proc.terminate()
try:
server_proc.wait(timeout=5)
except Exception:
server_proc.kill()
# ib_write_* server normally exits after one run; pkill cleans up a
# leftover one if the client failed mid-handshake. -x matches the exact
# process name so it never kills this ssh command itself.
try:
subprocess.run(
["ssh", "-o", "BatchMode=yes", server_target, f"pkill -x {tool}"],
capture_output=True, timeout=10)
except Exception:
pass
@staticmethod
def _parse_perftest_bw(output: str) -> float:
"""Parse ib_write_bw rows (#bytes #iter BW_peak BW_avg ...); return max BW avg."""
best = 0.0
for line in output.splitlines():
parts = line.split()
if len(parts) >= 4:
try:
int(parts[0]) # #bytes column
best = max(best, float(parts[3])) # BW average[Gb/sec]
except ValueError:
continue
return round(best, 2) if best else 0.0
@staticmethod
def _parse_perftest_lat(output: str) -> dict:
"""Parse ib_write_lat row (#bytes #iter t_min t_max t_typical t_avg ... 99%)."""
for line in output.splitlines():
parts = line.split()
if len(parts) >= 6:
try:
int(parts[0]); int(parts[1])
typical = float(parts[4]) # t_typical[usec]
except ValueError:
continue
p99 = None
if len(parts) >= 8:
try:
p99 = float(parts[7]) # 99% percentile[usec]
except ValueError:
p99 = None
return {"typical": round(typical, 2), "p99": round(p99, 2) if p99 else None}
return {"typical": None, "p99": None}
@staticmethod
def print_results(results: dict, console: Console = None):
c = console or Console()
if results.get("skipped") or results.get("status") == "SKIP":
c.print(f"\n[bold yellow]RDMA/InfiniBand: SKIPPED[/bold yellow] "
f"[dim]({results.get('reason', 'no IB hardware')})[/dim]")
return
if "error" in results:
c.print(f"[bold red]Error: {results['error']}[/bold red]")
return
@ -495,29 +238,3 @@ class RDMATest:
c.print(f" {t['test']}: [{sc}]{status}[/{sc}] "
f"({lat:.2f} us, max: {t.get('max_allowed_us', 'N/A')} us)" if status != "SKIP"
else f" {t['test']}: [dim]SKIPPED[/dim]")
cn = results.get("cross_node")
if cn:
if cn.get("skipped"):
c.print(f"\n [bold]Cross-node RDMA[/bold]: [dim]SKIPPED "
f"({cn.get('reason', '')})[/dim]")
else:
v = cn.get("status", "?")
vc = "green" if v == "PASS" else ("yellow" if v == "WARN" else "red")
c.print(f"\n [bold]Cross-node RDMA[/bold] (server {cn.get('server')}) "
f"[{vc}]{v}[/{vc}] "
f"[dim]min {cn.get('min_bandwidth_gbps')} Gb/s, "
f"max {cn.get('max_latency_us')} µs[/dim]")
for e in cn.get("per_device", []):
bw = e.get("bandwidth_gbps")
lat = e.get("latency_us")
bws = e.get("bw_status", "")
lts = e.get("lat_status", "")
bc = "green" if bws == "PASS" else ("yellow" if bws == "WARN" else "red")
lc = "green" if lts == "PASS" else ("yellow" if lts == "WARN" else "red")
bw_s = f"{bw:.1f} Gb/s" if isinstance(bw, (int, float)) else str(bw)
lat_s = f"{lat:.2f} µs" if isinstance(lat, (int, float)) else str(lat)
p99 = e.get("latency_p99_us")
p99_s = f", p99 {p99:.2f}" if isinstance(p99, (int, float)) else ""
c.print(f" {e['device']}: BW [{bc}]{bw_s}[/{bc}] | "
f"lat [{lc}]{lat_s}[/{lc}]{p99_s}")

View File

@ -274,18 +274,8 @@ class ReportGenerator:
lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |")
lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |")
lines.append("")
# PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_()
# only reaches ~20% of HBM bandwidth). When fallback is used, report
# the number but mark as WARN with a note instead of evaluating as FAIL.
if mem_data.get("source") == "pytorch":
lines.append(
f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; "
"nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n"
)
else:
# Tightened to match production acceptance: PASS >= 80%, WARN 6080%, FAIL < 60%.
verdict = "PASS" if d2d_eff >= 80 else ("WARN" if d2d_eff >= 60 else "FAIL")
lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")
verdict = "PASS" if d2d_eff >= 50 else ("WARN" if d2d_eff >= 30 else "FAIL")
lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")
# --- Compute Throughput ---
comp_data = self._extract_compute_results(results)
@ -294,18 +284,9 @@ class ReportGenerator:
per_dtype = comp_data.get("per_dtype_tflops", {})
peak_tflops = comp_data.get("peak_tflops", {})
eff_pct = comp_data.get("efficiency_pct", {})
# Absolute PASS thresholds (TFLOPS) from gpu_specs.compute_pass_thresholds_tflops.
# When present, override the legacy 80%-of-peak rule on a per-dtype basis.
pass_thresholds = comp_data.get("pass_thresholds_tflops", {}) or {}
use_abs = bool(pass_thresholds)
if use_abs:
lines.append("| DType | Achieved (TFLOPS) | Peak | Threshold | Status |")
else:
lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
lines.append("|-------|-------------------|------|------------|--------|")
worst_eff = 100.0
overall_status = "PASS"
rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0}
for dt, val in per_dtype.items():
if isinstance(val, str):
# skipped or error
@ -315,26 +296,11 @@ class ReportGenerator:
ef = eff_pct.get(dt, 0)
if isinstance(ef, (int, float)) and ef > 0:
worst_eff = min(worst_eff, ef)
thr = pass_thresholds.get(dt)
if use_abs and thr:
if val >= thr:
status = "PASS"
elif val >= thr * 0.9:
status = "WARN"
else:
status = "FAIL"
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | >= {thr} | {status} |")
else:
status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
if rank.get(status, 0) > rank.get(overall_status, 0):
overall_status = status
status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
lines.append("")
if use_abs:
lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds; worst efficiency {worst_eff:.1f}%)\n")
else:
overall_status = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
lines.append(f"**Verdict: {overall_status}** (worst efficiency {worst_eff:.1f}%)\n")
overall = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
lines.append(f"**Verdict: {overall}** (worst efficiency {worst_eff:.1f}%)\n")
# --- NCCL ---
nccl = results.get("nccl")
@ -373,10 +339,7 @@ class ReportGenerator:
# --- RDMA ---
rdma = results.get("rdma")
if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"):
lines.append("## RDMA/InfiniBand\n")
lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n")
elif rdma and not rdma.get("error"):
if rdma and not rdma.get("error"):
lines.append("## RDMA/InfiniBand\n")
bw_tests = rdma.get("bandwidth_tests", [])
lat_tests = rdma.get("latency_tests", [])
@ -468,13 +431,9 @@ class ReportGenerator:
if mem:
if mem.get("error"):
items.append(("Memory Bandwidth", f"ERROR: {mem['error']}"))
elif mem.get("source") == "pytorch":
# PyTorch fallback can't reach HBM peak — report as WARN, not FAIL.
d2d = mem.get("d2d_bandwidth_gbps") or 0
items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)"))
else:
eff = mem.get("efficiency_pct") or 0
verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 60 else "FAIL")
verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 50 else "FAIL")
items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)"))
# Compute
@ -483,43 +442,14 @@ class ReportGenerator:
if comp.get("error"):
items.append(("Compute Throughput", f"ERROR: {comp['error']}"))
else:
per_dtype = comp.get("per_dtype_tflops", {})
eff_pct = comp.get("efficiency_pct", {})
pass_thresholds = comp.get("pass_thresholds_tflops", {}) or {}
if pass_thresholds:
# Absolute TFLOPS judgment, mirroring the per-dtype table above.
rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
worst_status = "PASS"
worst_dt = None
for dt, thr in pass_thresholds.items():
val = per_dtype.get(dt)
if not isinstance(val, (int, float)):
continue
if val >= thr:
st = "PASS"
elif val >= thr * 0.9:
st = "WARN"
else:
st = "FAIL"
if rank[st] > rank[worst_status]:
worst_status = st
worst_dt = dt
if worst_dt:
items.append((
"Compute Throughput",
f"{worst_status} (worst {worst_dt.upper()} "
f"{per_dtype[worst_dt]:.0f} vs >= {pass_thresholds[worst_dt]})"
))
else:
items.append(("Compute Throughput", f"{worst_status}"))
valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
if valid_effs:
worst = min(valid_effs)
verdict = "PASS" if worst >= 80 else ("WARN" if worst >= 50 else "FAIL")
items.append(("Compute Throughput", f"{verdict} (worst {worst:.1f}%)"))
else:
valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
if valid_effs:
worst = min(valid_effs)
verdict = "PASS" if worst >= 80 else ("WARN" if worst >= 50 else "FAIL")
items.append(("Compute Throughput", f"{verdict} (worst {worst:.1f}%)"))
else:
items.append(("Compute Throughput", "N/A"))
items.append(("Compute Throughput", "N/A"))
# NCCL
if "nccl" in results:
@ -544,9 +474,7 @@ class ReportGenerator:
# RDMA
if "rdma" in results:
r = results["rdma"]
if r.get("skipped") or r.get("status") == "SKIP":
items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})"))
elif r.get("error"):
if r.get("error"):
items.append(("RDMA", f"ERROR: {r['error']}"))
elif r.get("passed"):
items.append(("RDMA", "PASS"))

View File

@ -144,14 +144,8 @@ class StressTest:
alloc_bytes = min(target_mem, int(free_mem * 0.95))
# matmul(A, A.T) needs 2x input memory (input + output)
mem_side = int((alloc_bytes / 4 / 2) ** 0.5)
# Cap compute matrix so a single matmul completes in ~2s on H100/H200
# (FP32 ≈ 67 TFLOPS → 2*4096³/67e12 ≈ 2s). Without this cap, a 141GB
# HBM yields side ≈ 131K → single matmul ~68s × 8 GPUs serial → loop
# overshoots a 60s duration request by 10×+.
MAX_COMPUTE_SIDE = 4096
side = min(mem_side, MAX_COMPUTE_SIDE)
side = int((alloc_bytes / 4 / 2) ** 0.5) # float32 = 4 bytes
actual_mem_mb = side * side * 4 / 1024 / 1024
total_mem_mb = total_mem / 1024 / 1024
free_mem_mb = free_mem / 1024 / 1024
@ -167,16 +161,12 @@ class StressTest:
elapsed_check = 0
while time.time() - t0 < duration:
# Dispatch matmul on all GPUs in parallel — do NOT synchronize between
# GPUs, otherwise the 8 GPUs run serially and overshoot the duration.
for i in range(gpu_count):
with torch.cuda.device(i):
tensors[i] = torch.matmul(tensors[i], tensors[i].T)
# Single sync per pass — waits for all 8 streams concurrently
for i in range(gpu_count):
with torch.cuda.device(i):
torch.cuda.synchronize()
time.sleep(0.1)
# Show progress every 10 seconds
current_elapsed = time.time() - t0
if int(current_elapsed) != int(elapsed_check) and int(current_elapsed) % 10 == 0: