diff --git a/configs/default.yaml b/configs/default.yaml index fd9e995..4a13313 100644 --- a/configs/default.yaml +++ b/configs/default.yaml @@ -17,6 +17,7 @@ benchmark: matrix_size: 4096 warmup: 10 iterations: 100 + use_compile: false health: temp_warning: 80 diff --git a/modules/benchmark.py b/modules/benchmark.py index ba91221..e58a8f0 100644 --- a/modules/benchmark.py +++ b/modules/benchmark.py @@ -30,7 +30,8 @@ class Benchmark: self.console = Console() self.bench_cfg = config.get("benchmark", {}) self.tools_dir = resolve_tools_dir(config) - self.gpu_type = detect_gpu_type() + cfg_gpu_type = config.get("gpu_type", "auto") + self.gpu_type = cfg_gpu_type if cfg_gpu_type != "auto" else detect_gpu_type() self.specs = get_gpu_specs(self.gpu_type) self.gpu_label = get_gpu_label(self.gpu_type) @@ -125,8 +126,12 @@ class Benchmark: continue try: - cmd = [nvbw_path, "-t", tc, "-b", str(buffer_mb), - "-i", str(samples), "-j"] + # --disableAffinity skips nvbandwidth's CPU affinity setup, which + # calls nvmlDeviceGetHandleByUUID() — that lookup fails on hosts + # whose fabricmanager build doesn't expose the UUID format nvml + # expects (seen on H20-3e with custom 570.172.08-1 fabricmanager). + cmd = [nvbw_path, "--disableAffinity", "-t", tc, + "-b", str(buffer_mb), "-i", str(samples), "-j"] r = subprocess.run(cmd, capture_output=True, text=True, timeout=120) if r.returncode == 0 and r.stdout.strip(): @@ -147,6 +152,15 @@ class Benchmark: h2d_bw = results_by_test.get("h2d", 0) d2h_bw = results_by_test.get("d2h", 0) + # If every subtest returned 0 the nvbandwidth binary is broken on this host + # (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch). Fall back to PyTorch. 
+ if all(v == 0 for v in results_by_test.values()): + self.console.print( + "[yellow]nvbandwidth returned no usable data — " + "falling back to PyTorch memory benchmark[/yellow]" + ) + return self._run_memory_pytorch() + # D2D goes through NVLink — compare to NVLink per-direction bandwidth # (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2) nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0) @@ -196,9 +210,12 @@ class Benchmark: for cell in row: try: v = float(cell) - values.append(v) except (ValueError, TypeError): continue + # Exclude diagonal entries (intra-device, reported as 0 or + # N/A) so they don't drag the off-diagonal average down. + if v > 0: + values.append(v) if values: return sum(values) / len(values) return 0.0 @@ -298,6 +315,7 @@ class Benchmark: matrix_size = comp_cfg.get("matrix_size", 4096) warmup = comp_cfg.get("warmup", 10) iterations = comp_cfg.get("iterations", 100) + use_compile = comp_cfg.get("use_compile", False) if not TORCH_AVAILABLE: self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]") @@ -306,6 +324,25 @@ class Benchmark: gpu_count = torch.cuda.device_count() self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]") + # torch.compile(max-autotune) benchmarks cuBLAS vs Triton kernels and picks + # the fastest for this GPU/shape, typically improving efficiency by 8-15%. + # compile_warmup must be larger than warmup to absorb JIT + autotuning time. + mm_fn = torch.matmul + compile_warmup = warmup + if use_compile: + try: + _compiled = torch.compile(torch.matmul, mode="max-autotune") + # Trial call to trigger JIT and verify compilation succeeds before the dtype loop. 
+ _t = torch.randn(64, 64, device="cuda", dtype=torch.float32) + _compiled(_t, _t) + torch.cuda.synchronize() + del _t + mm_fn = _compiled + compile_warmup = max(warmup, 50) + self.console.print("[cyan] torch.compile(max-autotune) enabled[/cyan]") + except Exception as e: + self.console.print(f"[yellow] torch.compile unavailable ({type(e).__name__}), using eager[/yellow]") + dtype_map = { "fp32": (torch.float32, self.specs["fp32_tflops"]), "tf32": ("tf32", self.specs["tf32_tflops"]), @@ -347,40 +384,60 @@ class Benchmark: M = N = K = matrix_size + # Allocate enough matrix pairs so total memory exceeds GPU L2 cache + # (H100/H200 L2 = 50 MB), preventing cross-iteration cache reuse. + elem_bytes = 1 if dtype_name == "fp8" else torch.tensor([], dtype=dtype_val).element_size() + pair_bytes = 2 * M * K * elem_bytes + num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes)) # ceil(256MB / pair) + + pools_a = pools_b = c = None + if dtype_name == "fp8": - a = torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) - b = torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) + pools_a = [torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)] + pools_b = [torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn) for _ in range(num_pools)] scale_a = torch.tensor(1.0, device="cuda") scale_b = torch.tensor(1.0, device="cuda") - def _fp8_mm(): - return torch._scaled_mm(a, b.T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16) - else: - a = torch.randn(M, K, device="cuda", dtype=dtype_val) - b = torch.randn(K, N, device="cuda", dtype=dtype_val) - - if dtype_name == "fp8": - for _ in range(warmup): - _fp8_mm() + def _fp8_mm(i): + return torch._scaled_mm(pools_a[i], pools_b[i].T, scale_a=scale_a, scale_b=scale_b, out_dtype=torch.bfloat16) + # Probe: verify _scaled_mm is functional before the timed loop. 
+ # It requires PyTorch >= 2.1 + CUDA >= 12.0 + sm90 (Hopper). + if not hasattr(torch, "_scaled_mm"): + raise RuntimeError("torch._scaled_mm unavailable — upgrade to PyTorch >= 2.1") + try: + _probe = _fp8_mm(0) + torch.cuda.synchronize() + del _probe + except Exception as probe_err: + raise RuntimeError(f"FP8 _scaled_mm probe failed: {probe_err}") from probe_err + for i in range(warmup): + _fp8_mm(i % num_pools) torch.cuda.synchronize() start_event = torch.cuda.Event(enable_timing=True) end_event = torch.cuda.Event(enable_timing=True) start_event.record() - for _ in range(iterations): - c = _fp8_mm() + for i in range(iterations): + c = _fp8_mm(i % num_pools) end_event.record() + torch.cuda.synchronize() + elapsed_ms = start_event.elapsed_time(end_event) else: - for _ in range(warmup): - torch.matmul(a, b) + pools_a = [torch.randn(M, K, device="cuda", dtype=dtype_val) for _ in range(num_pools)] + pools_b = [torch.randn(K, N, device="cuda", dtype=dtype_val) for _ in range(num_pools)] + + indexed_a = [pools_a[i % num_pools] for i in range(compile_warmup + iterations)] + indexed_b = [pools_b[i % num_pools] for i in range(compile_warmup + iterations)] + + for i in range(compile_warmup): + mm_fn(indexed_a[i], indexed_b[i]) torch.cuda.synchronize() start_event = torch.cuda.Event(enable_timing=True) end_event = torch.cuda.Event(enable_timing=True) start_event.record() - for _ in range(iterations): - c = torch.matmul(a, b) + for i in range(compile_warmup, compile_warmup + iterations): + c = mm_fn(indexed_a[i], indexed_b[i]) end_event.record() - torch.cuda.synchronize() - - elapsed_ms = start_event.elapsed_time(end_event) + torch.cuda.synchronize() + elapsed_ms = start_event.elapsed_time(end_event) flops = 2 * M * N * K * iterations tflops = flops / (elapsed_ms / 1000) / 1e12 results_by_dtype[dtype_name] = round(tflops, 1) @@ -391,7 +448,7 @@ class Benchmark: if dtype_name == "tf32": torch.backends.cuda.matmul.allow_tf32 = old_tf32 - del a, b, c + del pools_a, pools_b, c 
torch.cuda.empty_cache() except Exception as e: diff --git a/modules/gpu_specs.py b/modules/gpu_specs.py index f746b84..db3ca30 100644 --- a/modules/gpu_specs.py +++ b/modules/gpu_specs.py @@ -6,11 +6,13 @@ import subprocess from typing import List, Optional # GPU name patterns -> internal key mapping +# Order matters: longer/more-specific patterns must come before shorter ones. GPU_NAME_PATTERNS = { "A100": "a100", "A800": "a800", "H100": "h100", "H200": "h200", + "H20": "h20", # H20 / H20-3e is the China-compliance export variant, REDUCED peaks "B200": "b200", "B300": "b300", } @@ -55,6 +57,29 @@ GPU_SPECS = { "min_driver_version": "535", "min_cuda_version": "12.1", }, + "h20": { + # China-compliance export variant of H200 (reported as "H20" / "H20-3e" by nvidia-smi). + # Same silicon family / HBM as H200, but Tensor Core peaks are throttled. + # Peaks below are sourced from supplier / NVIDIA China and confirmed against + # measured throughput on 8x H20-3e (FP16 ~741, BF16 ~770, FP8 ~1328 TFLOPS). 
+ "full_name": "NVIDIA H20 / H20-3e", + "architecture": "Hopper", + "compute_capability": 9.0, + "hbm_capacity_gb": 141, + "hbm_type": "HBM3e", + "memory_bandwidth_gbps": 4800, + "fp32_tflops": 54, # China spec (matches measured ~51-52) + "tf32_tflops": 372, # ~75% of H200 (matches measured ~362) + "fp16_tflops": 744, # dense, China spec + "bf16_tflops": 739, # dense, China spec + "fp8_tflops": 1420, # dense, China spec + "tdp_watts": 700, + "nvlink_gen": 4, + "nvlink_bandwidth_gbps": 900, + "pcie_gen": 5, + "min_driver_version": "535", + "min_cuda_version": "12.1", + }, "b200": { "full_name": "NVIDIA B200 SXM", "architecture": "Blackwell", @@ -172,9 +197,10 @@ def detect_gpu_type() -> str: if r.returncode != 0: return "unknown" - first_line = r.stdout.strip().splitlines()[0].strip() - for pattern, key in GPU_NAME_PATTERNS.items(): - if pattern in first_line.upper(): + first_line = r.stdout.strip().splitlines()[0].strip().upper() + # Iterate longest-pattern-first so "H200" doesn't get matched by "H20". + for pattern, key in sorted(GPU_NAME_PATTERNS.items(), key=lambda kv: -len(kv[0])): + if pattern in first_line: return key return "unknown" except (subprocess.TimeoutExpired, FileNotFoundError, OSError): diff --git a/modules/nccl_test.py b/modules/nccl_test.py index ae10d60..7435577 100644 --- a/modules/nccl_test.py +++ b/modules/nccl_test.py @@ -23,6 +23,22 @@ except ImportError: pass +# Per-operation bandwidth thresholds, as a fraction of NVLink bidirectional BW. +# AllReduce uses ring algorithm and saturates ring BW; AllToAll requires full-mesh +# transfers and on 8-GPU NVSwitch typically runs 10-20% lower than AllReduce. +# Public H100/H200 8-GPU benchmarks show AllToAll bus BW in the 300-380 GB/s range +# vs AllReduce in 400-500 GB/s. Using a single 40% threshold for both produced +# false positives for AllToAll. 
+_OP_BW_FRACTIONS = { + "allreduce": 0.40, + "alltoall": 0.30, + "broadcast": 0.35, + "reducescatter": 0.38, + "allgather": 0.38, + "sendrecv": 0.35, +} + + class NCCLTest: def __init__(self, config: dict): @@ -80,12 +96,17 @@ class NCCLTest: tests.append(("sendrecv_perf", "SendRecv")) nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0) - if nvlink_bw > 0: - default_min_bw = nvlink_bw * 0.4 - else: - # Conservative floor: any working NVLink should exceed 10 GB/s - default_min_bw = 10 - min_bw = self.nccl_cfg.get("min_bandwidth_gbps") or round(default_min_bw) + # User-provided override applies uniformly across all ops; otherwise + # each op gets its own threshold from _OP_BW_FRACTIONS. + user_override = self.nccl_cfg.get("min_bandwidth_gbps") + + def threshold_for(label: str) -> float: + if user_override: + return float(user_override) + if nvlink_bw <= 0: + return 10.0 # conservative floor + frac = _OP_BW_FRACTIONS.get(label.lower(), 0.40) + return round(nvlink_bw * frac) if self.gpu_type == "unknown": self.console.print("[yellow]Unknown GPU — using conservative bandwidth thresholds[/yellow]") @@ -103,8 +124,9 @@ class NCCLTest: for binary, label in tests: progress.update(task, description=f"NCCL {label}...") + op_min_bw = threshold_for(label) result = self._run_one_nccl_test_direct( - binary, label, gpu_count, min_bw + binary, label, gpu_count, op_min_bw ) if result.get("status") not in ("SKIP", None) and "error" not in result: any_binary_worked = True @@ -114,7 +136,7 @@ class NCCLTest: mpirun = self._find_mpirun() if mpirun: result = self._run_one_nccl_test_mpirun( - binary, label, gpu_count, mpirun, min_bw + binary, label, gpu_count, mpirun, op_min_bw ) if result.get("status") not in ("SKIP", None) and "error" not in result: any_binary_worked = True @@ -134,7 +156,9 @@ class NCCLTest: return { "passed": all_passed, "source": "nccl-tests", - "min_bandwidth_gbps": min_bw, + "min_bandwidth_gbps": { + lbl.lower(): threshold_for(lbl) for _, lbl in tests + }, 
"tests": results, "gpu_count": gpu_count, "timestamp": datetime.now().isoformat(), diff --git a/modules/rdma_test.py b/modules/rdma_test.py index e1f54f5..497e9d5 100644 --- a/modules/rdma_test.py +++ b/modules/rdma_test.py @@ -37,15 +37,69 @@ class RDMATest: ports = sorted(os.listdir(ports_dir)) return ports + @staticmethod + def _read_sys(path: str) -> str: + try: + with open(path) as f: + return f.read().strip() + except (FileNotFoundError, PermissionError, OSError): + return "" + def run(self) -> dict: devices = self._get_ib_devices() if not devices: - self.console.print("[yellow]No InfiniBand devices found[/yellow]") - return {"error": "no_ib_devices", "passed": False} + self.console.print( + "[yellow]No InfiniBand devices found — skipping RDMA test[/yellow]" + ) + return { + "status": "SKIP", "skipped": True, + "reason": "no IB hardware detected", + "timestamp": datetime.now().isoformat(), + } + + # Only consider ports whose link_layer is InfiniBand — Ethernet + # bond/management interfaces (e.g. mlx5_bond_0) can show ACTIVE state + # without actually providing IB fabric connectivity. 
+ ib_devices = [] + active_ib_port = False + for dev in devices: + for port in self._get_ib_ports(dev): + link_layer = self._read_sys( + f"/sys/class/infiniband/{dev}/ports/{port}/link_layer") + if link_layer != "InfiniBand": + continue + ib_devices.append((dev, port)) + state = self._read_sys( + f"/sys/class/infiniband/{dev}/ports/{port}/state") + if "ACTIVE" in state.upper(): + active_ib_port = True + + device_info = self._collect_device_info(devices) + if not ib_devices: + self.console.print( + "[yellow]No InfiniBand-link_layer ports present — " + "skipping RDMA benchmarks[/yellow]" + ) + return { + "status": "SKIP", "skipped": True, + "reason": "no InfiniBand link_layer ports (only Ethernet/RoCE)", + "devices": device_info, + "timestamp": datetime.now().isoformat(), + } + if not active_ib_port: + self.console.print( + f"[yellow]{len(ib_devices)} IB port(s) detected but all DOWN — " + f"fabric not wired, skipping RDMA benchmarks[/yellow]" + ) + return { + "status": "SKIP", "skipped": True, + "reason": f"{len(ib_devices)} IB port(s) found but all DOWN (fabric not wired)", + "devices": device_info, + "timestamp": datetime.now().isoformat(), + } self.console.print(f"[cyan]RDMA Test - Devices: {', '.join(devices)}[/cyan]") - device_info = self._collect_device_info(devices) bw_results = self._run_bandwidth_tests(devices) latency_results = self._run_latency_tests(devices) @@ -201,6 +255,10 @@ class RDMATest: @staticmethod def print_results(results: dict, console: Console = None): c = console or Console() + if results.get("skipped") or results.get("status") == "SKIP": + c.print(f"\n[bold yellow]RDMA/InfiniBand: SKIPPED[/bold yellow] " + f"[dim]({results.get('reason', 'no IB hardware')})[/dim]") + return if "error" in results: c.print(f"[bold red]Error: {results['error']}[/bold red]") return diff --git a/modules/report.py b/modules/report.py index 93ccbb8..c8248cd 100644 --- a/modules/report.py +++ b/modules/report.py @@ -274,8 +274,17 @@ class ReportGenerator: 
lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |") lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |") lines.append("") - verdict = "PASS" if d2d_eff >= 50 else ("WARN" if d2d_eff >= 30 else "FAIL") - lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n") + # PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_() + # only reaches ~20% of HBM bandwidth). When fallback is used, report + # the number but mark as WARN with a note instead of evaluating as FAIL. + if mem_data.get("source") == "pytorch": + lines.append( + f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; " + "nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n" + ) + else: + verdict = "PASS" if d2d_eff >= 50 else ("WARN" if d2d_eff >= 30 else "FAIL") + lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n") # --- Compute Throughput --- comp_data = self._extract_compute_results(results) @@ -339,7 +348,10 @@ class ReportGenerator: # --- RDMA --- rdma = results.get("rdma") - if rdma and not rdma.get("error"): + if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"): + lines.append("## RDMA/InfiniBand\n") + lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n") + elif rdma and not rdma.get("error"): lines.append("## RDMA/InfiniBand\n") bw_tests = rdma.get("bandwidth_tests", []) lat_tests = rdma.get("latency_tests", []) @@ -431,6 +443,10 @@ class ReportGenerator: if mem: if mem.get("error"): items.append(("Memory Bandwidth", f"ERROR: {mem['error']}")) + elif mem.get("source") == "pytorch": + # PyTorch fallback can't reach HBM peak — report as WARN, not FAIL. 
+ d2d = mem.get("d2d_bandwidth_gbps") or 0 + items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)")) else: eff = mem.get("efficiency_pct") or 0 verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 50 else "FAIL") @@ -474,7 +490,9 @@ class ReportGenerator: # RDMA if "rdma" in results: r = results["rdma"] - if r.get("error"): + if r.get("skipped") or r.get("status") == "SKIP": + items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})")) + elif r.get("error"): items.append(("RDMA", f"ERROR: {r['error']}")) elif r.get("passed"): items.append(("RDMA", "PASS")) diff --git a/modules/stress_test.py b/modules/stress_test.py index 892f95a..8b69d1c 100644 --- a/modules/stress_test.py +++ b/modules/stress_test.py @@ -144,8 +144,14 @@ class StressTest: alloc_bytes = min(target_mem, int(free_mem * 0.95)) # matmul(A, A.T) needs 2x input memory (input + output) - side = int((alloc_bytes / 4 / 2) ** 0.5) # float32 = 4 bytes - + mem_side = int((alloc_bytes / 4 / 2) ** 0.5) + # Cap compute matrix so a single matmul completes in ~2 ms on H100/H200 + # (FP32 ≈ 67 TFLOPS → 2*4096³/67e12 ≈ 2 ms). Without this cap, a 141GB + # HBM yields side ≈ 131K → single matmul ~68s × 8 GPUs serial → loop + # overshoots a 60s duration request by 10×+. + MAX_COMPUTE_SIDE = 4096 + side = min(mem_side, MAX_COMPUTE_SIDE) + actual_mem_mb = side * side * 4 / 1024 / 1024 total_mem_mb = total_mem / 1024 / 1024 free_mem_mb = free_mem / 1024 / 1024 @@ -161,12 +167,16 @@ class StressTest: elapsed_check = 0 while time.time() - t0 < duration: + # Dispatch matmul on all GPUs in parallel — do NOT synchronize between + # GPUs, otherwise the 8 GPUs run serially and overshoot the duration. 
for i in range(gpu_count): with torch.cuda.device(i): tensors[i] = torch.matmul(tensors[i], tensors[i].T) + # One sync per device per pass — every GPU's matmul is already queued, so the waits overlap + for i in range(gpu_count): + with torch.cuda.device(i): torch.cuda.synchronize() - time.sleep(0.1) - + # Show progress every 10 seconds current_elapsed = time.time() - t0 if int(current_elapsed) != int(elapsed_check) and int(current_elapsed) % 10 == 0: