Compare commits
No commits in common. "main" and "zulifeng" have entirely different histories.
1
.gitignore
vendored
1
.gitignore
vendored
@ -14,4 +14,3 @@ reports/
|
|||||||
.venv/
|
.venv/
|
||||||
venv/
|
venv/
|
||||||
.qoder/*
|
.qoder/*
|
||||||
.claude/settings.local.json
|
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
# GPU type: auto-detect or override to a100/a800/h100/h800/h200/h20/b200/b300
|
# GPU type: auto-detect or override to a100/a800/h100/h200/b200/b300
|
||||||
gpu_type: auto
|
gpu_type: auto
|
||||||
|
|
||||||
benchmark:
|
benchmark:
|
||||||
@ -14,29 +14,13 @@ benchmark:
|
|||||||
- fp16
|
- fp16
|
||||||
- bf16
|
- bf16
|
||||||
- fp8
|
- fp8
|
||||||
# MAMF-style shape sweep: measure each dtype at every shape below and keep the max
|
matrix_size: 4096
|
||||||
# TFLOPS (the realistic achievable peak). A single fixed shape under-reports by
|
warmup: 10
|
||||||
# ~7-12% and can't meet the MAMF-calibrated thresholds in gpu_specs.py.
|
iterations: 100
|
||||||
# Each entry is either N (square N×N×N) or [M, N, K]. K-heavy non-square shapes
|
|
||||||
# (e.g. 2048×2048×13312) hit the true Hopper MAMF — bf16 ~790 vs ~755 square.
|
|
||||||
# Empty list => single matrix_size shape (legacy behaviour).
|
|
||||||
sweep_sizes:
|
|
||||||
- 3584
|
|
||||||
- 4608
|
|
||||||
- 5376
|
|
||||||
- 8192
|
|
||||||
- 11520
|
|
||||||
- [2048, 2048, 13312]
|
|
||||||
- [2048, 2048, 16384]
|
|
||||||
matrix_size: 8192 # fallback shape when sweep_sizes is empty
|
|
||||||
warmup: 20
|
|
||||||
iterations: 80
|
|
||||||
# NOTE: torch.compile was dropped — on H100 eager cuBLAS beats Triton for plain
|
|
||||||
# GEMM, and compiling would re-autotune per shape and make the sweep very slow.
|
|
||||||
|
|
||||||
health:
|
health:
|
||||||
temp_warning: 75
|
temp_warning: 80
|
||||||
temp_critical: 85
|
temp_critical: 90
|
||||||
power_limit: null # null = auto-detect from GPU TDP per gpu_specs.py
|
power_limit: null # null = auto-detect from GPU TDP per gpu_specs.py
|
||||||
|
|
||||||
nccl:
|
nccl:
|
||||||
@ -49,7 +33,7 @@ nccl:
|
|||||||
test_sendrecv: false
|
test_sendrecv: false
|
||||||
|
|
||||||
stress:
|
stress:
|
||||||
duration_sec: 600 # 10 min — reaches thermal steady state, validates throttle/jitter beyond warmup
|
duration_sec: 60
|
||||||
use_doubles: false
|
use_doubles: false
|
||||||
use_tensor_cores: true
|
use_tensor_cores: true
|
||||||
memory_pct: 90
|
memory_pct: 90
|
||||||
@ -62,24 +46,6 @@ rdma:
|
|||||||
msg_size: 65536
|
msg_size: 65536
|
||||||
ib_device: null
|
ib_device: null
|
||||||
ib_port: 1
|
ib_port: 1
|
||||||
# Cross-node (two-host) RDMA via perftest, orchestrated over SSH from the CLIENT
|
|
||||||
# node. Replaces the old scripts/rdma_cross_node.sh. Run on the client; it starts
|
|
||||||
# ib_write_bw/ib_write_lat servers on `server` over SSH (passwordless required),
|
|
||||||
# then drives the local client per device.
|
|
||||||
cross_node:
|
|
||||||
enabled: false # set true on the client node to run cross-node RDMA
|
|
||||||
server: null # peer ssh address, e.g. 172.72.8.12 (server node)
|
|
||||||
server_addr: null # OOB addr client connects to (default: = server)
|
|
||||||
ssh_user: root
|
|
||||||
devices: [] # e.g. [mlx5_0, mlx5_1, mlx5_6, mlx5_7]; [] = auto-detect active IB
|
|
||||||
ib_port: 1
|
|
||||||
gid_index: null # -x <n> for RoCE; null for pure InfiniBand
|
|
||||||
msg_size: 1048576 # 1 MiB — large enough to reach NDR400 peak
|
|
||||||
iters: 5000
|
|
||||||
base_oob_port: 18515 # per-device OOB port = base + device index
|
|
||||||
server_warmup_sec: 2.0
|
|
||||||
min_bandwidth_gbps: 350 # per-port PASS floor (NDR400 ≈ 0.9 × 400)
|
|
||||||
max_latency_us: 5
|
|
||||||
|
|
||||||
training:
|
training:
|
||||||
model: gpt2
|
model: gpt2
|
||||||
|
|||||||
@ -30,8 +30,7 @@ class Benchmark:
|
|||||||
self.console = Console()
|
self.console = Console()
|
||||||
self.bench_cfg = config.get("benchmark", {})
|
self.bench_cfg = config.get("benchmark", {})
|
||||||
self.tools_dir = resolve_tools_dir(config)
|
self.tools_dir = resolve_tools_dir(config)
|
||||||
cfg_gpu_type = config.get("gpu_type", "auto")
|
self.gpu_type = detect_gpu_type()
|
||||||
self.gpu_type = cfg_gpu_type if cfg_gpu_type != "auto" else detect_gpu_type()
|
|
||||||
self.specs = get_gpu_specs(self.gpu_type)
|
self.specs = get_gpu_specs(self.gpu_type)
|
||||||
self.gpu_label = get_gpu_label(self.gpu_type)
|
self.gpu_label = get_gpu_label(self.gpu_type)
|
||||||
|
|
||||||
@ -126,12 +125,8 @@ class Benchmark:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# --disableAffinity skips nvbandwidth's CPU affinity setup, which
|
cmd = [nvbw_path, "-t", tc, "-b", str(buffer_mb),
|
||||||
# calls nvmlDeviceGetHandleByUUID() — that lookup fails on hosts
|
"-i", str(samples), "-j"]
|
||||||
# whose fabricmanager build doesn't expose the UUID format nvml
|
|
||||||
# expects (seen on H20-3e with custom 570.172.08-1 fabricmanager).
|
|
||||||
cmd = [nvbw_path, "--disableAffinity", "-t", tc,
|
|
||||||
"-b", str(buffer_mb), "-i", str(samples), "-j"]
|
|
||||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
|
r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
|
||||||
|
|
||||||
if r.returncode == 0 and r.stdout.strip():
|
if r.returncode == 0 and r.stdout.strip():
|
||||||
@ -152,15 +147,6 @@ class Benchmark:
|
|||||||
h2d_bw = results_by_test.get("h2d", 0)
|
h2d_bw = results_by_test.get("h2d", 0)
|
||||||
d2h_bw = results_by_test.get("d2h", 0)
|
d2h_bw = results_by_test.get("d2h", 0)
|
||||||
|
|
||||||
# If every subtest returned 0 the nvbandwidth binary is broken on this host
|
|
||||||
# (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch). Fall back to PyTorch.
|
|
||||||
if all(v == 0 for v in results_by_test.values()):
|
|
||||||
self.console.print(
|
|
||||||
"[yellow]nvbandwidth returned no usable data — "
|
|
||||||
"falling back to PyTorch memory benchmark[/yellow]"
|
|
||||||
)
|
|
||||||
return self._run_memory_pytorch()
|
|
||||||
|
|
||||||
# D2D goes through NVLink — compare to NVLink per-direction bandwidth
|
# D2D goes through NVLink — compare to NVLink per-direction bandwidth
|
||||||
# (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2)
|
# (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2)
|
||||||
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
|
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
|
||||||
@ -210,12 +196,9 @@ class Benchmark:
|
|||||||
for cell in row:
|
for cell in row:
|
||||||
try:
|
try:
|
||||||
v = float(cell)
|
v = float(cell)
|
||||||
|
values.append(v)
|
||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
continue
|
continue
|
||||||
# Exclude diagonal entries (intra-device, reported as 0 or
|
|
||||||
# N/A) so they don't drag the off-diagonal average down.
|
|
||||||
if v > 0:
|
|
||||||
values.append(v)
|
|
||||||
if values:
|
if values:
|
||||||
return sum(values) / len(values)
|
return sum(values) / len(values)
|
||||||
return 0.0
|
return 0.0
|
||||||
@ -312,31 +295,9 @@ class Benchmark:
|
|||||||
def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
|
def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
|
||||||
comp_cfg = self.bench_cfg.get("compute", {})
|
comp_cfg = self.bench_cfg.get("compute", {})
|
||||||
configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
|
configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
|
||||||
|
matrix_size = comp_cfg.get("matrix_size", 4096)
|
||||||
# MAMF-style shape sweep (à la stas00's mamf-finder): a single fixed matmul
|
warmup = comp_cfg.get("warmup", 10)
|
||||||
# shape under-reports the achievable peak by ~7-12% and therefore can't meet
|
iterations = comp_cfg.get("iterations", 100)
|
||||||
# the MAMF-calibrated PASS thresholds in gpu_specs.compute_pass_thresholds_tflops.
|
|
||||||
# So for each dtype we time several matmul shapes and keep the MAXIMUM TFLOPS
|
|
||||||
# (the realistic peak). matrix_size is the fallback when sweep_sizes is empty.
|
|
||||||
matrix_size = comp_cfg.get("matrix_size", 8192)
|
|
||||||
sweep_sizes = comp_cfg.get("sweep_sizes") or [matrix_size]
|
|
||||||
warmup = comp_cfg.get("warmup", 20)
|
|
||||||
iterations = comp_cfg.get("iterations", 80)
|
|
||||||
|
|
||||||
# Each sweep entry is either an int N (square N×N×N) or an [M, N, K] triple.
|
|
||||||
# Non-square / K-heavy shapes (e.g. 2048×2048×13312) reach the true MAMF peak
|
|
||||||
# on Hopper — square-only tops out ~5% lower — so the default set mixes both.
|
|
||||||
def _to_shape(entry):
|
|
||||||
if isinstance(entry, (list, tuple)):
|
|
||||||
if len(entry) == 3:
|
|
||||||
return tuple(int(x) for x in entry)
|
|
||||||
if len(entry) == 1:
|
|
||||||
n = int(entry[0])
|
|
||||||
return (n, n, n)
|
|
||||||
raise ValueError(f"sweep size {entry!r} must be an int or [M, N, K]")
|
|
||||||
n = int(entry)
|
|
||||||
return (n, n, n)
|
|
||||||
shapes = [_to_shape(e) for e in sweep_sizes]
|
|
||||||
|
|
||||||
if not TORCH_AVAILABLE:
|
if not TORCH_AVAILABLE:
|
||||||
self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
|
self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
|
||||||
@ -344,11 +305,6 @@ class Benchmark:
|
|||||||
|
|
||||||
gpu_count = torch.cuda.device_count()
|
gpu_count = torch.cuda.device_count()
|
||||||
self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")
|
self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")
|
||||||
if len(sweep_sizes) > 1:
|
|
||||||
self.console.print(
|
|
||||||
f"[cyan] MAMF shape sweep over {len(sweep_sizes)} sizes: "
|
|
||||||
f"{', '.join(str(s) for s in sweep_sizes)}[/cyan]"
|
|
||||||
)
|
|
||||||
|
|
||||||
dtype_map = {
|
dtype_map = {
|
||||||
"fp32": (torch.float32, self.specs["fp32_tflops"]),
|
"fp32": (torch.float32, self.specs["fp32_tflops"]),
|
||||||
@ -359,7 +315,6 @@ class Benchmark:
|
|||||||
}
|
}
|
||||||
|
|
||||||
results_by_dtype = {}
|
results_by_dtype = {}
|
||||||
best_shapes = {}
|
|
||||||
per_gpu_results = [{"index": i} for i in range(gpu_count)]
|
per_gpu_results = [{"index": i} for i in range(gpu_count)]
|
||||||
|
|
||||||
with Progress(
|
with Progress(
|
||||||
@ -384,39 +339,50 @@ class Benchmark:
|
|||||||
|
|
||||||
dtype_val, peak_tflops = dtype_map[dtype_name]
|
dtype_val, peak_tflops = dtype_map[dtype_name]
|
||||||
|
|
||||||
# allow_tf32 only affects float32 matmuls: ON for the TF32 run, OFF for
|
try:
|
||||||
# the true-FP32 run so the two stay distinct.
|
|
||||||
old_tf32 = torch.backends.cuda.matmul.allow_tf32
|
|
||||||
if dtype_name == "tf32":
|
if dtype_name == "tf32":
|
||||||
|
old_tf32 = torch.backends.cuda.matmul.allow_tf32
|
||||||
torch.backends.cuda.matmul.allow_tf32 = True
|
torch.backends.cuda.matmul.allow_tf32 = True
|
||||||
dtype_val = torch.float32
|
dtype_val = torch.float32
|
||||||
elif dtype_name == "fp32":
|
|
||||||
torch.backends.cuda.matmul.allow_tf32 = False
|
|
||||||
|
|
||||||
best_tflops, best_shape, last_err = 0.0, None, None
|
M = N = K = matrix_size
|
||||||
for (M, N, K) in shapes:
|
|
||||||
try:
|
|
||||||
t = self._bench_matmul_once(dtype_name, dtype_val, M, N, K, warmup, iterations)
|
|
||||||
if t > best_tflops:
|
|
||||||
best_tflops, best_shape = t, (M, N, K)
|
|
||||||
except Exception as e: # noqa: BLE001 - record and try the next shape
|
|
||||||
last_err = e
|
|
||||||
|
|
||||||
|
if dtype_name == "fp8":
|
||||||
|
a = torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
|
||||||
|
b = torch.randn(K, N, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
|
||||||
|
else:
|
||||||
|
a = torch.randn(M, K, device="cuda", dtype=dtype_val)
|
||||||
|
b = torch.randn(K, N, device="cuda", dtype=dtype_val)
|
||||||
|
|
||||||
|
for _ in range(warmup):
|
||||||
|
torch.matmul(a, b)
|
||||||
|
torch.cuda.synchronize()
|
||||||
|
|
||||||
|
start_event = torch.cuda.Event(enable_timing=True)
|
||||||
|
end_event = torch.cuda.Event(enable_timing=True)
|
||||||
|
start_event.record()
|
||||||
|
for _ in range(iterations):
|
||||||
|
c = torch.matmul(a, b)
|
||||||
|
end_event.record()
|
||||||
|
torch.cuda.synchronize()
|
||||||
|
|
||||||
|
elapsed_ms = start_event.elapsed_time(end_event)
|
||||||
|
flops = 2 * M * N * K * iterations
|
||||||
|
tflops = flops / (elapsed_ms / 1000) / 1e12
|
||||||
|
results_by_dtype[dtype_name] = round(tflops, 1)
|
||||||
|
|
||||||
|
for pg in per_gpu_results:
|
||||||
|
pg[dtype_name] = round(tflops, 1)
|
||||||
|
|
||||||
|
if dtype_name == "tf32":
|
||||||
torch.backends.cuda.matmul.allow_tf32 = old_tf32
|
torch.backends.cuda.matmul.allow_tf32 = old_tf32
|
||||||
|
|
||||||
if best_shape is None:
|
del a, b, c
|
||||||
results_by_dtype[dtype_name] = f"error: {last_err}"
|
torch.cuda.empty_cache()
|
||||||
self.console.print(f"[yellow] {dtype_name}: {last_err}[/yellow]")
|
|
||||||
else:
|
except Exception as e:
|
||||||
shape_str = "x".join(str(d) for d in best_shape)
|
results_by_dtype[dtype_name] = f"error: {e}"
|
||||||
results_by_dtype[dtype_name] = round(best_tflops, 1)
|
self.console.print(f"[yellow] {dtype_name}: {e}[/yellow]")
|
||||||
best_shapes[dtype_name] = shape_str
|
|
||||||
for pg in per_gpu_results:
|
|
||||||
pg[dtype_name] = round(best_tflops, 1)
|
|
||||||
if len(shapes) > 1:
|
|
||||||
self.console.print(
|
|
||||||
f"[dim] {dtype_name}: {best_tflops:.1f} TFLOPS @ {shape_str}[/dim]"
|
|
||||||
)
|
|
||||||
|
|
||||||
progress.advance(task)
|
progress.advance(task)
|
||||||
|
|
||||||
@ -432,74 +398,13 @@ class Benchmark:
|
|||||||
"per_dtype_tflops": results_by_dtype,
|
"per_dtype_tflops": results_by_dtype,
|
||||||
"peak_tflops": {dt: dtype_map[dt][1] for dt in dtype_map},
|
"peak_tflops": {dt: dtype_map[dt][1] for dt in dtype_map},
|
||||||
"efficiency_pct": efficiency,
|
"efficiency_pct": efficiency,
|
||||||
# Absolute TFLOPS PASS thresholds (decoupled from peak). When present,
|
|
||||||
# report.py judges PASS/WARN/FAIL against these directly instead of
|
|
||||||
# using % of peak. Empty dict => fall back to legacy 80% rule.
|
|
||||||
"pass_thresholds_tflops": dict(
|
|
||||||
self.specs.get("compute_pass_thresholds_tflops") or {}
|
|
||||||
),
|
|
||||||
"per_gpu": per_gpu_results,
|
"per_gpu": per_gpu_results,
|
||||||
"sweep_sizes": list(sweep_sizes),
|
|
||||||
"best_shapes": best_shapes,
|
|
||||||
"matrix_size": matrix_size,
|
"matrix_size": matrix_size,
|
||||||
"warmup": warmup,
|
"warmup": warmup,
|
||||||
"iterations": iterations,
|
"iterations": iterations,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def _bench_matmul_once(self, dtype_name: str, dtype_val, M: int, N: int, K: int,
|
|
||||||
warmup: int, iterations: int) -> float:
|
|
||||||
"""Time one (M×K)·(K×N) matmul for a dtype and return achieved TFLOPS.
|
|
||||||
|
|
||||||
Uses an L2-cache-busting pool of matrix pairs (total > 256 MB) so operands
|
|
||||||
can't be served from L2 across iterations, and CUDA events for timing. FP8
|
|
||||||
goes through torch._scaled_mm (e4m3); all others through torch.matmul — eager
|
|
||||||
cuBLAS, which on H100 beats torch.compile/Triton for plain GEMM and avoids the
|
|
||||||
per-shape recompile cost that would make a sweep pathologically slow.
|
|
||||||
"""
|
|
||||||
elem_bytes = 1 if dtype_name == "fp8" else torch.tensor([], dtype=dtype_val).element_size()
|
|
||||||
pair_bytes = (M * K + K * N) * elem_bytes
|
|
||||||
num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes)) # ceil(256MB / pair)
|
|
||||||
|
|
||||||
if dtype_name == "fp8":
|
|
||||||
if not hasattr(torch, "_scaled_mm"):
|
|
||||||
raise RuntimeError("torch._scaled_mm unavailable — upgrade to PyTorch >= 2.1")
|
|
||||||
pools_a = [torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
|
|
||||||
pools_b = [torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)]
|
|
||||||
scale_a = torch.tensor(1.0, device="cuda")
|
|
||||||
scale_b = torch.tensor(1.0, device="cuda")
|
|
||||||
def op(i):
|
|
||||||
return torch._scaled_mm(pools_a[i], pools_b[i].T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16)
|
|
||||||
else:
|
|
||||||
pools_a = [torch.randn(M, K, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
|
|
||||||
pools_b = [torch.randn(K, N, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
|
|
||||||
def op(i):
|
|
||||||
return torch.matmul(pools_a[i], pools_b[i])
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Probe once so a broken/unsupported kernel raises before the timed loop.
|
|
||||||
_probe = op(0)
|
|
||||||
torch.cuda.synchronize()
|
|
||||||
del _probe
|
|
||||||
|
|
||||||
for i in range(warmup):
|
|
||||||
op(i % num_pools)
|
|
||||||
torch.cuda.synchronize()
|
|
||||||
|
|
||||||
start_event = torch.cuda.Event(enable_timing=True)
|
|
||||||
end_event = torch.cuda.Event(enable_timing=True)
|
|
||||||
start_event.record()
|
|
||||||
for i in range(iterations):
|
|
||||||
op(i % num_pools)
|
|
||||||
end_event.record()
|
|
||||||
torch.cuda.synchronize()
|
|
||||||
elapsed_ms = start_event.elapsed_time(end_event)
|
|
||||||
finally:
|
|
||||||
del pools_a, pools_b
|
|
||||||
torch.cuda.empty_cache()
|
|
||||||
|
|
||||||
return (2 * M * N * K * iterations) / (elapsed_ms / 1000) / 1e12
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def print_results(results: dict, console: Console = None):
|
def print_results(results: dict, console: Console = None):
|
||||||
c = console or Console()
|
c = console or Console()
|
||||||
@ -582,78 +487,3 @@ class Benchmark:
|
|||||||
table.add_row(dt.upper(), f"{achieved:.1f}", f"{pk:.0f}",
|
table.add_row(dt.upper(), f"{achieved:.1f}", f"{pk:.0f}",
|
||||||
f"[{ec}]{ef:.1f}%[/{ec}]")
|
f"[{ec}]{ef:.1f}%[/{ec}]")
|
||||||
c.print(table)
|
c.print(table)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def judge_compute(results: dict) -> dict:
|
|
||||||
"""Judge compute results against pass_thresholds_tflops.
|
|
||||||
|
|
||||||
Single source of truth for the PASS/WARN/FAIL rule (same one report.py uses):
|
|
||||||
achieved >= thr -> PASS; >= 0.9*thr -> WARN; else FAIL. A string achieved value
|
|
||||||
(skipped/error) -> SKIP. A dtype without a threshold falls back to efficiency
|
|
||||||
(>=80 PASS / >=50 WARN / else FAIL).
|
|
||||||
|
|
||||||
Returns {"rows": [(dtype, achieved, threshold, status), ...], "verdict": str}.
|
|
||||||
"""
|
|
||||||
comp = results.get("compute", results)
|
|
||||||
per_dtype = comp.get("per_dtype_tflops", {})
|
|
||||||
thresholds = comp.get("pass_thresholds_tflops", {}) or {}
|
|
||||||
eff = comp.get("efficiency_pct", {})
|
|
||||||
rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0}
|
|
||||||
rows, verdict = [], "PASS"
|
|
||||||
for dt, val in per_dtype.items():
|
|
||||||
thr = thresholds.get(dt)
|
|
||||||
if isinstance(val, str):
|
|
||||||
status = "SKIP"
|
|
||||||
elif thr:
|
|
||||||
status = "PASS" if val >= thr else ("WARN" if val >= thr * 0.9 else "FAIL")
|
|
||||||
else:
|
|
||||||
e = eff.get(dt, 0)
|
|
||||||
status = "PASS" if e >= 80 else ("WARN" if e >= 50 else "FAIL")
|
|
||||||
rows.append((dt, val, thr, status))
|
|
||||||
if rank[status] > rank[verdict]:
|
|
||||||
verdict = status
|
|
||||||
return {"rows": rows, "verdict": verdict}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def print_compute_verdict(results: dict, console: Console = None) -> str:
|
|
||||||
"""Print the PASS/WARN/FAIL table for compute results; return the verdict."""
|
|
||||||
c = console or Console()
|
|
||||||
judged = Benchmark.judge_compute(results)
|
|
||||||
color = {"PASS": "green", "WARN": "yellow", "FAIL": "red", "SKIP": "dim"}
|
|
||||||
c.print("\n[bold cyan]Compute Verdict (vs thresholds)[/bold cyan]")
|
|
||||||
for dt, val, thr, status in judged["rows"]:
|
|
||||||
val_s = f"{val:.1f}" if isinstance(val, (int, float)) else str(val)
|
|
||||||
thr_s = f">= {thr}" if thr else "(efficiency)"
|
|
||||||
c.print(f" {dt.upper():>4}: {val_s:>8} {thr_s:<12} [{color[status]}]{status}[/{color[status]}]")
|
|
||||||
v = judged["verdict"]
|
|
||||||
c.print(f" [bold]VERDICT: [{color[v]}]{v}[/{color[v]}][/bold]")
|
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def _run_cli() -> None:
|
|
||||||
"""`python -m modules.benchmark` — run ONLY the compute-throughput benchmark."""
|
|
||||||
import argparse
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
repo_root = Path(__file__).resolve().parent.parent
|
|
||||||
parser = argparse.ArgumentParser(description="Run the compute-throughput benchmark only.")
|
|
||||||
parser.add_argument("--config", default=str(repo_root / "configs" / "default.yaml"),
|
|
||||||
help="path to config YAML (default: configs/default.yaml)")
|
|
||||||
parser.add_argument("--json", action="store_true", help="also print raw JSON of the compute results")
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
with open(args.config) as f:
|
|
||||||
config = yaml.safe_load(f) or {}
|
|
||||||
|
|
||||||
results = Benchmark(config).run_compute_benchmark()
|
|
||||||
Benchmark.print_results(results)
|
|
||||||
Benchmark.print_compute_verdict(results)
|
|
||||||
|
|
||||||
if args.json:
|
|
||||||
print("JSON_RESULT:" + json.dumps(results["compute"]))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
_run_cli()
|
|
||||||
|
|||||||
@ -6,14 +6,11 @@ import subprocess
|
|||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
|
|
||||||
# GPU name patterns -> internal key mapping
|
# GPU name patterns -> internal key mapping
|
||||||
# Order matters: longer/more-specific patterns must come before shorter ones.
|
|
||||||
GPU_NAME_PATTERNS = {
|
GPU_NAME_PATTERNS = {
|
||||||
"A100": "a100",
|
"A100": "a100",
|
||||||
"A800": "a800",
|
"A800": "a800",
|
||||||
"H100": "h100",
|
"H100": "h100",
|
||||||
"H800": "h800", # H800 = H100 SXM with NVLink halved (400 GB/s) and FP64 restricted
|
|
||||||
"H200": "h200",
|
"H200": "h200",
|
||||||
"H20": "h20", # H20 / H20-3e is the China-compliance export variant, REDUCED peaks
|
|
||||||
"B200": "b200",
|
"B200": "b200",
|
||||||
"B300": "b300",
|
"B300": "b300",
|
||||||
}
|
}
|
||||||
@ -21,10 +18,6 @@ GPU_NAME_PATTERNS = {
|
|||||||
# Specs database — ALL values are DENSE (non-sparse) TFLOPS
|
# Specs database — ALL values are DENSE (non-sparse) TFLOPS
|
||||||
GPU_SPECS = {
|
GPU_SPECS = {
|
||||||
"h100": {
|
"h100": {
|
||||||
# Peaks below are NVIDIA marketing dense peaks (theoretical Tensor Core max).
|
|
||||||
# `compute_pass_thresholds_tflops` carries the absolute PASS thresholds used
|
|
||||||
# by report.py — decoupled from peaks so marketing-spec changes (dense vs
|
|
||||||
# sparse vs FP8-sparsity) don't shift the validation bar.
|
|
||||||
"full_name": "NVIDIA H100 SXM5",
|
"full_name": "NVIDIA H100 SXM5",
|
||||||
"architecture": "Hopper",
|
"architecture": "Hopper",
|
||||||
"compute_capability": 9.0,
|
"compute_capability": 9.0,
|
||||||
@ -36,18 +29,6 @@ GPU_SPECS = {
|
|||||||
"fp16_tflops": 990, # dense (1979 sparse w/ 2:4)
|
"fp16_tflops": 990, # dense (1979 sparse w/ 2:4)
|
||||||
"bf16_tflops": 990, # dense
|
"bf16_tflops": 990, # dense
|
||||||
"fp8_tflops": 1979, # dense
|
"fp8_tflops": 1979, # dense
|
||||||
"compute_pass_thresholds_tflops": {
|
|
||||||
# Recalibrated 2026-05-25 to the H100 eager-cuBLAS achievable floor (each
|
|
||||||
# threshold ~2-4% below the sustained value measured across 16 GPUs via the
|
|
||||||
# MAMF shape sweep: fp32 ~52 / tf32 ~405 / fp16 ~732-748 / bf16 ~747-758 /
|
|
||||||
# fp8 ~1248-1271). The old marketing/MAMF-derived values (fp32 54, tf32 444,
|
|
||||||
# fp16 734, bf16 745, fp8 1400) sat ON or ABOVE what PyTorch cuBLAS reaches
|
|
||||||
# on H100, so healthy cards flaked to WARN/FAIL. fp8 1400 in particular was
|
|
||||||
# an H200/rowwise-scaling figure; H100 tensorwise _scaled_mm tops out ~1310.
|
|
||||||
"fp32": 50, "tf32": 385, "fp16": 720, "bf16": 730, "fp8": 1200,
|
|
||||||
# FP64 63 / INT8 1536 — listed for documentation; benchmark module
|
|
||||||
# doesn't currently exercise these dtypes.
|
|
||||||
},
|
|
||||||
"tdp_watts": 700,
|
"tdp_watts": 700,
|
||||||
"nvlink_gen": 4,
|
"nvlink_gen": 4,
|
||||||
"nvlink_bandwidth_gbps": 900, # bidirectional
|
"nvlink_bandwidth_gbps": 900, # bidirectional
|
||||||
@ -67,70 +48,6 @@ GPU_SPECS = {
|
|||||||
"fp16_tflops": 990, # dense
|
"fp16_tflops": 990, # dense
|
||||||
"bf16_tflops": 990, # dense
|
"bf16_tflops": 990, # dense
|
||||||
"fp8_tflops": 1979, # dense
|
"fp8_tflops": 1979, # dense
|
||||||
# PASS thresholds aligned with H200_production_acceptance.md v2 (2026-05-21):
|
|
||||||
# calibrated against Semianalysis & stas00 MAMF — H200 shares H100 SMs so
|
|
||||||
# achievable TFLOPS in PyTorch is in the same band.
|
|
||||||
"compute_pass_thresholds_tflops": {
|
|
||||||
"fp32": 50, "tf32": 400, "fp16": 720, "bf16": 720, "fp8": 1400,
|
|
||||||
},
|
|
||||||
"tdp_watts": 700,
|
|
||||||
"nvlink_gen": 4,
|
|
||||||
"nvlink_bandwidth_gbps": 900,
|
|
||||||
"pcie_gen": 5,
|
|
||||||
"min_driver_version": "545",
|
|
||||||
"min_cuda_version": "12.4",
|
|
||||||
},
|
|
||||||
"h800": {
|
|
||||||
# H800 = China-compliance export variant of H100 SXM5. SAME chip / SMs /
|
|
||||||
# clocks / HBM as H100 SXM5 — Tensor Core peaks (FP16 / BF16 / FP8 / TF32 /
|
|
||||||
# FP32) are identical to H100. Two restrictions vs H100:
|
|
||||||
# 1. NVLink bandwidth halved: 400 GB/s bidirectional (vs H100 900 GB/s)
|
|
||||||
# 2. FP64 throughput severely cut to ~1 TFLOPS (vs H100 34/67 TFLOPS)
|
|
||||||
# All other interfaces (PCIe Gen5, NVSwitch, HBM3 80GB @ 3.35 TB/s) match H100.
|
|
||||||
# NCCL multi-GPU thresholds MUST be downscaled because NVLink BW is halved.
|
|
||||||
"full_name": "NVIDIA H800 SXM5",
|
|
||||||
"architecture": "Hopper",
|
|
||||||
"compute_capability": 9.0,
|
|
||||||
"hbm_capacity_gb": 80,
|
|
||||||
"hbm_type": "HBM3",
|
|
||||||
"memory_bandwidth_gbps": 3350, # GB/s (3.35 TB/s) — same as H100 SXM
|
|
||||||
"fp32_tflops": 67,
|
|
||||||
"tf32_tflops": 495, # dense (same as H100)
|
|
||||||
"fp16_tflops": 990, # dense (same as H100)
|
|
||||||
"bf16_tflops": 990, # dense (same as H100)
|
|
||||||
"fp8_tflops": 1979, # dense (same as H100)
|
|
||||||
# Tensor Core peaks identical to H100, so PASS thresholds reuse the H100
|
|
||||||
# eager-cuBLAS calibration (2026-05-25). Measured on 8×H800: fp32 ~52 /
|
|
||||||
# tf32 ~420 / fp16 ~741 / bf16 ~745 / fp8 ~1249 — all clear these. fp8 was
|
|
||||||
# 1400 (an H200/rowwise-scaling figure) which PyTorch tensorwise _scaled_mm
|
|
||||||
# can't reach on H100-class silicon (~1310 ceiling); lowered to 1200 to match
|
|
||||||
# h100. FP64 deliberately NOT listed — H800 is restricted to ~1 TFLOPS FP64.
|
|
||||||
"compute_pass_thresholds_tflops": {
|
|
||||||
"fp32": 50, "tf32": 385, "fp16": 720, "bf16": 730, "fp8": 1200,
|
|
||||||
},
|
|
||||||
"tdp_watts": 700,
|
|
||||||
"nvlink_gen": 4,
|
|
||||||
"nvlink_bandwidth_gbps": 400, # bidirectional — HALF of H100 (export restriction)
|
|
||||||
"pcie_gen": 5,
|
|
||||||
"min_driver_version": "535",
|
|
||||||
"min_cuda_version": "12.1",
|
|
||||||
},
|
|
||||||
"h20": {
|
|
||||||
# China-compliance export variant of H200 (reported as "H20" / "H20-3e" by nvidia-smi).
|
|
||||||
# Same silicon family / HBM as H200, but Tensor Core peaks are throttled.
|
|
||||||
# Peaks below are sourced from supplier / NVIDIA China and confirmed against
|
|
||||||
# measured throughput on 8x H20-3e (FP16 ~741, BF16 ~770, FP8 ~1328 TFLOPS).
|
|
||||||
"full_name": "NVIDIA H20 / H20-3e",
|
|
||||||
"architecture": "Hopper",
|
|
||||||
"compute_capability": 9.0,
|
|
||||||
"hbm_capacity_gb": 141,
|
|
||||||
"hbm_type": "HBM3e",
|
|
||||||
"memory_bandwidth_gbps": 4800,
|
|
||||||
"fp32_tflops": 54, # China spec (matches measured ~51-52)
|
|
||||||
"tf32_tflops": 372, # ~75% of H200 (matches measured ~362)
|
|
||||||
"fp16_tflops": 744, # dense, China spec
|
|
||||||
"bf16_tflops": 739, # dense, China spec
|
|
||||||
"fp8_tflops": 1420, # dense, China spec
|
|
||||||
"tdp_watts": 700,
|
"tdp_watts": 700,
|
||||||
"nvlink_gen": 4,
|
"nvlink_gen": 4,
|
||||||
"nvlink_bandwidth_gbps": 900,
|
"nvlink_bandwidth_gbps": 900,
|
||||||
@ -229,7 +146,6 @@ _UNKNOWN_SPECS = {
|
|||||||
"fp16_tflops": 0,
|
"fp16_tflops": 0,
|
||||||
"bf16_tflops": 0,
|
"bf16_tflops": 0,
|
||||||
"fp8_tflops": 0,
|
"fp8_tflops": 0,
|
||||||
"compute_pass_thresholds_tflops": {}, # empty => report.py falls back to 80% of peak
|
|
||||||
"tdp_watts": 700,
|
"tdp_watts": 700,
|
||||||
"nvlink_gen": 0,
|
"nvlink_gen": 0,
|
||||||
"nvlink_bandwidth_gbps": 0,
|
"nvlink_bandwidth_gbps": 0,
|
||||||
@ -256,10 +172,9 @@ def detect_gpu_type() -> str:
|
|||||||
if r.returncode != 0:
|
if r.returncode != 0:
|
||||||
return "unknown"
|
return "unknown"
|
||||||
|
|
||||||
first_line = r.stdout.strip().splitlines()[0].strip().upper()
|
first_line = r.stdout.strip().splitlines()[0].strip()
|
||||||
# Iterate longest-pattern-first so "H200" doesn't get matched by "H20".
|
for pattern, key in GPU_NAME_PATTERNS.items():
|
||||||
for pattern, key in sorted(GPU_NAME_PATTERNS.items(), key=lambda kv: -len(kv[0])):
|
if pattern in first_line.upper():
|
||||||
if pattern in first_line:
|
|
||||||
return key
|
return key
|
||||||
return "unknown"
|
return "unknown"
|
||||||
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
||||||
|
|||||||
@ -1,533 +0,0 @@
|
|||||||
"""Multi-node NCCL benchmark wrapper for nccl-tests via mpirun."""
|
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from rich.console import Console
|
|
||||||
from rich.table import Table
|
|
||||||
|
|
||||||
from modules.gpu_specs import resolve_tools_dir
|
|
||||||
|
|
||||||
|
|
||||||
_TEST_ALIASES = {
|
|
||||||
"allreduce": "all_reduce_perf",
|
|
||||||
"all_reduce": "all_reduce_perf",
|
|
||||||
"all_reduce_perf": "all_reduce_perf",
|
|
||||||
"allgather": "all_gather_perf",
|
|
||||||
"all_gather": "all_gather_perf",
|
|
||||||
"all_gather_perf": "all_gather_perf",
|
|
||||||
"alltoall": "alltoall_perf",
|
|
||||||
"all_to_all": "alltoall_perf",
|
|
||||||
"alltoall_perf": "alltoall_perf",
|
|
||||||
"broadcast": "broadcast_perf",
|
|
||||||
"broadcast_perf": "broadcast_perf",
|
|
||||||
"reducescatter": "reduce_scatter_perf",
|
|
||||||
"reduce_scatter": "reduce_scatter_perf",
|
|
||||||
"reduce_scatter_perf": "reduce_scatter_perf",
|
|
||||||
"sendrecv": "sendrecv_perf",
|
|
||||||
"send_recv": "sendrecv_perf",
|
|
||||||
"sendrecv_perf": "sendrecv_perf",
|
|
||||||
}
|
|
||||||
|
|
||||||
_OP_LABELS = {
|
|
||||||
"all_reduce_perf": "allreduce",
|
|
||||||
"all_gather_perf": "allgather",
|
|
||||||
"alltoall_perf": "alltoall",
|
|
||||||
"broadcast_perf": "broadcast",
|
|
||||||
"reduce_scatter_perf": "reducescatter",
|
|
||||||
"sendrecv_perf": "sendrecv",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class MultiNodeNCCLTest:
|
|
||||||
"""Run cross-node NCCL tests with a PDF-style message-size sweep."""
|
|
||||||
|
|
||||||
def __init__(self, config: dict):
|
|
||||||
self.config = config
|
|
||||||
self.cfg = config.get("multinode_nccl", {}) or {}
|
|
||||||
self.tools_dir = resolve_tools_dir(config)
|
|
||||||
self.console = Console()
|
|
||||||
self.artifact_dir = os.environ.get("MULTINODE_NCCL_ARTIFACT_DIR") or self.cfg.get("artifact_dir")
|
|
||||||
|
|
||||||
def _find_mpirun(self) -> Optional[str]:
|
|
||||||
configured = self.cfg.get("mpirun_path")
|
|
||||||
if configured and os.path.isfile(str(configured)) and os.access(str(configured), os.X_OK):
|
|
||||||
return str(configured)
|
|
||||||
for cmd in ["mpirun", "mpiexec", os.path.join(self.tools_dir, "mpi", "bin", "mpirun")]:
|
|
||||||
found = shutil.which(cmd)
|
|
||||||
if found:
|
|
||||||
return found
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _find_nccl_test(self, binary_name: str) -> Optional[str]:
|
|
||||||
configured = self.cfg.get("nccl_tests_dir")
|
|
||||||
candidates = []
|
|
||||||
if configured:
|
|
||||||
candidates.append(os.path.join(configured, binary_name))
|
|
||||||
candidates.append(os.path.join(self.tools_dir, "nccl-tests", "build", binary_name))
|
|
||||||
found = shutil.which(binary_name)
|
|
||||||
if found:
|
|
||||||
candidates.insert(0, found)
|
|
||||||
|
|
||||||
for path in candidates:
|
|
||||||
if path and os.path.isfile(path) and os.access(path, os.X_OK):
|
|
||||||
return path
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _tests(self) -> list[str]:
|
|
||||||
configured = self.cfg.get("tests") or ["all_reduce_perf", "alltoall_perf"]
|
|
||||||
tests = []
|
|
||||||
for name in configured:
|
|
||||||
binary = _TEST_ALIASES.get(str(name).lower())
|
|
||||||
if binary and binary not in tests:
|
|
||||||
tests.append(binary)
|
|
||||||
return tests
|
|
||||||
|
|
||||||
def _hosts(self) -> list[dict]:
|
|
||||||
hosts = self.cfg.get("hosts") or []
|
|
||||||
normalized = []
|
|
||||||
for host in hosts:
|
|
||||||
if isinstance(host, str):
|
|
||||||
normalized.append({"addr": host, "slots": 8})
|
|
||||||
elif isinstance(host, dict):
|
|
||||||
normalized.append({
|
|
||||||
"name": host.get("name") or host.get("addr"),
|
|
||||||
"addr": host.get("addr") or host.get("host") or host.get("ip"),
|
|
||||||
"slots": int(host.get("slots", 8)),
|
|
||||||
})
|
|
||||||
return [h for h in normalized if h.get("addr")]
|
|
||||||
|
|
||||||
def _topologies(self) -> list[dict]:
|
|
||||||
topologies = self.cfg.get("topologies") or [{"nodes": 2, "gpus_per_node": 8}]
|
|
||||||
normalized = []
|
|
||||||
for topo in topologies:
|
|
||||||
nodes = int(topo.get("nodes", 2))
|
|
||||||
gpus_per_node = int(topo.get("gpus_per_node", topo.get("gpn", 8)))
|
|
||||||
normalized.append({
|
|
||||||
"nodes": nodes,
|
|
||||||
"gpus_per_node": gpus_per_node,
|
|
||||||
"label": topo.get("label") or f"{nodes} nodes x {gpus_per_node} GPUs",
|
|
||||||
"cuda_visible_devices": topo.get("cuda_visible_devices"),
|
|
||||||
"env": topo.get("env") or {},
|
|
||||||
"op_env": topo.get("op_env") or topo.get("test_env") or {},
|
|
||||||
"min_peak_busbw_gbps": topo.get("min_peak_busbw_gbps"),
|
|
||||||
})
|
|
||||||
return normalized
|
|
||||||
|
|
||||||
def _env_exports(self, topo: dict = None, label: str = None, binary: str = None) -> list[tuple[str, str]]:
|
|
||||||
env_cfg = {
|
|
||||||
"NCCL_DEBUG": self.cfg.get("debug", "WARN"),
|
|
||||||
"NCCL_SOCKET_IFNAME": self.cfg.get("socket_ifname"),
|
|
||||||
"NCCL_IB_GID_INDEX": self.cfg.get("ib_gid_index"),
|
|
||||||
"NCCL_IB_SL": self.cfg.get("ib_sl"),
|
|
||||||
"NCCL_IB_TC": self.cfg.get("ib_tc"),
|
|
||||||
"NCCL_IB_HCA": self.cfg.get("ib_hca"),
|
|
||||||
"NCCL_IB_TIMEOUT": self.cfg.get("ib_timeout"),
|
|
||||||
"NCCL_IB_QPS_PER_CONNECTION": self.cfg.get("qps_per_connection"),
|
|
||||||
"NCCL_MIN_NCHANNELS": self.cfg.get("min_nchannels"),
|
|
||||||
"NCCL_NET_PLUGIN": self.cfg.get("net_plugin"),
|
|
||||||
"NCCL_NVLS_ENABLE": self.cfg.get("nvls_enable"),
|
|
||||||
"NCCL_IB_SPLIT_DATA_ON_QPS": self.cfg.get("split_data_on_qps"),
|
|
||||||
}
|
|
||||||
mpi_ld_preload = self._mpi_ld_preload()
|
|
||||||
if mpi_ld_preload:
|
|
||||||
env_cfg["LD_PRELOAD"] = mpi_ld_preload
|
|
||||||
extra_ld_library_path = self._extra_ld_library_path()
|
|
||||||
if extra_ld_library_path:
|
|
||||||
existing = os.environ.get("LD_LIBRARY_PATH", "")
|
|
||||||
env_cfg["LD_LIBRARY_PATH"] = ":".join(
|
|
||||||
[extra_ld_library_path] + ([existing] if existing else [])
|
|
||||||
)
|
|
||||||
extra_env = self.cfg.get("extra_env") or {}
|
|
||||||
if isinstance(extra_env, dict):
|
|
||||||
self._merge_env(env_cfg, extra_env)
|
|
||||||
if topo:
|
|
||||||
if topo.get("cuda_visible_devices"):
|
|
||||||
env_cfg["CUDA_VISIBLE_DEVICES"] = str(topo["cuda_visible_devices"])
|
|
||||||
if isinstance(topo.get("env"), dict):
|
|
||||||
self._merge_env(env_cfg, topo["env"])
|
|
||||||
op_env = topo.get("op_env")
|
|
||||||
if isinstance(op_env, dict):
|
|
||||||
for key in (label, binary):
|
|
||||||
overrides = op_env.get(key)
|
|
||||||
if isinstance(overrides, dict):
|
|
||||||
self._merge_env(env_cfg, overrides)
|
|
||||||
return [(k, str(v)) for k, v in env_cfg.items() if v is not None]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _merge_env(env_cfg: dict, overrides: dict):
|
|
||||||
for key, value in overrides.items():
|
|
||||||
key = str(key)
|
|
||||||
if value is None:
|
|
||||||
env_cfg.pop(key, None)
|
|
||||||
else:
|
|
||||||
env_cfg[key] = str(value)
|
|
||||||
|
|
||||||
def _mpi_ld_preload(self) -> str:
|
|
||||||
preload = self.cfg.get("mpi_ld_preload")
|
|
||||||
if isinstance(preload, list):
|
|
||||||
return " ".join(str(p) for p in preload if p)
|
|
||||||
return str(preload) if preload else ""
|
|
||||||
|
|
||||||
def _runtime_env(self) -> dict:
|
|
||||||
env = os.environ.copy()
|
|
||||||
mpi_ld_preload = self._mpi_ld_preload()
|
|
||||||
if mpi_ld_preload:
|
|
||||||
env["LD_PRELOAD"] = mpi_ld_preload
|
|
||||||
extra_ld_library_path = self._extra_ld_library_path()
|
|
||||||
if extra_ld_library_path:
|
|
||||||
existing = env.get("LD_LIBRARY_PATH", "")
|
|
||||||
env["LD_LIBRARY_PATH"] = ":".join(
|
|
||||||
[extra_ld_library_path] + ([existing] if existing else [])
|
|
||||||
)
|
|
||||||
return env
|
|
||||||
|
|
||||||
def _extra_ld_library_path(self) -> str:
|
|
||||||
paths = self.cfg.get("extra_ld_library_path")
|
|
||||||
if isinstance(paths, list):
|
|
||||||
return ":".join(str(p) for p in paths if p)
|
|
||||||
return str(paths) if paths else ""
|
|
||||||
|
|
||||||
def _preflight(self, mpirun: Optional[str], tests: list[str], hosts: list[dict]) -> dict:
|
|
||||||
checks = []
|
|
||||||
checks.append({"name": "mpirun", "status": "PASS" if mpirun else "FAIL", "detail": mpirun or "not found"})
|
|
||||||
checks.append({"name": "hosts", "status": "PASS" if len(hosts) >= 2 else "FAIL", "detail": f"{len(hosts)} configured"})
|
|
||||||
for binary in tests:
|
|
||||||
path = self._find_nccl_test(binary)
|
|
||||||
checks.append({"name": binary, "status": "PASS" if path else "FAIL", "detail": path or "not found"})
|
|
||||||
|
|
||||||
if self.cfg.get("ssh_preflight", True):
|
|
||||||
user = self.cfg.get("ssh_user", "root")
|
|
||||||
for host in hosts:
|
|
||||||
target = f"{user}@{host['addr']}"
|
|
||||||
cmd = [
|
|
||||||
"ssh",
|
|
||||||
"-o", "BatchMode=yes",
|
|
||||||
"-o", "ConnectTimeout=5",
|
|
||||||
"-o", "StrictHostKeyChecking=accept-new",
|
|
||||||
target,
|
|
||||||
"hostname",
|
|
||||||
]
|
|
||||||
try:
|
|
||||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=8, env=self._runtime_env())
|
|
||||||
detail = r.stdout.strip() or r.stderr.strip()[:120]
|
|
||||||
checks.append({
|
|
||||||
"name": f"ssh {host['addr']}",
|
|
||||||
"status": "PASS" if r.returncode == 0 else "WARN",
|
|
||||||
"detail": detail,
|
|
||||||
})
|
|
||||||
except Exception as e:
|
|
||||||
checks.append({"name": f"ssh {host['addr']}", "status": "WARN", "detail": str(e)})
|
|
||||||
|
|
||||||
return {
|
|
||||||
"checks": checks,
|
|
||||||
"passed": all(c["status"] == "PASS" for c in checks if not c["name"].startswith("ssh ")),
|
|
||||||
}
|
|
||||||
|
|
||||||
def run(self) -> dict:
|
|
||||||
mpirun = self._find_mpirun()
|
|
||||||
tests = self._tests()
|
|
||||||
hosts = self._hosts()
|
|
||||||
topologies = self._topologies()
|
|
||||||
preflight = self._preflight(mpirun, tests, hosts)
|
|
||||||
|
|
||||||
if not preflight["passed"]:
|
|
||||||
return {
|
|
||||||
"passed": False,
|
|
||||||
"source": "nccl-tests-mpirun",
|
|
||||||
"mode": self.cfg.get("mode", "sweep"),
|
|
||||||
"hosts": hosts,
|
|
||||||
"preflight": preflight,
|
|
||||||
"tests": {},
|
|
||||||
"error": "multinode NCCL preflight failed",
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
|
|
||||||
results = {}
|
|
||||||
for binary in tests:
|
|
||||||
label = _OP_LABELS[binary]
|
|
||||||
binary_path = self._find_nccl_test(binary)
|
|
||||||
op_results = []
|
|
||||||
for topo in topologies:
|
|
||||||
op_results.append(self._run_topology(mpirun, binary_path, label, hosts, topo))
|
|
||||||
results[label] = {"binary": binary_path, "topologies": op_results}
|
|
||||||
|
|
||||||
passed = all(
|
|
||||||
topo.get("status") == "PASS"
|
|
||||||
for op in results.values()
|
|
||||||
for topo in op.get("topologies", [])
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"passed": passed,
|
|
||||||
"source": "nccl-tests-mpirun",
|
|
||||||
"mode": self.cfg.get("mode", "sweep"),
|
|
||||||
"hosts": hosts,
|
|
||||||
"preflight": preflight,
|
|
||||||
"tests": results,
|
|
||||||
"artifact_dir": self.artifact_dir,
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _run_topology(self, mpirun: str, binary: str, label: str, hosts: list[dict], topo: dict) -> dict:
|
|
||||||
nodes = topo["nodes"]
|
|
||||||
gpus_per_node = topo["gpus_per_node"]
|
|
||||||
selected_hosts = hosts[:nodes]
|
|
||||||
host_arg = ",".join(f"{h['addr']}:{gpus_per_node}" for h in selected_hosts)
|
|
||||||
ranks = nodes * gpus_per_node
|
|
||||||
|
|
||||||
cmd = [
|
|
||||||
mpirun,
|
|
||||||
"--allow-run-as-root",
|
|
||||||
"--mca", "btl_openib_warn_no_device_params_found", "0",
|
|
||||||
"--mca", "btl_tcp_if_include", str(self.cfg.get("socket_ifname", "bond0")),
|
|
||||||
"--mca", "oob_tcp_if_include", str(self.cfg.get("oob_tcp_ifname", self.cfg.get("socket_ifname", "bond0"))),
|
|
||||||
"-H", host_arg,
|
|
||||||
"--map-by", f"ppr:{gpus_per_node}:node",
|
|
||||||
"-np", str(ranks),
|
|
||||||
]
|
|
||||||
plm_rsh_args = self.cfg.get("plm_rsh_args")
|
|
||||||
if plm_rsh_args:
|
|
||||||
cmd.extend(["--mca", "plm_rsh_args", str(plm_rsh_args)])
|
|
||||||
for key, value in self._env_exports(topo=topo, label=label, binary=os.path.basename(binary)):
|
|
||||||
cmd.extend(["-x", f"{key}={value}"])
|
|
||||||
|
|
||||||
cmd.extend([
|
|
||||||
binary,
|
|
||||||
"-b", str(self.cfg.get("begin_size", "1k")),
|
|
||||||
"-e", str(self.cfg.get("end_size", "16g")),
|
|
||||||
"-g", str(self.cfg.get("gpus_per_rank", 1)),
|
|
||||||
"-f", str(self.cfg.get("step_factor", 2)),
|
|
||||||
"-w", str(self.cfg.get("warmup_iters", 10)),
|
|
||||||
])
|
|
||||||
if self.cfg.get("iters") is not None:
|
|
||||||
cmd.extend(["-n", str(self.cfg["iters"])])
|
|
||||||
|
|
||||||
timeout = int(self.cfg.get("timeout_sec", 1800))
|
|
||||||
started = datetime.now().isoformat()
|
|
||||||
try:
|
|
||||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, env=self._runtime_env())
|
|
||||||
except subprocess.TimeoutExpired:
|
|
||||||
result = {
|
|
||||||
"label": topo["label"],
|
|
||||||
"nodes": nodes,
|
|
||||||
"gpus_per_node": gpus_per_node,
|
|
||||||
"ranks": ranks,
|
|
||||||
"hosts": selected_hosts,
|
|
||||||
"command": " ".join(cmd),
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": f"timeout after {timeout}s",
|
|
||||||
"started_at": started,
|
|
||||||
}
|
|
||||||
self._write_artifacts(label, topo, result, "", "")
|
|
||||||
return result
|
|
||||||
|
|
||||||
parsed = self._parse_nccl_output(r.stdout)
|
|
||||||
net_diag = self._parse_network_diagnostics(r.stdout + "\n" + r.stderr)
|
|
||||||
threshold = self._threshold_for(label, topo)
|
|
||||||
wrong = sum(row.get("wrong", 0) for row in parsed["by_size"])
|
|
||||||
has_bw = parsed["peak_busbw_gbps"] > 0
|
|
||||||
status = "PASS" if r.returncode == 0 and has_bw and wrong == 0 and parsed["peak_busbw_gbps"] >= threshold else "FAIL"
|
|
||||||
result = {
|
|
||||||
"label": topo["label"],
|
|
||||||
"nodes": nodes,
|
|
||||||
"gpus_per_node": gpus_per_node,
|
|
||||||
"ranks": ranks,
|
|
||||||
"hosts": selected_hosts,
|
|
||||||
"cuda_visible_devices": topo.get("cuda_visible_devices"),
|
|
||||||
"command": " ".join(cmd),
|
|
||||||
"returncode": r.returncode,
|
|
||||||
"status": status,
|
|
||||||
"peak_busbw_gbps": parsed["peak_busbw_gbps"],
|
|
||||||
"peak_algbw_gbps": parsed["peak_algbw_gbps"],
|
|
||||||
"peak_size": parsed["peak_size"],
|
|
||||||
"avg_busbw_gbps": parsed["avg_busbw_gbps"],
|
|
||||||
"min_required_gbps": threshold,
|
|
||||||
"wrong_count": wrong,
|
|
||||||
"network": net_diag,
|
|
||||||
"by_size": parsed["by_size"],
|
|
||||||
"stderr_tail": r.stderr[-1200:],
|
|
||||||
"stdout_tail": r.stdout[-1200:],
|
|
||||||
"started_at": started,
|
|
||||||
"finished_at": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
self._write_artifacts(label, topo, result, r.stdout, r.stderr)
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _write_artifacts(self, label: str, topo: dict, result: dict, stdout: str, stderr: str):
|
|
||||||
if not self.artifact_dir:
|
|
||||||
return
|
|
||||||
os.makedirs(self.artifact_dir, exist_ok=True)
|
|
||||||
prefix = _safe_name(f"{label}_{topo.get('nodes')}x{topo.get('gpus_per_node')}_{topo.get('label')}")
|
|
||||||
base = os.path.join(self.artifact_dir, prefix)
|
|
||||||
with open(base + ".cmd.txt", "w") as f:
|
|
||||||
f.write(result.get("command", ""))
|
|
||||||
f.write("\n")
|
|
||||||
with open(base + ".stdout.txt", "w") as f:
|
|
||||||
f.write(stdout)
|
|
||||||
with open(base + ".stderr.txt", "w") as f:
|
|
||||||
f.write(stderr)
|
|
||||||
artifact_result = {k: v for k, v in result.items() if k not in ("stdout_tail", "stderr_tail")}
|
|
||||||
with open(base + ".json", "w") as f:
|
|
||||||
json.dump(artifact_result, f, indent=2, default=str)
|
|
||||||
result["artifact_prefix"] = base
|
|
||||||
|
|
||||||
def _threshold_for(self, label: str, topo: dict = None) -> float:
|
|
||||||
if topo and topo.get("min_peak_busbw_gbps") is not None:
|
|
||||||
topo_thresholds = topo.get("min_peak_busbw_gbps")
|
|
||||||
if isinstance(topo_thresholds, dict):
|
|
||||||
return float(topo_thresholds.get(label, 0) or 0)
|
|
||||||
return float(topo_thresholds or 0)
|
|
||||||
|
|
||||||
thresholds = self.cfg.get("min_peak_busbw_gbps") or {}
|
|
||||||
if isinstance(thresholds, dict):
|
|
||||||
op_threshold = thresholds.get(label, 0)
|
|
||||||
if isinstance(op_threshold, dict):
|
|
||||||
keys = []
|
|
||||||
if topo:
|
|
||||||
keys.extend([
|
|
||||||
topo.get("label"),
|
|
||||||
f"{topo.get('nodes')}x{topo.get('gpus_per_node')}",
|
|
||||||
f"{topo.get('nodes')} nodes x {topo.get('gpus_per_node')} GPUs",
|
|
||||||
str(topo.get("gpus_per_node")),
|
|
||||||
])
|
|
||||||
keys.append("default")
|
|
||||||
for key in keys:
|
|
||||||
if key in op_threshold:
|
|
||||||
return float(op_threshold.get(key) or 0)
|
|
||||||
return 0.0
|
|
||||||
return float(op_threshold or 0)
|
|
||||||
return float(thresholds or 0)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_nccl_output(stdout: str) -> dict:
|
|
||||||
rows = []
|
|
||||||
avg_bus = 0.0
|
|
||||||
for line in stdout.splitlines():
|
|
||||||
stripped = line.strip()
|
|
||||||
if not stripped:
|
|
||||||
continue
|
|
||||||
avg_match = re.search(r"Avg bus bandwidth\s*:\s*([0-9.]+)", stripped)
|
|
||||||
if avg_match:
|
|
||||||
avg_bus = float(avg_match.group(1))
|
|
||||||
continue
|
|
||||||
if stripped.startswith("#"):
|
|
||||||
continue
|
|
||||||
parts = stripped.split()
|
|
||||||
if len(parts) < 9:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
size_bytes = int(parts[0])
|
|
||||||
time_us = float(parts[5])
|
|
||||||
algbw = float(parts[6])
|
|
||||||
busbw = float(parts[7])
|
|
||||||
wrong = int(parts[8])
|
|
||||||
except (ValueError, IndexError):
|
|
||||||
continue
|
|
||||||
rows.append({
|
|
||||||
"size_bytes": size_bytes,
|
|
||||||
"size": _format_size(size_bytes),
|
|
||||||
"time_us": time_us,
|
|
||||||
"algbw_gbps": algbw,
|
|
||||||
"busbw_gbps": busbw,
|
|
||||||
"wrong": wrong,
|
|
||||||
})
|
|
||||||
|
|
||||||
peak_row = max(rows, key=lambda r: r["busbw_gbps"], default={})
|
|
||||||
return {
|
|
||||||
"peak_busbw_gbps": round(float(peak_row.get("busbw_gbps", 0)), 2),
|
|
||||||
"peak_algbw_gbps": round(float(peak_row.get("algbw_gbps", 0)), 2),
|
|
||||||
"peak_size": peak_row.get("size", ""),
|
|
||||||
"avg_busbw_gbps": round(avg_bus, 2),
|
|
||||||
"by_size": rows,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_network_diagnostics(output: str) -> dict:
|
|
||||||
networks = sorted(set(re.findall(r"Using network (\S+)", output)))
|
|
||||||
gdr_enabled = sorted(set(re.findall(r"GPU Direct RDMA Enabled for HCA \d+ '([^']+)'", output)))
|
|
||||||
gdr_disabled = sorted(set(re.findall(r"GPU Direct RDMA Disabled for HCA \d+ '([^']+)'", output)))
|
|
||||||
ib_using = []
|
|
||||||
for line in output.splitlines():
|
|
||||||
if "NET/IB : Using" in line:
|
|
||||||
text = line.split("NET/IB : ", 1)[-1].strip()
|
|
||||||
if text not in ib_using:
|
|
||||||
ib_using.append(text)
|
|
||||||
if gdr_disabled:
|
|
||||||
gdr_state = "DISABLED"
|
|
||||||
elif gdr_enabled or "/GDRDMA" in output:
|
|
||||||
gdr_state = "ENABLED"
|
|
||||||
elif networks:
|
|
||||||
gdr_state = "NOT_DISABLED_IN_LOG"
|
|
||||||
else:
|
|
||||||
gdr_state = "UNKNOWN"
|
|
||||||
return {
|
|
||||||
"networks": networks,
|
|
||||||
"ib_using": ib_using[:8],
|
|
||||||
"gdr_enabled_hcas": gdr_enabled,
|
|
||||||
"gdr_disabled_hcas": gdr_disabled,
|
|
||||||
"gpu_direct_rdma": gdr_state,
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def print_results(results: dict, console: Console = None):
|
|
||||||
c = console or Console()
|
|
||||||
if results.get("error"):
|
|
||||||
c.print(f"[bold red]Multi-node NCCL failed: {results['error']}[/bold red]")
|
|
||||||
else:
|
|
||||||
c.print("[bold green]Multi-node NCCL complete[/bold green]" if results.get("passed") else "[bold red]Multi-node NCCL failed[/bold red]")
|
|
||||||
|
|
||||||
preflight = results.get("preflight", {})
|
|
||||||
if preflight.get("checks"):
|
|
||||||
table = Table(title="Preflight")
|
|
||||||
table.add_column("Check")
|
|
||||||
table.add_column("Status")
|
|
||||||
table.add_column("Detail")
|
|
||||||
for check in preflight["checks"]:
|
|
||||||
table.add_row(check["name"], check["status"], str(check.get("detail", "")))
|
|
||||||
c.print(table)
|
|
||||||
|
|
||||||
for op, data in (results.get("tests") or {}).items():
|
|
||||||
table = Table(title=f"Multi-node NCCL {op}")
|
|
||||||
table.add_column("Topology")
|
|
||||||
table.add_column("Peak Bus BW")
|
|
||||||
table.add_column("Peak Size")
|
|
||||||
table.add_column("Threshold")
|
|
||||||
table.add_column("Status")
|
|
||||||
for topo in data.get("topologies", []):
|
|
||||||
table.add_row(
|
|
||||||
topo.get("label", ""),
|
|
||||||
f"{topo.get('peak_busbw_gbps', 0):.2f} GB/s",
|
|
||||||
str(topo.get("peak_size", "")),
|
|
||||||
f">= {_format_gbps(topo.get('min_required_gbps', 0))} GB/s" if topo.get("min_required_gbps") else "-",
|
|
||||||
topo.get("status", "?"),
|
|
||||||
)
|
|
||||||
c.print(table)
|
|
||||||
|
|
||||||
|
|
||||||
def _format_size(size_bytes: int) -> str:
|
|
||||||
units = [("G", 1024 ** 3), ("M", 1024 ** 2), ("K", 1024)]
|
|
||||||
for suffix, factor in units:
|
|
||||||
if size_bytes >= factor and size_bytes % factor == 0:
|
|
||||||
return f"{size_bytes // factor}{suffix}"
|
|
||||||
return str(size_bytes)
|
|
||||||
|
|
||||||
|
|
||||||
def _format_gbps(value) -> str:
|
|
||||||
try:
|
|
||||||
numeric = float(value)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return str(value)
|
|
||||||
if numeric.is_integer():
|
|
||||||
return f"{numeric:.0f}"
|
|
||||||
return f"{numeric:.2f}"
|
|
||||||
|
|
||||||
|
|
||||||
def _safe_name(value: str) -> str:
|
|
||||||
text = re.sub(r"[^A-Za-z0-9_.-]+", "_", value.strip())
|
|
||||||
text = re.sub(r"_+", "_", text).strip("_")
|
|
||||||
return text[:160] or "case"
|
|
||||||
@ -23,20 +23,6 @@ except ImportError:
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
# Per-operation bandwidth thresholds, as a fraction of NVLink bidirectional BW.
|
|
||||||
# Values aligned with the H100 production acceptance criteria (acceptance doc §5).
|
|
||||||
# AllToAll runs ~10-20% lower than AllReduce on 8-GPU NVSwitch, so its fraction is
|
|
||||||
# set lower; broadcast/sendrecv sit between.
|
|
||||||
_OP_BW_FRACTIONS = {
|
|
||||||
"allreduce": 0.45,
|
|
||||||
"allgather": 0.45,
|
|
||||||
"reducescatter": 0.45,
|
|
||||||
"broadcast": 0.40,
|
|
||||||
"sendrecv": 0.40,
|
|
||||||
"alltoall": 0.35,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class NCCLTest:
|
class NCCLTest:
|
||||||
|
|
||||||
def __init__(self, config: dict):
|
def __init__(self, config: dict):
|
||||||
@ -94,17 +80,12 @@ class NCCLTest:
|
|||||||
tests.append(("sendrecv_perf", "SendRecv"))
|
tests.append(("sendrecv_perf", "SendRecv"))
|
||||||
|
|
||||||
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
|
nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
|
||||||
# User-provided override applies uniformly across all ops; otherwise
|
if nvlink_bw > 0:
|
||||||
# each op gets its own threshold from _OP_BW_FRACTIONS.
|
default_min_bw = nvlink_bw * 0.4
|
||||||
user_override = self.nccl_cfg.get("min_bandwidth_gbps")
|
else:
|
||||||
|
# Conservative floor: any working NVLink should exceed 10 GB/s
|
||||||
def threshold_for(label: str) -> float:
|
default_min_bw = 10
|
||||||
if user_override:
|
min_bw = self.nccl_cfg.get("min_bandwidth_gbps") or round(default_min_bw)
|
||||||
return float(user_override)
|
|
||||||
if nvlink_bw <= 0:
|
|
||||||
return 10.0 # conservative floor
|
|
||||||
frac = _OP_BW_FRACTIONS.get(label.lower(), 0.45)
|
|
||||||
return round(nvlink_bw * frac)
|
|
||||||
|
|
||||||
if self.gpu_type == "unknown":
|
if self.gpu_type == "unknown":
|
||||||
self.console.print("[yellow]Unknown GPU — using conservative bandwidth thresholds[/yellow]")
|
self.console.print("[yellow]Unknown GPU — using conservative bandwidth thresholds[/yellow]")
|
||||||
@ -122,9 +103,8 @@ class NCCLTest:
|
|||||||
|
|
||||||
for binary, label in tests:
|
for binary, label in tests:
|
||||||
progress.update(task, description=f"NCCL {label}...")
|
progress.update(task, description=f"NCCL {label}...")
|
||||||
op_min_bw = threshold_for(label)
|
|
||||||
result = self._run_one_nccl_test_direct(
|
result = self._run_one_nccl_test_direct(
|
||||||
binary, label, gpu_count, op_min_bw
|
binary, label, gpu_count, min_bw
|
||||||
)
|
)
|
||||||
if result.get("status") not in ("SKIP", None) and "error" not in result:
|
if result.get("status") not in ("SKIP", None) and "error" not in result:
|
||||||
any_binary_worked = True
|
any_binary_worked = True
|
||||||
@ -134,7 +114,7 @@ class NCCLTest:
|
|||||||
mpirun = self._find_mpirun()
|
mpirun = self._find_mpirun()
|
||||||
if mpirun:
|
if mpirun:
|
||||||
result = self._run_one_nccl_test_mpirun(
|
result = self._run_one_nccl_test_mpirun(
|
||||||
binary, label, gpu_count, mpirun, op_min_bw
|
binary, label, gpu_count, mpirun, min_bw
|
||||||
)
|
)
|
||||||
if result.get("status") not in ("SKIP", None) and "error" not in result:
|
if result.get("status") not in ("SKIP", None) and "error" not in result:
|
||||||
any_binary_worked = True
|
any_binary_worked = True
|
||||||
@ -154,9 +134,7 @@ class NCCLTest:
|
|||||||
return {
|
return {
|
||||||
"passed": all_passed,
|
"passed": all_passed,
|
||||||
"source": "nccl-tests",
|
"source": "nccl-tests",
|
||||||
"min_bandwidth_gbps": {
|
"min_bandwidth_gbps": min_bw,
|
||||||
lbl.lower(): threshold_for(lbl) for _, lbl in tests
|
|
||||||
},
|
|
||||||
"tests": results,
|
"tests": results,
|
||||||
"gpu_count": gpu_count,
|
"gpu_count": gpu_count,
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
@ -172,8 +150,8 @@ class NCCLTest:
|
|||||||
|
|
||||||
cmd = [
|
cmd = [
|
||||||
binary,
|
binary,
|
||||||
"-b", "8M",
|
"-b", "8",
|
||||||
"-e", "8G",
|
"-e", "256M",
|
||||||
"-f", "2",
|
"-f", "2",
|
||||||
"-g", str(gpu_count),
|
"-g", str(gpu_count),
|
||||||
"-w", "5",
|
"-w", "5",
|
||||||
@ -261,15 +239,12 @@ class NCCLTest:
|
|||||||
if not line or line.startswith("#"):
|
if not line or line.startswith("#"):
|
||||||
continue
|
continue
|
||||||
parts = line.split()
|
parts = line.split()
|
||||||
# nccl-tests data lines: size count type redop root time algbw busbw #wrong [time algbw busbw #wrong]
|
if len(parts) >= 7:
|
||||||
if len(parts) >= 9:
|
|
||||||
try:
|
try:
|
||||||
size = int(parts[0])
|
size = int(parts[0])
|
||||||
# parts[2] is dtype string ('float'/'int32'/etc.), not a number
|
algbw = float(parts[-3]) if len(parts) >= 3 else 0
|
||||||
# out-of-place columns: time=parts[5], algbw=parts[6], busbw=parts[7]
|
busbw = float(parts[-2]) if len(parts) >= 2 else 0
|
||||||
time_us = float(parts[5])
|
time_us = float(parts[2]) if len(parts) >= 3 else 0
|
||||||
algbw = float(parts[6])
|
|
||||||
busbw = float(parts[7])
|
|
||||||
size_results.append({
|
size_results.append({
|
||||||
"size": size,
|
"size": size,
|
||||||
"time_us": time_us,
|
"time_us": time_us,
|
||||||
|
|||||||
@ -3,7 +3,6 @@
|
|||||||
import os
|
import os
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, List
|
from typing import Optional, List
|
||||||
|
|
||||||
@ -38,69 +37,15 @@ class RDMATest:
|
|||||||
ports = sorted(os.listdir(ports_dir))
|
ports = sorted(os.listdir(ports_dir))
|
||||||
return ports
|
return ports
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _read_sys(path: str) -> str:
|
|
||||||
try:
|
|
||||||
with open(path) as f:
|
|
||||||
return f.read().strip()
|
|
||||||
except (FileNotFoundError, PermissionError, OSError):
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def run(self) -> dict:
|
def run(self) -> dict:
|
||||||
devices = self._get_ib_devices()
|
devices = self._get_ib_devices()
|
||||||
if not devices:
|
if not devices:
|
||||||
self.console.print(
|
self.console.print("[yellow]No InfiniBand devices found[/yellow]")
|
||||||
"[yellow]No InfiniBand devices found — skipping RDMA test[/yellow]"
|
return {"error": "no_ib_devices", "passed": False}
|
||||||
)
|
|
||||||
return {
|
|
||||||
"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "no IB hardware detected",
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
|
|
||||||
# Only consider ports whose link_layer is InfiniBand — Ethernet
|
|
||||||
# bond/management interfaces (e.g. mlx5_bond_0) can show ACTIVE state
|
|
||||||
# without actually providing IB fabric connectivity.
|
|
||||||
ib_devices = []
|
|
||||||
active_ib_port = False
|
|
||||||
for dev in devices:
|
|
||||||
for port in self._get_ib_ports(dev):
|
|
||||||
link_layer = self._read_sys(
|
|
||||||
f"/sys/class/infiniband/{dev}/ports/{port}/link_layer")
|
|
||||||
if link_layer != "InfiniBand":
|
|
||||||
continue
|
|
||||||
ib_devices.append((dev, port))
|
|
||||||
state = self._read_sys(
|
|
||||||
f"/sys/class/infiniband/{dev}/ports/{port}/state")
|
|
||||||
if "ACTIVE" in state.upper():
|
|
||||||
active_ib_port = True
|
|
||||||
|
|
||||||
device_info = self._collect_device_info(devices)
|
|
||||||
if not ib_devices:
|
|
||||||
self.console.print(
|
|
||||||
"[yellow]No InfiniBand-link_layer ports present — "
|
|
||||||
"skipping RDMA benchmarks[/yellow]"
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "no InfiniBand link_layer ports (only Ethernet/RoCE)",
|
|
||||||
"devices": device_info,
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
if not active_ib_port:
|
|
||||||
self.console.print(
|
|
||||||
f"[yellow]{len(ib_devices)} IB port(s) detected but all DOWN — "
|
|
||||||
f"fabric not wired, skipping RDMA benchmarks[/yellow]"
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"status": "SKIP", "skipped": True,
|
|
||||||
"reason": f"{len(ib_devices)} IB port(s) found but all DOWN (fabric not wired)",
|
|
||||||
"devices": device_info,
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
|
|
||||||
self.console.print(f"[cyan]RDMA Test - Devices: {', '.join(devices)}[/cyan]")
|
self.console.print(f"[cyan]RDMA Test - Devices: {', '.join(devices)}[/cyan]")
|
||||||
|
|
||||||
|
device_info = self._collect_device_info(devices)
|
||||||
bw_results = self._run_bandwidth_tests(devices)
|
bw_results = self._run_bandwidth_tests(devices)
|
||||||
latency_results = self._run_latency_tests(devices)
|
latency_results = self._run_latency_tests(devices)
|
||||||
|
|
||||||
@ -110,17 +55,13 @@ class RDMATest:
|
|||||||
if isinstance(r, dict)
|
if isinstance(r, dict)
|
||||||
)
|
)
|
||||||
|
|
||||||
result = {
|
return {
|
||||||
"passed": all_passed,
|
"passed": all_passed,
|
||||||
"devices": device_info,
|
"devices": device_info,
|
||||||
"bandwidth_tests": bw_results,
|
"bandwidth_tests": bw_results,
|
||||||
"latency_tests": latency_results,
|
"latency_tests": latency_results,
|
||||||
"timestamp": datetime.now().isoformat(),
|
"timestamp": datetime.now().isoformat(),
|
||||||
}
|
}
|
||||||
# Cross-node (two-host) RDMA, run only when a peer is configured.
|
|
||||||
if (self.rdma_cfg.get("cross_node", {}) or {}).get("enabled"):
|
|
||||||
result["cross_node"] = self.run_cross_node()
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _collect_device_info(self, devices: List[str]) -> List[dict]:
|
def _collect_device_info(self, devices: List[str]) -> List[dict]:
|
||||||
info = []
|
info = []
|
||||||
@ -257,207 +198,9 @@ class RDMATest:
|
|||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
# ------------------------------------------------------------------
|
|
||||||
# Cross-node (two-host) RDMA over perftest, orchestrated via SSH.
|
|
||||||
# Runs FROM the client host: for each IB device it launches the matching
|
|
||||||
# perftest server on the peer over SSH (held open in a live ssh channel),
|
|
||||||
# then runs the local client against the peer's OOB address and parses the
|
|
||||||
# result. Replaces the old standalone scripts/rdma_cross_node.sh.
|
|
||||||
# ------------------------------------------------------------------
|
|
||||||
|
|
||||||
def _active_ib_devices(self) -> List[str]:
|
|
||||||
"""IB devices whose port 1 is InfiniBand link_layer and ACTIVE."""
|
|
||||||
out = []
|
|
||||||
for dev in self._get_ib_devices():
|
|
||||||
for port in self._get_ib_ports(dev):
|
|
||||||
ll = self._read_sys(f"/sys/class/infiniband/{dev}/ports/{port}/link_layer")
|
|
||||||
st = self._read_sys(f"/sys/class/infiniband/{dev}/ports/{port}/state")
|
|
||||||
if ll == "InfiniBand" and "ACTIVE" in st.upper():
|
|
||||||
out.append(dev)
|
|
||||||
break
|
|
||||||
return out
|
|
||||||
|
|
||||||
def run_cross_node(self) -> dict:
|
|
||||||
cn = self.rdma_cfg.get("cross_node", {}) or {}
|
|
||||||
if not cn.get("enabled"):
|
|
||||||
return {"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "rdma.cross_node.enabled is false"}
|
|
||||||
|
|
||||||
server = cn.get("server")
|
|
||||||
if not server:
|
|
||||||
return {"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "rdma.cross_node.server (peer ssh address) not set"}
|
|
||||||
|
|
||||||
ssh_user = cn.get("ssh_user", "root")
|
|
||||||
server_target = server if "@" in server else f"{ssh_user}@{server}"
|
|
||||||
# OOB address the client's perftest connects to (defaults to the ssh host).
|
|
||||||
server_addr = cn.get("server_addr") or server.split("@")[-1]
|
|
||||||
ib_port = cn.get("ib_port", 1)
|
|
||||||
gid_index = cn.get("gid_index")
|
|
||||||
msg_size = cn.get("msg_size", 1048576)
|
|
||||||
iters = cn.get("iters", 5000)
|
|
||||||
base_port = cn.get("base_oob_port", 18515)
|
|
||||||
warmup = cn.get("server_warmup_sec", 2.0)
|
|
||||||
min_bw = cn.get("min_bandwidth_gbps", 350)
|
|
||||||
max_lat = cn.get("max_latency_us", 5)
|
|
||||||
|
|
||||||
devices = cn.get("devices") or self._active_ib_devices()
|
|
||||||
if not devices:
|
|
||||||
return {"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "no active InfiniBand devices to test"}
|
|
||||||
|
|
||||||
has_bw = self._find_tool("ib_write_bw") is not None
|
|
||||||
has_lat = self._find_tool("ib_write_lat") is not None
|
|
||||||
if not has_bw and not has_lat:
|
|
||||||
return {"status": "SKIP", "skipped": True,
|
|
||||||
"reason": "perftest (ib_write_bw / ib_write_lat) not installed"}
|
|
||||||
|
|
||||||
self.console.print(
|
|
||||||
f"[cyan]Cross-node RDMA — client → {server_addr}, "
|
|
||||||
f"devices: {', '.join(devices)}[/cyan]")
|
|
||||||
|
|
||||||
per_device = []
|
|
||||||
for idx, dev in enumerate(devices):
|
|
||||||
oob = base_port + idx
|
|
||||||
entry = {"device": dev}
|
|
||||||
|
|
||||||
if has_bw:
|
|
||||||
bw = self._cross_node_perftest(
|
|
||||||
"ib_write_bw", dev, server_target, server_addr, ib_port,
|
|
||||||
oob, gid_index, warmup,
|
|
||||||
extra=["--report_gbits", "-s", str(msg_size), "-n", str(iters)],
|
|
||||||
parse="bw")
|
|
||||||
entry["bandwidth_gbps"] = bw
|
|
||||||
if isinstance(bw, (int, float)):
|
|
||||||
entry["bw_status"] = "PASS" if bw >= min_bw else "WARN"
|
|
||||||
else:
|
|
||||||
entry["bw_status"] = "FAIL"
|
|
||||||
|
|
||||||
if has_lat:
|
|
||||||
lat = self._cross_node_perftest(
|
|
||||||
"ib_write_lat", dev, server_target, server_addr, ib_port,
|
|
||||||
oob, gid_index, warmup, extra=[], parse="lat")
|
|
||||||
if isinstance(lat, dict):
|
|
||||||
entry["latency_us"] = lat.get("typical")
|
|
||||||
entry["latency_p99_us"] = lat.get("p99")
|
|
||||||
t = lat.get("typical")
|
|
||||||
entry["lat_status"] = ("PASS" if isinstance(t, (int, float)) and 0 < t <= max_lat
|
|
||||||
else ("WARN" if isinstance(t, (int, float)) else "FAIL"))
|
|
||||||
else:
|
|
||||||
entry["latency_us"] = lat
|
|
||||||
entry["lat_status"] = "FAIL"
|
|
||||||
|
|
||||||
per_device.append(entry)
|
|
||||||
|
|
||||||
statuses = [e.get(k) for e in per_device for k in ("bw_status", "lat_status") if e.get(k)]
|
|
||||||
verdict = "PASS"
|
|
||||||
for s in statuses:
|
|
||||||
if s == "FAIL":
|
|
||||||
verdict = "FAIL"
|
|
||||||
break
|
|
||||||
if s == "WARN" and verdict == "PASS":
|
|
||||||
verdict = "WARN"
|
|
||||||
|
|
||||||
return {
|
|
||||||
"status": verdict,
|
|
||||||
"server": server_addr,
|
|
||||||
"min_bandwidth_gbps": min_bw,
|
|
||||||
"max_latency_us": max_lat,
|
|
||||||
"per_device": per_device,
|
|
||||||
"timestamp": datetime.now().isoformat(),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _cross_node_perftest(self, tool: str, dev: str, server_target: str,
|
|
||||||
server_addr: str, ib_port: int, oob_port: int,
|
|
||||||
gid_index, warmup: float, extra: List[str], parse: str):
|
|
||||||
"""Start `tool` server on the peer via SSH, run the local client, parse output.
|
|
||||||
|
|
||||||
Returns a float (bw, Gb/s), a dict {typical, p99} (lat, µs), or an error string.
|
|
||||||
"""
|
|
||||||
tool_path = self._find_tool(tool)
|
|
||||||
if not tool_path:
|
|
||||||
return f"{tool} not installed"
|
|
||||||
|
|
||||||
flags = ["-d", dev, "-i", str(ib_port), "-p", str(oob_port), "-F"]
|
|
||||||
if gid_index is not None:
|
|
||||||
flags += ["-x", str(gid_index)]
|
|
||||||
flags += extra
|
|
||||||
|
|
||||||
server_cmd = " ".join([tool] + flags) # server: no host argument
|
|
||||||
server_proc = None
|
|
||||||
try:
|
|
||||||
server_proc = subprocess.Popen(
|
|
||||||
["ssh", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no",
|
|
||||||
server_target, server_cmd],
|
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
|
||||||
time.sleep(warmup) # let the remote server bind before the client connects
|
|
||||||
|
|
||||||
client = subprocess.run([tool_path] + flags + [server_addr],
|
|
||||||
capture_output=True, text=True, timeout=120)
|
|
||||||
out = client.stdout + "\n" + (client.stderr or "")
|
|
||||||
return self._parse_perftest_lat(out) if parse == "lat" else self._parse_perftest_bw(out)
|
|
||||||
except subprocess.TimeoutExpired:
|
|
||||||
return "timeout"
|
|
||||||
except Exception as e: # noqa: BLE001
|
|
||||||
return f"error: {e}"
|
|
||||||
finally:
|
|
||||||
if server_proc and server_proc.poll() is None:
|
|
||||||
server_proc.terminate()
|
|
||||||
try:
|
|
||||||
server_proc.wait(timeout=5)
|
|
||||||
except Exception:
|
|
||||||
server_proc.kill()
|
|
||||||
# ib_write_* server normally exits after one run; pkill cleans up a
|
|
||||||
# leftover one if the client failed mid-handshake. -x matches the exact
|
|
||||||
# process name so it never kills this ssh command itself.
|
|
||||||
try:
|
|
||||||
subprocess.run(
|
|
||||||
["ssh", "-o", "BatchMode=yes", server_target, f"pkill -x {tool}"],
|
|
||||||
capture_output=True, timeout=10)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_perftest_bw(output: str) -> float:
|
|
||||||
"""Parse ib_write_bw rows (#bytes #iter BW_peak BW_avg ...); return max BW avg."""
|
|
||||||
best = 0.0
|
|
||||||
for line in output.splitlines():
|
|
||||||
parts = line.split()
|
|
||||||
if len(parts) >= 4:
|
|
||||||
try:
|
|
||||||
int(parts[0]) # #bytes column
|
|
||||||
best = max(best, float(parts[3])) # BW average[Gb/sec]
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
return round(best, 2) if best else 0.0
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse_perftest_lat(output: str) -> dict:
|
|
||||||
"""Parse ib_write_lat row (#bytes #iter t_min t_max t_typical t_avg ... 99%)."""
|
|
||||||
for line in output.splitlines():
|
|
||||||
parts = line.split()
|
|
||||||
if len(parts) >= 6:
|
|
||||||
try:
|
|
||||||
int(parts[0]); int(parts[1])
|
|
||||||
typical = float(parts[4]) # t_typical[usec]
|
|
||||||
except ValueError:
|
|
||||||
continue
|
|
||||||
p99 = None
|
|
||||||
if len(parts) >= 8:
|
|
||||||
try:
|
|
||||||
p99 = float(parts[7]) # 99% percentile[usec]
|
|
||||||
except ValueError:
|
|
||||||
p99 = None
|
|
||||||
return {"typical": round(typical, 2), "p99": round(p99, 2) if p99 else None}
|
|
||||||
return {"typical": None, "p99": None}
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def print_results(results: dict, console: Console = None):
|
def print_results(results: dict, console: Console = None):
|
||||||
c = console or Console()
|
c = console or Console()
|
||||||
if results.get("skipped") or results.get("status") == "SKIP":
|
|
||||||
c.print(f"\n[bold yellow]RDMA/InfiniBand: SKIPPED[/bold yellow] "
|
|
||||||
f"[dim]({results.get('reason', 'no IB hardware')})[/dim]")
|
|
||||||
return
|
|
||||||
if "error" in results:
|
if "error" in results:
|
||||||
c.print(f"[bold red]Error: {results['error']}[/bold red]")
|
c.print(f"[bold red]Error: {results['error']}[/bold red]")
|
||||||
return
|
return
|
||||||
@ -495,29 +238,3 @@ class RDMATest:
|
|||||||
c.print(f" {t['test']}: [{sc}]{status}[/{sc}] "
|
c.print(f" {t['test']}: [{sc}]{status}[/{sc}] "
|
||||||
f"({lat:.2f} us, max: {t.get('max_allowed_us', 'N/A')} us)" if status != "SKIP"
|
f"({lat:.2f} us, max: {t.get('max_allowed_us', 'N/A')} us)" if status != "SKIP"
|
||||||
else f" {t['test']}: [dim]SKIPPED[/dim]")
|
else f" {t['test']}: [dim]SKIPPED[/dim]")
|
||||||
|
|
||||||
cn = results.get("cross_node")
|
|
||||||
if cn:
|
|
||||||
if cn.get("skipped"):
|
|
||||||
c.print(f"\n [bold]Cross-node RDMA[/bold]: [dim]SKIPPED "
|
|
||||||
f"({cn.get('reason', '')})[/dim]")
|
|
||||||
else:
|
|
||||||
v = cn.get("status", "?")
|
|
||||||
vc = "green" if v == "PASS" else ("yellow" if v == "WARN" else "red")
|
|
||||||
c.print(f"\n [bold]Cross-node RDMA[/bold] (server {cn.get('server')}) "
|
|
||||||
f"[{vc}]{v}[/{vc}] "
|
|
||||||
f"[dim]min {cn.get('min_bandwidth_gbps')} Gb/s, "
|
|
||||||
f"max {cn.get('max_latency_us')} µs[/dim]")
|
|
||||||
for e in cn.get("per_device", []):
|
|
||||||
bw = e.get("bandwidth_gbps")
|
|
||||||
lat = e.get("latency_us")
|
|
||||||
bws = e.get("bw_status", "")
|
|
||||||
lts = e.get("lat_status", "")
|
|
||||||
bc = "green" if bws == "PASS" else ("yellow" if bws == "WARN" else "red")
|
|
||||||
lc = "green" if lts == "PASS" else ("yellow" if lts == "WARN" else "red")
|
|
||||||
bw_s = f"{bw:.1f} Gb/s" if isinstance(bw, (int, float)) else str(bw)
|
|
||||||
lat_s = f"{lat:.2f} µs" if isinstance(lat, (int, float)) else str(lat)
|
|
||||||
p99 = e.get("latency_p99_us")
|
|
||||||
p99_s = f", p99 {p99:.2f}" if isinstance(p99, (int, float)) else ""
|
|
||||||
c.print(f" {e['device']}: BW [{bc}]{bw_s}[/{bc}] | "
|
|
||||||
f"lat [{lc}]{lat_s}[/{lc}]{p99_s}")
|
|
||||||
|
|||||||
@ -256,35 +256,25 @@ class ReportGenerator:
|
|||||||
lines.append(f"Source: {mem_data.get('source', 'unknown')}\n")
|
lines.append(f"Source: {mem_data.get('source', 'unknown')}\n")
|
||||||
lines.append("| Metric | Value | Peak | Efficiency |")
|
lines.append("| Metric | Value | Peak | Efficiency |")
|
||||||
lines.append("|--------|-------|------|------------|")
|
lines.append("|--------|-------|------|------------|")
|
||||||
d2d = mem_data.get("d2d_bandwidth_gbps") or 0
|
d2d = mem_data.get("d2d_bandwidth_gbps", 0)
|
||||||
h2d = mem_data.get("h2d_bandwidth_gbps") or 0
|
h2d = mem_data.get("h2d_bandwidth_gbps", 0)
|
||||||
d2h = mem_data.get("d2h_bandwidth_gbps") or 0
|
d2h = mem_data.get("d2h_bandwidth_gbps", 0)
|
||||||
# New format with per-metric peaks
|
# New format with per-metric peaks
|
||||||
h2d_peak = mem_data.get("h2d_peak_gbps") or 0
|
h2d_peak = mem_data.get("h2d_peak_gbps", 0)
|
||||||
d2h_peak = mem_data.get("d2h_peak_gbps") or 0
|
d2h_peak = mem_data.get("d2h_peak_gbps", 0)
|
||||||
d2d_peak = mem_data.get("d2d_peak_gbps") or 0
|
d2d_peak = mem_data.get("d2d_peak_gbps", 0)
|
||||||
h2d_eff = mem_data.get("h2d_efficiency_pct") or 0
|
h2d_eff = mem_data.get("h2d_efficiency_pct", 0)
|
||||||
d2h_eff = mem_data.get("d2h_efficiency_pct") or 0
|
d2h_eff = mem_data.get("d2h_efficiency_pct", 0)
|
||||||
d2d_eff = mem_data.get("d2d_efficiency_pct") or 0
|
d2d_eff = mem_data.get("d2d_efficiency_pct", 0)
|
||||||
# Fallback for old format
|
# Fallback for old format
|
||||||
if not d2d_peak:
|
if not d2d_peak:
|
||||||
d2d_peak = mem_data.get("peak_bandwidth_gbps") or 0
|
d2d_peak = mem_data.get("peak_bandwidth_gbps", 0)
|
||||||
d2d_eff = mem_data.get("efficiency_pct") or 0
|
d2d_eff = mem_data.get("efficiency_pct", 0)
|
||||||
lines.append(f"| H2D (PCIe) | {h2d:.1f} GB/s | {h2d_peak:.0f} GB/s | {h2d_eff:.1f}% |")
|
lines.append(f"| H2D (PCIe) | {h2d:.1f} GB/s | {h2d_peak:.0f} GB/s | {h2d_eff:.1f}% |")
|
||||||
lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |")
|
lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |")
|
||||||
lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |")
|
lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |")
|
||||||
lines.append("")
|
lines.append("")
|
||||||
# PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_()
|
verdict = "PASS" if d2d_eff >= 50 else ("WARN" if d2d_eff >= 30 else "FAIL")
|
||||||
# only reaches ~20% of HBM bandwidth). When fallback is used, report
|
|
||||||
# the number but mark as WARN with a note instead of evaluating as FAIL.
|
|
||||||
if mem_data.get("source") == "pytorch":
|
|
||||||
lines.append(
|
|
||||||
f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; "
|
|
||||||
"nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Tightened to match production acceptance: PASS >= 80%, WARN 60–80%, FAIL < 60%.
|
|
||||||
verdict = "PASS" if d2d_eff >= 80 else ("WARN" if d2d_eff >= 60 else "FAIL")
|
|
||||||
lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")
|
lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")
|
||||||
|
|
||||||
# --- Compute Throughput ---
|
# --- Compute Throughput ---
|
||||||
@ -294,18 +284,9 @@ class ReportGenerator:
|
|||||||
per_dtype = comp_data.get("per_dtype_tflops", {})
|
per_dtype = comp_data.get("per_dtype_tflops", {})
|
||||||
peak_tflops = comp_data.get("peak_tflops", {})
|
peak_tflops = comp_data.get("peak_tflops", {})
|
||||||
eff_pct = comp_data.get("efficiency_pct", {})
|
eff_pct = comp_data.get("efficiency_pct", {})
|
||||||
# Absolute PASS thresholds (TFLOPS) from gpu_specs.compute_pass_thresholds_tflops.
|
|
||||||
# When present, override the legacy 80%-of-peak rule on a per-dtype basis.
|
|
||||||
pass_thresholds = comp_data.get("pass_thresholds_tflops", {}) or {}
|
|
||||||
use_abs = bool(pass_thresholds)
|
|
||||||
if use_abs:
|
|
||||||
lines.append("| DType | Achieved (TFLOPS) | Peak | Threshold | Status |")
|
|
||||||
else:
|
|
||||||
lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
|
lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
|
||||||
lines.append("|-------|-------------------|------|------------|--------|")
|
lines.append("|-------|-------------------|------|------------|--------|")
|
||||||
worst_eff = 100.0
|
worst_eff = 100.0
|
||||||
overall_status = "PASS"
|
|
||||||
rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0}
|
|
||||||
for dt, val in per_dtype.items():
|
for dt, val in per_dtype.items():
|
||||||
if isinstance(val, str):
|
if isinstance(val, str):
|
||||||
# skipped or error
|
# skipped or error
|
||||||
@ -315,26 +296,11 @@ class ReportGenerator:
|
|||||||
ef = eff_pct.get(dt, 0)
|
ef = eff_pct.get(dt, 0)
|
||||||
if isinstance(ef, (int, float)) and ef > 0:
|
if isinstance(ef, (int, float)) and ef > 0:
|
||||||
worst_eff = min(worst_eff, ef)
|
worst_eff = min(worst_eff, ef)
|
||||||
thr = pass_thresholds.get(dt)
|
|
||||||
if use_abs and thr:
|
|
||||||
if val >= thr:
|
|
||||||
status = "PASS"
|
|
||||||
elif val >= thr * 0.9:
|
|
||||||
status = "WARN"
|
|
||||||
else:
|
|
||||||
status = "FAIL"
|
|
||||||
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | >= {thr} | {status} |")
|
|
||||||
else:
|
|
||||||
status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
|
status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
|
||||||
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
|
lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
|
||||||
if rank.get(status, 0) > rank.get(overall_status, 0):
|
|
||||||
overall_status = status
|
|
||||||
lines.append("")
|
lines.append("")
|
||||||
if use_abs:
|
overall = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
|
||||||
lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds; worst efficiency {worst_eff:.1f}%)\n")
|
lines.append(f"**Verdict: {overall}** (worst efficiency {worst_eff:.1f}%)\n")
|
||||||
else:
|
|
||||||
overall_status = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
|
|
||||||
lines.append(f"**Verdict: {overall_status}** (worst efficiency {worst_eff:.1f}%)\n")
|
|
||||||
|
|
||||||
# --- NCCL ---
|
# --- NCCL ---
|
||||||
nccl = results.get("nccl")
|
nccl = results.get("nccl")
|
||||||
@ -363,8 +329,8 @@ class ReportGenerator:
|
|||||||
if stress and not stress.get("error"):
|
if stress and not stress.get("error"):
|
||||||
lines.append("## Stress Test\n")
|
lines.append("## Stress Test\n")
|
||||||
passed = stress.get("passed", False)
|
passed = stress.get("passed", False)
|
||||||
duration = stress.get("duration_sec") or 0
|
duration = stress.get("duration_sec", 0)
|
||||||
elapsed = stress.get("elapsed_sec") or 0
|
elapsed = stress.get("elapsed_sec", 0)
|
||||||
source = stress.get("source", "unknown")
|
source = stress.get("source", "unknown")
|
||||||
lines.append(f"- **Source:** {source}")
|
lines.append(f"- **Source:** {source}")
|
||||||
lines.append(f"- **Duration:** {elapsed:.0f}s (requested {duration}s)")
|
lines.append(f"- **Duration:** {elapsed:.0f}s (requested {duration}s)")
|
||||||
@ -373,10 +339,7 @@ class ReportGenerator:
|
|||||||
|
|
||||||
# --- RDMA ---
|
# --- RDMA ---
|
||||||
rdma = results.get("rdma")
|
rdma = results.get("rdma")
|
||||||
if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"):
|
if rdma and not rdma.get("error"):
|
||||||
lines.append("## RDMA/InfiniBand\n")
|
|
||||||
lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n")
|
|
||||||
elif rdma and not rdma.get("error"):
|
|
||||||
lines.append("## RDMA/InfiniBand\n")
|
lines.append("## RDMA/InfiniBand\n")
|
||||||
bw_tests = rdma.get("bandwidth_tests", [])
|
bw_tests = rdma.get("bandwidth_tests", [])
|
||||||
lat_tests = rdma.get("latency_tests", [])
|
lat_tests = rdma.get("latency_tests", [])
|
||||||
@ -468,13 +431,9 @@ class ReportGenerator:
|
|||||||
if mem:
|
if mem:
|
||||||
if mem.get("error"):
|
if mem.get("error"):
|
||||||
items.append(("Memory Bandwidth", f"ERROR: {mem['error']}"))
|
items.append(("Memory Bandwidth", f"ERROR: {mem['error']}"))
|
||||||
elif mem.get("source") == "pytorch":
|
|
||||||
# PyTorch fallback can't reach HBM peak — report as WARN, not FAIL.
|
|
||||||
d2d = mem.get("d2d_bandwidth_gbps") or 0
|
|
||||||
items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)"))
|
|
||||||
else:
|
else:
|
||||||
eff = mem.get("efficiency_pct") or 0
|
eff = mem.get("efficiency_pct", 0)
|
||||||
verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 60 else "FAIL")
|
verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 50 else "FAIL")
|
||||||
items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)"))
|
items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)"))
|
||||||
|
|
||||||
# Compute
|
# Compute
|
||||||
@ -483,36 +442,7 @@ class ReportGenerator:
|
|||||||
if comp.get("error"):
|
if comp.get("error"):
|
||||||
items.append(("Compute Throughput", f"ERROR: {comp['error']}"))
|
items.append(("Compute Throughput", f"ERROR: {comp['error']}"))
|
||||||
else:
|
else:
|
||||||
per_dtype = comp.get("per_dtype_tflops", {})
|
|
||||||
eff_pct = comp.get("efficiency_pct", {})
|
eff_pct = comp.get("efficiency_pct", {})
|
||||||
pass_thresholds = comp.get("pass_thresholds_tflops", {}) or {}
|
|
||||||
if pass_thresholds:
|
|
||||||
# Absolute TFLOPS judgment, mirroring the per-dtype table above.
|
|
||||||
rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
|
|
||||||
worst_status = "PASS"
|
|
||||||
worst_dt = None
|
|
||||||
for dt, thr in pass_thresholds.items():
|
|
||||||
val = per_dtype.get(dt)
|
|
||||||
if not isinstance(val, (int, float)):
|
|
||||||
continue
|
|
||||||
if val >= thr:
|
|
||||||
st = "PASS"
|
|
||||||
elif val >= thr * 0.9:
|
|
||||||
st = "WARN"
|
|
||||||
else:
|
|
||||||
st = "FAIL"
|
|
||||||
if rank[st] > rank[worst_status]:
|
|
||||||
worst_status = st
|
|
||||||
worst_dt = dt
|
|
||||||
if worst_dt:
|
|
||||||
items.append((
|
|
||||||
"Compute Throughput",
|
|
||||||
f"{worst_status} (worst {worst_dt.upper()} "
|
|
||||||
f"{per_dtype[worst_dt]:.0f} vs >= {pass_thresholds[worst_dt]})"
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
items.append(("Compute Throughput", f"{worst_status}"))
|
|
||||||
else:
|
|
||||||
valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
|
valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
|
||||||
if valid_effs:
|
if valid_effs:
|
||||||
worst = min(valid_effs)
|
worst = min(valid_effs)
|
||||||
@ -544,9 +474,7 @@ class ReportGenerator:
|
|||||||
# RDMA
|
# RDMA
|
||||||
if "rdma" in results:
|
if "rdma" in results:
|
||||||
r = results["rdma"]
|
r = results["rdma"]
|
||||||
if r.get("skipped") or r.get("status") == "SKIP":
|
if r.get("error"):
|
||||||
items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})"))
|
|
||||||
elif r.get("error"):
|
|
||||||
items.append(("RDMA", f"ERROR: {r['error']}"))
|
items.append(("RDMA", f"ERROR: {r['error']}"))
|
||||||
elif r.get("passed"):
|
elif r.get("passed"):
|
||||||
items.append(("RDMA", "PASS"))
|
items.append(("RDMA", "PASS"))
|
||||||
|
|||||||
@ -144,13 +144,7 @@ class StressTest:
|
|||||||
alloc_bytes = min(target_mem, int(free_mem * 0.95))
|
alloc_bytes = min(target_mem, int(free_mem * 0.95))
|
||||||
|
|
||||||
# matmul(A, A.T) needs 2x input memory (input + output)
|
# matmul(A, A.T) needs 2x input memory (input + output)
|
||||||
mem_side = int((alloc_bytes / 4 / 2) ** 0.5)
|
side = int((alloc_bytes / 4 / 2) ** 0.5) # float32 = 4 bytes
|
||||||
# Cap compute matrix so a single matmul completes in ~2s on H100/H200
|
|
||||||
# (FP32 ≈ 67 TFLOPS → 2*4096³/67e12 ≈ 2s). Without this cap, a 141GB
|
|
||||||
# HBM yields side ≈ 131K → single matmul ~68s × 8 GPUs serial → loop
|
|
||||||
# overshoots a 60s duration request by 10×+.
|
|
||||||
MAX_COMPUTE_SIDE = 4096
|
|
||||||
side = min(mem_side, MAX_COMPUTE_SIDE)
|
|
||||||
|
|
||||||
actual_mem_mb = side * side * 4 / 1024 / 1024
|
actual_mem_mb = side * side * 4 / 1024 / 1024
|
||||||
total_mem_mb = total_mem / 1024 / 1024
|
total_mem_mb = total_mem / 1024 / 1024
|
||||||
@ -167,15 +161,11 @@ class StressTest:
|
|||||||
|
|
||||||
elapsed_check = 0
|
elapsed_check = 0
|
||||||
while time.time() - t0 < duration:
|
while time.time() - t0 < duration:
|
||||||
# Dispatch matmul on all GPUs in parallel — do NOT synchronize between
|
|
||||||
# GPUs, otherwise the 8 GPUs run serially and overshoot the duration.
|
|
||||||
for i in range(gpu_count):
|
for i in range(gpu_count):
|
||||||
with torch.cuda.device(i):
|
with torch.cuda.device(i):
|
||||||
tensors[i] = torch.matmul(tensors[i], tensors[i].T)
|
tensors[i] = torch.matmul(tensors[i], tensors[i].T)
|
||||||
# Single sync per pass — waits for all 8 streams concurrently
|
|
||||||
for i in range(gpu_count):
|
|
||||||
with torch.cuda.device(i):
|
|
||||||
torch.cuda.synchronize()
|
torch.cuda.synchronize()
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
# Show progress every 10 seconds
|
# Show progress every 10 seconds
|
||||||
current_elapsed = time.time() - t0
|
current_elapsed = time.time() - t0
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user