"""GPU benchmark module — nvbandwidth + PyTorch compute throughput."""

import json
import os
import shutil
import subprocess
import time
from datetime import datetime
from typing import Optional, List

from rich.console import Console
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn

from modules.gpu_specs import detect_gpu_type, get_gpu_specs, get_gpu_label, resolve_tools_dir

# True only when torch imports AND reports a usable CUDA device.
TORCH_AVAILABLE = False
try:
    import torch
    if torch.cuda.is_available():
        TORCH_AVAILABLE = True
except ImportError:
    pass


class Benchmark:
    """Memory-bandwidth and compute-throughput benchmarks for the local GPU(s).

    Memory bandwidth is measured with the ``nvbandwidth`` binary when one can be
    found, otherwise with a PyTorch copy loop. Compute throughput is measured
    with timed matmuls per dtype.
    """

    def __init__(self, config: dict):
        """Store the config and resolve GPU type, specs, label and tools dir.

        Args:
            config: Parsed configuration mapping. Reads the optional keys
                ``benchmark`` and ``gpu_type`` (``"auto"`` triggers detection).
        """
        self.config = config
        self.console = Console()
        self.bench_cfg = config.get("benchmark", {})
        self.tools_dir = resolve_tools_dir(config)

        cfg_gpu_type = config.get("gpu_type", "auto")
        self.gpu_type = cfg_gpu_type if cfg_gpu_type != "auto" else detect_gpu_type()
        self.specs = get_gpu_specs(self.gpu_type)
        self.gpu_label = get_gpu_label(self.gpu_type)

    def run(self) -> dict:
        """Run the memory and compute benchmarks and merge their result dicts."""
        results = {}
        results.update(self.run_memory_benchmark())
        results.update(self.run_compute_benchmark())
        return results

    def _make_progress(self) -> Progress:
        """Build the spinner/bar/counter/elapsed progress display used by every benchmark."""
        return Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TextColumn("{task.completed}/{task.total}"),
            TimeElapsedColumn(),
            console=self.console,
        )

    def _find_nvbandwidth(self) -> Optional[str]:
        """Return the path of an executable ``nvbandwidth`` binary, or None.

        Search order: system PATH, ``<tools_dir>/nvbandwidth/nvbandwidth``, then
        a fixed list of DCGM / system install locations.
        """
        # 1. System PATH
        p = shutil.which("nvbandwidth")
        if p:
            return p

        # 2. tools_dir
        local = os.path.join(self.tools_dir, "nvbandwidth", "nvbandwidth")
        if os.path.isfile(local) and os.access(local, os.X_OK):
            return local

        # 3. Common DCGM / system locations
        extra_paths = [
            "/usr/libexec/datacenter-gpu-manager-4/plugins/cuda12/nvbandwidth",
            "/usr/libexec/datacenter-gpu-manager/plugins/cuda12/nvbandwidth",
            "/usr/local/bin/nvbandwidth",
            "/opt/nvidia/nvbandwidth/nvbandwidth",
        ]
        for ep in extra_paths:
            if os.path.isfile(ep) and os.access(ep, os.X_OK):
                return ep
        return None

    def run_memory_benchmark(self) -> dict:
        """Measure memory bandwidth via nvbandwidth, falling back to PyTorch.

        Returns:
            A dict with a single ``"memory"`` key holding the results.
        """
        nvbw = self._find_nvbandwidth()
        if nvbw:
            return self._run_nvbandwidth(nvbw)
        self.console.print("[yellow]nvbandwidth not found, falling back to PyTorch[/yellow]")
        return self._run_memory_pytorch()

    @staticmethod
    def _list_nvbandwidth_testcases(nvbw_path: str) -> List[str]:
        """Return the testcase names printed by ``nvbandwidth -l``.

        Lines of the form ``<digit...>, <name>:`` are parsed; everything else is
        ignored. Returns an empty list when the listing cannot be obtained
        (non-zero exit, timeout, or the binary cannot be executed).
        """
        try:
            list_r = subprocess.run(
                [nvbw_path, "-l"],
                capture_output=True, text=True, timeout=15,
            )
        except (subprocess.TimeoutExpired, OSError):
            # OSError covers FileNotFoundError, PermissionError, exec-format errors.
            return []
        if list_r.returncode != 0:
            return []

        names: List[str] = []
        for line in list_r.stdout.splitlines():
            line = line.strip()
            if line and ", " in line and line[0].isdigit():
                name = line.split(", ", 1)[1].rstrip(":").strip()
                if name:
                    names.append(name)
        return names

    def _run_nvbandwidth_testcase(self, nvbw_path: str, testcase: str,
                                  buffer_mb, samples) -> Optional[float]:
        """Run one nvbandwidth testcase and return its average bandwidth.

        Returns:
            The parsed average rounded to one decimal, or None when the process
            timed out, could not be executed, exited non-zero, or printed nothing.
        """
        # --disableAffinity skips nvbandwidth's CPU affinity setup, which
        # calls nvmlDeviceGetHandleByUUID() — that lookup fails on hosts
        # whose fabricmanager build doesn't expose the UUID format nvml
        # expects (seen on H20-3e with custom 570.172.08-1 fabricmanager).
        cmd = [nvbw_path, "--disableAffinity", "-t", testcase,
               "-b", str(buffer_mb), "-i", str(samples), "-j"]
        try:
            r = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except (subprocess.TimeoutExpired, OSError):
            return None
        if r.returncode != 0 or not r.stdout.strip():
            return None
        return round(self._parse_nvbandwidth_json(r.stdout), 1)

    def _run_nvbandwidth(self, nvbw_path: str) -> dict:
        """Run the nvbandwidth testcases and summarise H2D / D2H / D2D bandwidth.

        Falls back to the PyTorch memory benchmark when no testcase produced a
        non-zero bandwidth.

        Args:
            nvbw_path: Path of the nvbandwidth executable.

        Returns:
            A dict with a single ``"memory"`` key (``source`` is ``"nvbandwidth"``
            unless the PyTorch fallback was taken).
        """
        mem_cfg = self.bench_cfg.get("memory", {})
        buffer_mb = mem_cfg.get("nvbandwidth_buffer_mb", 512)
        samples = mem_cfg.get("nvbandwidth_samples", 3)

        self.console.print(f"[cyan]Memory Benchmark via nvbandwidth ({nvbw_path})[/cyan]")

        results_by_test = {}

        # Testcases to run — keys used internally, try both old and new names
        testcases = [
            ("h2d", ["host_to_device_memcpy_ce", "host_to_device_memcpy_read_ce"]),
            ("d2h", ["device_to_host_memcpy_ce", "device_to_host_memcpy_write_ce"]),
            ("d2d_write", ["device_to_device_memcpy_write_ce"]),
            ("d2d_read", ["device_to_device_memcpy_read_ce"]),
            ("d2d_bidir", ["device_to_device_bidirectional_memcpy_write_sm",
                           "device_to_device_bidirectional_sm"]),
        ]

        # Discover available testcase names (empty set => discovery failed).
        available_names = set(self._list_nvbandwidth_testcases(nvbw_path))

        with self._make_progress() as progress:
            task = progress.add_task("nvbandwidth tests...", total=len(testcases))
            for key, name_candidates in testcases:
                # When discovery worked, only names the binary advertises are
                # eligible; when it failed, every candidate is worth trying.
                eligible = [
                    candidate for candidate in name_candidates
                    if not available_names or candidate in available_names
                ]
                if not eligible:
                    progress.advance(task)
                    continue

                # Try candidates in order and stop at the first one that yields
                # bandwidth, so a renamed testcase is still found when the
                # listing could not be read.
                bandwidth = 0
                for tc in eligible:
                    measured = self._run_nvbandwidth_testcase(nvbw_path, tc, buffer_mb, samples)
                    if measured is not None:
                        bandwidth = measured
                    if bandwidth > 0:
                        break
                results_by_test[key] = bandwidth
                progress.advance(task)

        d2d_bw = max(
            results_by_test.get("d2d_write", 0),
            results_by_test.get("d2d_read", 0),
            results_by_test.get("d2d_bidir", 0),
        )
        h2d_bw = results_by_test.get("h2d", 0)
        d2h_bw = results_by_test.get("d2h", 0)

        # If every subtest returned 0 the nvbandwidth binary is broken on this host
        # (e.g. CUDA_ERROR_INVALID_CONTEXT, NVML mismatch). Fall back to PyTorch.
        # (all() of an empty dict is True, so "no testcase ran" also falls back.)
        if all(v == 0 for v in results_by_test.values()):
            self.console.print(
                "[yellow]nvbandwidth returned no usable data — "
                "falling back to PyTorch memory benchmark[/yellow]"
            )
            return self._run_memory_pytorch()

        # D2D goes through NVLink — compare to NVLink per-direction bandwidth
        # (nvlink_bandwidth_gbps is bidirectional, so per-direction = /2)
        nvlink_bw = self.specs.get("nvlink_bandwidth_gbps", 0)
        d2d_peak = nvlink_bw / 2 if nvlink_bw else 0
        d2d_efficiency = round((d2d_bw / d2d_peak) * 100, 1) if (d2d_bw and d2d_peak) else None

        # H2D/D2H goes through PCIe — estimate peak from PCIe gen.
        # `or 0` also tolerates an explicit None in the specs.
        pcie_gen = self.specs.get("pcie_gen") or 0
        pcie_peak = {3: 16, 4: 32, 5: 64, 6: 128}.get(pcie_gen, 32) if pcie_gen > 0 else 0  # GB/s x16
        h2d_efficiency = round((h2d_bw / pcie_peak) * 100, 1) if (h2d_bw and pcie_peak) else None
        d2h_efficiency = round((d2h_bw / pcie_peak) * 100, 1) if (d2h_bw and pcie_peak) else None

        return {
            "memory": {
                "source": "nvbandwidth",
                "h2d_bandwidth_gbps": round(h2d_bw, 1),
                "d2h_bandwidth_gbps": round(d2h_bw, 1),
                "d2d_bandwidth_gbps": round(d2d_bw, 1),
                "h2d_peak_gbps": pcie_peak if pcie_peak else None,
                "d2h_peak_gbps": pcie_peak if pcie_peak else None,
                "d2d_peak_gbps": round(d2d_peak, 1) if d2d_peak else None,
                "h2d_efficiency_pct": h2d_efficiency,
                "d2h_efficiency_pct": d2h_efficiency,
                "d2d_efficiency_pct": d2d_efficiency,
                "peak_bandwidth_gbps": self.specs["memory_bandwidth_gbps"],
                "efficiency_pct": d2d_efficiency,
                "results_by_test": results_by_test,
                "per_gpu": [],
            }
        }

    @staticmethod
    def _parse_nvbandwidth_json(raw: str) -> float:
        """Parse nvbandwidth JSON output (supports v0.5+ and v0.8+ formats).

        Args:
            raw: The JSON text printed by ``nvbandwidth -j``.

        Returns:
            The mean of the usable bandwidth values, or 0.0 when the text is not
            valid JSON or contains no usable value. Unexpected shapes are
            skipped rather than raised, so a malformed report never aborts the
            surrounding benchmark.
        """
        try:
            data = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return 0.0

        # v0.8+ format: {"nvbandwidth": {"testcases": [{"bandwidth_matrix": [...], "sum": N}]}}
        if isinstance(data, dict) and "nvbandwidth" in data:
            root = data["nvbandwidth"]
            testcases = root.get("testcases", []) if isinstance(root, dict) else []
            if not isinstance(testcases, list):
                testcases = []
            for tc in testcases:
                if not isinstance(tc, dict):
                    continue
                matrix = tc.get("bandwidth_matrix", [])
                if not isinstance(matrix, list):
                    continue
                values = []
                for row in matrix:
                    if not isinstance(row, (list, tuple)):
                        continue
                    for cell in row:
                        try:
                            v = float(cell)
                        except (ValueError, TypeError):
                            continue
                        # Exclude diagonal entries (intra-device, reported as 0 or
                        # N/A) so they don't drag the off-diagonal average down.
                        # The upper bound also rejects inf; NaN fails both tests.
                        if 0 < v < float("inf"):
                            values.append(v)
                # The first testcase that has usable values decides the result.
                if values:
                    return sum(values) / len(values)
            return 0.0

        # v0.5 format: list of dicts with "results" array
        entries = data if isinstance(data, list) else [data]
        bw_values = []
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            rows = entry.get("results", [])
            if not isinstance(rows, list):
                continue
            for row in rows:
                if not isinstance(row, dict):
                    continue
                val = row.get("value", 0)
                # bool is an int subclass; it is never a bandwidth reading.
                if isinstance(val, (int, float)) and not isinstance(val, bool):
                    bw_values.append(val)
        return sum(bw_values) / len(bw_values) if bw_values else 0.0

    def _run_memory_pytorch(self) -> dict:
        """Measure H2D / D2H / D2D copy bandwidth with PyTorch tensor copies.

        For each transfer size the copy is repeated ``iterations`` times
        (config key ``benchmark.memory.iterations``, at least 1) and the
        upper-median time is converted to GB/s.

        Returns:
            A dict with a single ``"memory"`` key; it holds ``{"error": ...}``
            when PyTorch/CUDA is unavailable.
        """
        mem_cfg = self.bench_cfg.get("memory", {})
        test_sizes_mb = [1, 4, 16, 64, 256, 1024, 4096]
        # At least one iteration: with zero samples there is no median to take.
        iterations = max(1, int(mem_cfg.get("iterations", 10)))

        if not TORCH_AVAILABLE:
            self.console.print("[yellow]PyTorch not available - skipping memory benchmark[/yellow]")
            return {"memory": {"error": "pytorch_not_available"}}

        gpu_count = torch.cuda.device_count()
        self.console.print(f"[cyan]Memory Benchmark (PyTorch fallback) - {gpu_count} GPU(s)[/cyan]")

        def median(lst):
            # Upper median: for an even count this is the larger middle sample.
            s = sorted(lst)
            return s[len(s) // 2]

        def bw_gb(t, sz):
            return (sz / t) / 1e9

        bandwidth_by_size = {}

        with self._make_progress() as progress:
            task = progress.add_task("Testing sizes...", total=len(test_sizes_mb))
            for size_mb in test_sizes_mb:
                size_bytes = size_mb * 1024 * 1024
                h2d_times, d2h_times, d2d_times = [], [], []

                # float32 is 4 bytes per element.
                x_cpu = torch.randn(size_bytes // 4, dtype=torch.float32)

                for _ in range(iterations):
                    t0 = time.perf_counter()
                    x_gpu = x_cpu.cuda()
                    torch.cuda.synchronize()
                    h2d_times.append(time.perf_counter() - t0)

                    t0 = time.perf_counter()
                    x_gpu.cpu()
                    torch.cuda.synchronize()
                    d2h_times.append(time.perf_counter() - t0)

                    x_gpu2 = torch.randn_like(x_gpu)
                    t0 = time.perf_counter()
                    x_gpu2.copy_(x_gpu)
                    torch.cuda.synchronize()
                    d2d_times.append(time.perf_counter() - t0)

                del x_gpu, x_gpu2
                torch.cuda.empty_cache()

                bandwidth_by_size[str(size_mb)] = {
                    "h2d_gbps": round(bw_gb(median(h2d_times), size_bytes), 1),
                    "d2h_gbps": round(bw_gb(median(d2h_times), size_bytes), 1),
                    "d2d_gbps": round(bw_gb(median(d2d_times), size_bytes), 1),
                }
                progress.advance(task)

        best_d2d = max(v["d2d_gbps"] for v in bandwidth_by_size.values())
        peak_bw = self.specs["memory_bandwidth_gbps"]
        efficiency = round((best_d2d / peak_bw) * 100, 1) if peak_bw else None

        return {
            "memory": {
                "source": "pytorch",
                "h2d_bandwidth_gbps": round(max(v["h2d_gbps"] for v in bandwidth_by_size.values()), 1),
                "d2h_bandwidth_gbps": round(max(v["d2h_gbps"] for v in bandwidth_by_size.values()), 1),
                "d2d_bandwidth_gbps": round(best_d2d, 1),
                "peak_bandwidth_gbps": self.specs["memory_bandwidth_gbps"],
                "efficiency_pct": efficiency,
                "test_sizes_mb": test_sizes_mb,
                "bandwidth_by_size": bandwidth_by_size,
                "per_gpu": [],
            }
        }

    def run_compute_benchmark(self, dtypes: Optional[List[str]] = None) -> dict:
        """Measure matmul throughput per dtype, keeping the best shape per dtype.

        Args:
            dtypes: Dtype names to test; defaults to config
                ``benchmark.compute.dtypes`` or fp32/tf32/fp16/bf16/fp8.
                Unknown names are ignored.

        Returns:
            A dict with a single ``"compute"`` key. ``per_dtype_tflops`` maps a
            dtype to a number, or to a ``"skipped ..."`` / ``"error: ..."``
            string. Holds ``{"error": ...}`` when PyTorch/CUDA is unavailable.

        Raises:
            ValueError: A sweep entry is a sequence that is neither 1 nor 3 long.
        """
        comp_cfg = self.bench_cfg.get("compute", {})
        configured_dtypes = dtypes or comp_cfg.get("dtypes", ["fp32", "tf32", "fp16", "bf16", "fp8"])
        # MAMF-style shape sweep (à la stas00's mamf-finder): a single fixed matmul
        # shape under-reports the achievable peak by ~7-12% and therefore can't meet
        # the MAMF-calibrated PASS thresholds in gpu_specs.compute_pass_thresholds_tflops.
        # So for each dtype we time several matmul shapes and keep the MAXIMUM TFLOPS
        # (the realistic peak). matrix_size is the fallback when sweep_sizes is empty.
        matrix_size = comp_cfg.get("matrix_size", 8192)
        sweep_sizes = comp_cfg.get("sweep_sizes") or [matrix_size]
        if not isinstance(sweep_sizes, (list, tuple)):
            # Tolerate a bare scalar in the config (`sweep_sizes: 8192`).
            sweep_sizes = [sweep_sizes]
        warmup = comp_cfg.get("warmup", 20)
        iterations = comp_cfg.get("iterations", 80)

        # Each sweep entry is either an int N (square N×N×N) or an [M, N, K] triple.
        # Non-square / K-heavy shapes (e.g. 2048×2048×13312) reach the true MAMF peak
        # on Hopper — square-only tops out ~5% lower — so the default set mixes both.
        def _to_shape(entry):
            if isinstance(entry, (list, tuple)):
                if len(entry) == 3:
                    return tuple(int(x) for x in entry)
                if len(entry) == 1:
                    n = int(entry[0])
                    return (n, n, n)
                raise ValueError(f"sweep size {entry!r} must be an int or [M, N, K]")
            n = int(entry)
            return (n, n, n)

        shapes = [_to_shape(e) for e in sweep_sizes]

        if not TORCH_AVAILABLE:
            self.console.print("[yellow]PyTorch not available - skipping compute benchmark[/yellow]")
            return {"compute": {"error": "pytorch_not_available"}}

        gpu_count = torch.cuda.device_count()
        self.console.print(f"[cyan]Compute Benchmark - {gpu_count} GPU(s)[/cyan]")
        if len(sweep_sizes) > 1:
            self.console.print(
                f"[cyan] MAMF shape sweep over {len(sweep_sizes)} sizes: "
                f"{', '.join(str(s) for s in sweep_sizes)}[/cyan]"
            )

        # PyTorch builds without the FP8 dtype must not break the other dtypes,
        # so the attribute is looked up defensively; _bench_matmul_once reports
        # a clear error if fp8 is actually requested on such a build.
        fp8_dtype = getattr(torch, "float8_e4m3fn", None)
        dtype_map = {
            "fp32": (torch.float32, self.specs["fp32_tflops"]),
            "tf32": ("tf32", self.specs["tf32_tflops"]),
            "fp16": (torch.float16, self.specs["fp16_tflops"]),
            "bf16": (torch.bfloat16, self.specs["bf16_tflops"]),
            "fp8": (fp8_dtype, self.specs.get("fp8_tflops", 0)),
        }

        results_by_dtype = {}
        best_shapes = {}
        per_gpu_results = [{"index": i} for i in range(gpu_count)]

        with self._make_progress() as progress:
            task = progress.add_task("Testing dtypes...", total=len(configured_dtypes))

            for dtype_name in configured_dtypes:
                if dtype_name not in dtype_map:
                    progress.advance(task)
                    continue

                # Skip FP8 if GPU architecture doesn't support it
                if dtype_name == "fp8" and self.specs.get("fp8_tflops", 0) == 0:
                    arch = self.specs.get("architecture", "unknown")
                    results_by_dtype["fp8"] = f"skipped ({arch} does not support FP8)"
                    self.console.print(f"[dim] fp8: skipped - {arch} architecture has no FP8 support[/dim]")
                    progress.advance(task)
                    continue

                dtype_val, peak_tflops = dtype_map[dtype_name]

                # allow_tf32 only affects float32 matmuls: ON for the TF32 run, OFF for
                # the true-FP32 run so the two stay distinct.
                old_tf32 = torch.backends.cuda.matmul.allow_tf32
                best_tflops, best_shape, last_err = 0.0, None, None
                try:
                    if dtype_name == "tf32":
                        torch.backends.cuda.matmul.allow_tf32 = True
                        dtype_val = torch.float32
                    elif dtype_name == "fp32":
                        torch.backends.cuda.matmul.allow_tf32 = False

                    for (M, N, K) in shapes:
                        try:
                            t = self._bench_matmul_once(dtype_name, dtype_val, M, N, K, warmup, iterations)
                            if t > best_tflops:
                                best_tflops, best_shape = t, (M, N, K)
                        except Exception as e:  # noqa: BLE001 - record and try the next shape
                            last_err = e
                finally:
                    # Process-global flag: restore it even if the sweep is interrupted.
                    torch.backends.cuda.matmul.allow_tf32 = old_tf32

                if best_shape is None:
                    reason = last_err if last_err is not None else "no shape produced a positive throughput"
                    results_by_dtype[dtype_name] = f"error: {reason}"
                    self.console.print(f"[yellow] {dtype_name}: {reason}[/yellow]")
                else:
                    shape_str = "x".join(str(d) for d in best_shape)
                    results_by_dtype[dtype_name] = round(best_tflops, 1)
                    best_shapes[dtype_name] = shape_str
                    for pg in per_gpu_results:
                        pg[dtype_name] = round(best_tflops, 1)
                    if len(shapes) > 1:
                        self.console.print(
                            f"[dim] {dtype_name}: {best_tflops:.1f} TFLOPS @ {shape_str}[/dim]"
                        )

                progress.advance(task)

        efficiency = {}
        for dt, achieved in results_by_dtype.items():
            if isinstance(achieved, (int, float)) and dt in dtype_map:
                peak_tp = dtype_map[dt][1]
                if peak_tp:
                    efficiency[dt] = round((achieved / peak_tp) * 100, 1)

        return {
            "compute": {
                "per_dtype_tflops": results_by_dtype,
                "peak_tflops": {dt: dtype_map[dt][1] for dt in dtype_map},
                "efficiency_pct": efficiency,
                # Absolute TFLOPS PASS thresholds (decoupled from peak). When present,
                # report.py judges PASS/WARN/FAIL against these directly instead of
                # using % of peak. Empty dict => fall back to legacy 80% rule.
                "pass_thresholds_tflops": dict(
                    self.specs.get("compute_pass_thresholds_tflops") or {}
                ),
                "per_gpu": per_gpu_results,
                "sweep_sizes": list(sweep_sizes),
                "best_shapes": best_shapes,
                "matrix_size": matrix_size,
                "warmup": warmup,
                "iterations": iterations,
            }
        }

    def _bench_matmul_once(self, dtype_name: str, dtype_val, M: int, N: int, K: int,
                           warmup: int, iterations: int) -> float:
        """Time one (M×K)·(K×N) matmul for a dtype and return achieved TFLOPS.

        Uses an L2-cache-busting pool of matrix pairs (total > 256 MB) so operands
        can't be served from L2 across iterations, and CUDA events for timing.
        FP8 goes through torch._scaled_mm (e4m3); all others through torch.matmul —
        eager cuBLAS, which on H100 beats torch.compile/Triton for plain GEMM and
        avoids the per-shape recompile cost that would make a sweep pathologically slow.

        Raises:
            ValueError: A dimension or ``iterations`` is not positive.
            RuntimeError: FP8 was requested but this PyTorch build lacks
                ``torch._scaled_mm`` or the ``float8_e4m3fn`` dtype.
        """
        if min(M, N, K) <= 0:
            raise ValueError(f"matmul shape {M}x{N}x{K} must have positive dimensions")
        if iterations <= 0:
            raise ValueError(f"iterations must be positive, got {iterations}")

        if dtype_name == "fp8":
            if not hasattr(torch, "_scaled_mm"):
                raise RuntimeError("torch._scaled_mm unavailable — upgrade to PyTorch >= 2.1")
            if not hasattr(torch, "float8_e4m3fn"):
                raise RuntimeError("torch.float8_e4m3fn unavailable — upgrade to PyTorch >= 2.1")
            elem_bytes = 1
        else:
            elem_bytes = torch.tensor([], dtype=dtype_val).element_size()

        pair_bytes = (M * K + K * N) * elem_bytes
        num_pools = max(4, -(-256 * 1024 * 1024 // pair_bytes))  # ceil(256MB / pair)

        if dtype_name == "fp8":
            pools_a = [torch.randn(M, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
                       for _ in range(num_pools)]
            # B is created as (N, K) and passed transposed to _scaled_mm below.
            pools_b = [torch.randn(N, K, device="cuda", dtype=torch.float32).to(torch.float8_e4m3fn)
                       for _ in range(num_pools)]
            scale_a = torch.tensor(1.0, device="cuda")
            scale_b = torch.tensor(1.0, device="cuda")

            def op(i):
                return torch._scaled_mm(pools_a[i], pools_b[i].T,
                                        scale_a=scale_a, scale_b=scale_b,
                                        out_dtype=torch.bfloat16)
        else:
            pools_a = [torch.randn(M, K, device="cuda", dtype=dtype_val) for _ in range(num_pools)]
            pools_b = [torch.randn(K, N, device="cuda", dtype=dtype_val) for _ in range(num_pools)]

            def op(i):
                return torch.matmul(pools_a[i], pools_b[i])

        try:
            # Probe once so a broken/unsupported kernel raises before the timed loop.
            _probe = op(0)
            torch.cuda.synchronize()
            del _probe

            for i in range(warmup):
                op(i % num_pools)
            torch.cuda.synchronize()

            start_event = torch.cuda.Event(enable_timing=True)
            end_event = torch.cuda.Event(enable_timing=True)
            start_event.record()
            for i in range(iterations):
                op(i % num_pools)
            end_event.record()
            torch.cuda.synchronize()
            elapsed_ms = start_event.elapsed_time(end_event)
        finally:
            del pools_a, pools_b
            torch.cuda.empty_cache()

        # 2*M*N*K floating-point operations per matmul.
        return (2 * M * N * K * iterations) / (elapsed_ms / 1000) / 1e12

    @staticmethod
    def print_results(results: dict, console: Optional[Console] = None):
        """Render the memory and compute result tables.

        Args:
            results: Dict that may hold ``"memory"`` and/or ``"compute"``
                sections; a section containing ``"error"`` is not printed.
            console: Console to print to; a new one is created when omitted.
        """
        c = console or Console()

        if "memory" in results and "error" not in results["memory"]:
            mem = results["memory"]
            source = mem.get("source", "unknown")
            c.print(f"\n[bold cyan]Memory Bandwidth Results (via {source})[/bold cyan]")

            table = Table(box=None, padding=(0, 1))
            table.add_column("Metric", style="bold")
            table.add_column("Value", justify="right")
            table.add_column("Peak", justify="right")
            table.add_column("Efficiency", justify="right")

            for label, achieved, peak_key, eff_key in [
                ("H2D (PCIe)", mem["h2d_bandwidth_gbps"], "h2d_peak_gbps", "h2d_efficiency_pct"),
                ("D2H (PCIe)", mem["d2h_bandwidth_gbps"], "d2h_peak_gbps", "d2h_efficiency_pct"),
                ("D2D (NVLink)", mem["d2d_bandwidth_gbps"], "d2d_peak_gbps", "d2d_efficiency_pct"),
            ]:
                val_str = f"{achieved:.1f} GB/s" if isinstance(achieved, (int, float)) else "N/A"
                peak = mem.get(peak_key, 0)
                peak_str = f"{peak:.0f} GB/s" if peak else "N/A"
                eff = mem.get(eff_key, 0)
                if eff:
                    ec = "green" if eff >= 80 else ("yellow" if eff >= 50 else "red")
                    eff_str = f"[{ec}]{eff:.1f}%[/{ec}]"
                else:
                    eff_str = "N/A"
                table.add_row(label, val_str, peak_str, eff_str)

            c.print(table)

            by_test = mem.get("results_by_test", {})
            if by_test:
                c.print("\n [dim]nvbandwidth breakdown:[/dim]")
                for tc, bw in sorted(by_test.items()):
                    c.print(f" {tc}: {bw} GB/s")

            by_size = mem.get("bandwidth_by_size", {})
            if by_size:
                t2 = Table(title="Bandwidth by Transfer Size", box=None, padding=(0, 1))
                t2.add_column("Size (MB)", style="bold", justify="right")
                t2.add_column("H2D (GB/s)", justify="right")
                t2.add_column("D2H (GB/s)", justify="right")
                t2.add_column("D2D (GB/s)", justify="right")
                # Same peak for every row; without one the D2D cell stays uncoloured.
                peak = mem.get("peak_bandwidth_gbps")
                for sz, vals in sorted(by_size.items(), key=lambda x: int(x[0])):
                    if peak:
                        d2d_eff = (vals["d2d_gbps"] / peak) * 100
                        ec = "green" if d2d_eff >= 80 else ("yellow" if d2d_eff >= 50 else "red")
                        d2d_cell = f"[{ec}]{vals['d2d_gbps']:.1f}[/{ec}]"
                    else:
                        d2d_cell = f"{vals['d2d_gbps']:.1f}"
                    t2.add_row(sz, f"{vals['h2d_gbps']:.1f}", f"{vals['d2h_gbps']:.1f}", d2d_cell)
                c.print(t2)

        if "compute" in results and "error" not in results["compute"]:
            comp = results["compute"]
            c.print(f"\n[bold cyan]Compute Throughput Results[/bold cyan]")

            table = Table(box=None, padding=(0, 1))
            table.add_column("DType", style="bold")
            table.add_column("Achieved (TFLOPS)", justify="right")
            table.add_column("Peak", justify="right")
            table.add_column("Efficiency", justify="right")

            peak = comp.get("peak_tflops", {})
            per_dtype = comp.get("per_dtype_tflops", {})
            eff = comp.get("efficiency_pct", {})

            for dt, achieved in per_dtype.items():
                if isinstance(achieved, str):
                    # Skipped / errored dtype: show the message instead of a number.
                    table.add_row(dt.upper(), f"[red]{achieved}[/red]", str(peak.get(dt, "N/A")), "N/A")
                    continue
                pk = peak.get(dt)
                pk_str = f"{pk:.0f}" if isinstance(pk, (int, float)) and pk else "N/A"
                ef = eff.get(dt)
                if ef is None:
                    # No peak known for this dtype: no efficiency to rate.
                    ef_str = "N/A"
                else:
                    ec = "green" if ef >= 80 else ("yellow" if ef >= 50 else "red")
                    ef_str = f"[{ec}]{ef:.1f}%[/{ec}]"
                table.add_row(dt.upper(), f"{achieved:.1f}", pk_str, ef_str)

            c.print(table)

    @staticmethod
    def judge_compute(results: dict) -> dict:
        """Judge compute results against pass_thresholds_tflops.

        Single source of truth for the PASS/WARN/FAIL rule (same one report.py uses):
        achieved >= thr -> PASS; >= 0.9*thr -> WARN; else FAIL. A string achieved
        value (skipped/error) -> SKIP. A dtype without a threshold falls back to
        efficiency (>=80 PASS / >=50 WARN / else FAIL).

        Args:
            results: Either the full results dict (with a ``"compute"`` key) or
                the compute section itself.

        Returns:
            {"rows": [(dtype, achieved, threshold, status), ...], "verdict": str}.
            The verdict is the worst status seen; SKIP never worsens it.
        """
        comp = results.get("compute", results)
        per_dtype = comp.get("per_dtype_tflops", {})
        thresholds = comp.get("pass_thresholds_tflops", {}) or {}
        eff = comp.get("efficiency_pct", {})

        # SKIP ranks with PASS so a skipped dtype cannot degrade the verdict.
        rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0}
        rows, verdict = [], "PASS"
        for dt, val in per_dtype.items():
            thr = thresholds.get(dt)
            if isinstance(val, str):
                status = "SKIP"
            elif thr:
                status = "PASS" if val >= thr else ("WARN" if val >= thr * 0.9 else "FAIL")
            else:
                e = eff.get(dt, 0)
                status = "PASS" if e >= 80 else ("WARN" if e >= 50 else "FAIL")
            rows.append((dt, val, thr, status))
            if rank[status] > rank[verdict]:
                verdict = status
        return {"rows": rows, "verdict": verdict}

    @staticmethod
    def print_compute_verdict(results: dict, console: Optional[Console] = None) -> str:
        """Print the PASS/WARN/FAIL table for compute results; return the verdict."""
        c = console or Console()
        judged = Benchmark.judge_compute(results)
        color = {"PASS": "green", "WARN": "yellow", "FAIL": "red", "SKIP": "dim"}

        c.print("\n[bold cyan]Compute Verdict (vs thresholds)[/bold cyan]")
        for dt, val, thr, status in judged["rows"]:
            val_s = f"{val:.1f}" if isinstance(val, (int, float)) else str(val)
            thr_s = f">= {thr}" if thr else "(efficiency)"
            c.print(f" {dt.upper():>4}: {val_s:>8} {thr_s:<12} [{color[status]}]{status}[/{color[status]}]")
        v = judged["verdict"]
        c.print(f" [bold]VERDICT: [{color[v]}]{v}[/{color[v]}][/bold]")
        return v


def _run_cli() -> None:
    """`python -m modules.benchmark` — run ONLY the compute-throughput benchmark."""
    import argparse
    from pathlib import Path

    import yaml

    repo_root = Path(__file__).resolve().parent.parent
    parser = argparse.ArgumentParser(description="Run the compute-throughput benchmark only.")
    parser.add_argument("--config", default=str(repo_root / "configs" / "default.yaml"),
                        help="path to config YAML (default: configs/default.yaml)")
    parser.add_argument("--json", action="store_true",
                        help="also print raw JSON of the compute results")
    args = parser.parse_args()

    # Explicit encoding so the config parses the same regardless of locale.
    with open(args.config, encoding="utf-8") as f:
        config = yaml.safe_load(f) or {}

    results = Benchmark(config).run_compute_benchmark()
    Benchmark.print_results(results)
    Benchmark.print_compute_verdict(results)
    if args.json:
        print("JSON_RESULT:" + json.dumps(results["compute"]))


if __name__ == "__main__":
    _run_cli()