"""Report generation module - export test results to JSON/HTML/Markdown.""" import json import os from datetime import datetime from pathlib import Path from typing import Optional try: from importlib.metadata import version as _pkg_version __version__ = _pkg_version("gpu-server-test-suite") except Exception: __version__ = "0.2.0" from rich.console import Console from rich.panel import Panel HTML_TEMPLATE = """ GPU Test Report - {timestamp}

GPU Training Server Test Report

Generated: {timestamp} | Server: {hostname}
{content} """ class ReportGenerator: def __init__(self, config: dict): self.config = config self.console = Console() self.report_cfg = config.get("report", {}) def generate(self, results: dict, fmt: str = None, output: str = None) -> str: fmt = fmt or self.report_cfg.get("format", "json") output_dir = self.report_cfg.get("output_dir", "./reports") os.makedirs(output_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not output: output = os.path.join(output_dir, f"gpu_report_{timestamp}.{fmt}") if fmt == "json": return self._generate_json(results, output) elif fmt == "html": return self._generate_html(results, output) elif fmt == "md": return self._generate_markdown(results, output) else: self.console.print(f"[red]Unsupported format: {fmt}[/red]") return "" def _generate_json(self, results: dict, output: str) -> str: with open(output, "w") as f: json.dump(results, f, indent=2, default=str) self.console.print(f"[green]JSON report saved to: {output}[/green]") return output def _generate_html(self, results: dict, output: str) -> str: import socket hostname = results.get("hostname") or socket.gethostname() timestamp = results.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S") sections = [] if "gpu_info" in results: gpus = results["gpu_info"].get("gpus", []) rows = "" for g in gpus: rows += f"GPU {g['index']}{g['name']}" rows += f"{g['vram_total_mb']} MB" rows += f"{g['temperature']}°C" rows += f"{g['clock_sm']} MHz" sections.append( f'

GPU Information

' f'

Driver: {results["gpu_info"].get("driver_version", "N/A")} | ' f'CUDA: {results["gpu_info"].get("cuda_version", "N/A")} | ' f'Count: {len(gpus)}

' f'' f'{rows}
GPUModelVRAMTempSM Clock
' ) if "health" in results: h = results["health"] passed = h.get("passed", False) cls = "pass" if passed else "fail" txt = "ALL PASSED" if passed else "SOME CHECKS FAILED" sections.append(f'
{txt}
') if "benchmark" in results and "memory" in results["benchmark"]: mem = results["benchmark"]["memory"] sections.append( f'

Memory Bandwidth

' f'
{mem.get("d2d_bandwidth_gbps", "N/A")} GB/s
' f'
D2D (HBM)
' f'
{mem.get("efficiency_pct", "N/A")}%
' f'
Efficiency vs Peak ({mem.get("peak_bandwidth_gbps", "N/A")} GB/s)
' f'
' ) if "benchmark" in results and "compute" in results["benchmark"]: comp = results["benchmark"]["compute"] dtype_rows = "" per_dtype = comp.get("per_dtype_tflops", {}) eff = comp.get("efficiency_pct", {}) for dt, tflops in per_dtype.items(): ef = eff.get(dt, 0) cls = "pass" if ef >= 80 else ("warn" if ef >= 50 else "fail") if isinstance(tflops, (int, float)): dtype_rows += f'{dt.upper()}{tflops:.1f} TFLOPS' dtype_rows += f'{ef:.1f}%' if dtype_rows: sections.append( f'

Compute Throughput

' f'' f'{dtype_rows}
DTypeAchievedEfficiency
' ) if "training" in results: t = results["training"] sections.append( f'

Training Simulation

' f'
{t.get("throughput_tokens_per_sec", "N/A")}
' f'
Tokens/sec
' f'
{t.get("avg_step_time_ms", "N/A")} ms
' f'
Avg Step Time
' f'
{t.get("peak_memory_gb", "N/A")} GB
' f'
Peak Memory
' f'
' ) content = "\n".join(sections) html = HTML_TEMPLATE.format(timestamp=timestamp, hostname=hostname, content=content) with open(output, "w") as f: f.write(html) self.console.print(f"[green]HTML report saved to: {output}[/green]") return output # ------------------------------------------------------------------ # Markdown report # ------------------------------------------------------------------ def _generate_markdown(self, results: dict, output: str) -> str: import socket hostname = results.get("hostname") or socket.gethostname() timestamp = results.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S") lines: list[str] = [] # --- Header --- lines.append("# GPU Test Report\n") lines.append(f"- **Date:** {timestamp}") lines.append(f"- **Host:** {hostname}") # Extract GPU info for header gpu_info = results.get("gpu_info") if gpu_info and not gpu_info.get("error"): gpus = gpu_info.get("gpus", []) gpu_name = gpus[0]["name"] if gpus else "Unknown" lines.append(f"- **GPU:** {gpu_name} x{gpu_info.get('gpu_count', len(gpus))}") lines.append(f"- **Driver:** {gpu_info.get('driver_version', 'N/A')} | " f"**CUDA:** {gpu_info.get('cuda_version', 'N/A')}") lines.append("") # --- Summary table --- summary_items = self._build_summary(results) if summary_items: verdict, failures, missing = self._overall_acceptance_verdict(summary_items) lines.append("## Overall Acceptance Verdict\n") lines.append(f"**Result: {verdict}**") lines.append("") if failures: lines.append("Failed or unverified items:") for name, status in failures: lines.append(f"- {name}: {status}") lines.append("") if missing: lines.append("Missing required evidence:") for name in missing: lines.append(f"- {name}") lines.append("") lines.append("## Summary\n") lines.append("| Test | Result |") lines.append("|------|--------|") for name, verdict in summary_items: lines.append(f"| {name} | {verdict} |") lines.append("") # --- GPU Information --- if gpu_info and not gpu_info.get("error"): lines.append("## GPU Information\n") gpus = gpu_info.get("gpus", []) lines.append("| GPU | Model | VRAM | Temp | Power | SM Clock |") lines.append("|-----|-------|------|------|-------|----------|") for g in gpus: vram = f"{g.get('vram_total_mb', 0)} MB" temp = f"{g.get('temperature', 'N/A')}C" power = f"{g.get('power_draw', 0):.0f}/{g.get('power_limit', 0):.0f}W" clock = f"{g.get('clock_sm', 0)} MHz" lines.append(f"| {g['index']} | {g['name']} | {vram} | {temp} | {power} | {clock} |") lines.append("") # --- Health Check --- health = results.get("health") if health and not health.get("error"): lines.append("## Health Check\n") passed = health.get("passed", False) lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n") gpu_health = health.get("gpu_health", []) if gpu_health: lines.append("| GPU | Temp | Power | ECC | PCIe | Throttle | Status |") lines.append("|-----|------|-------|-----|------|----------|--------|") for gh in gpu_health: checks = gh.get("checks", {}) temp_c = checks.get("temperature", {}) pwr = checks.get("power", {}) ecc = checks.get("ecc_errors", {}) pcie = checks.get("pcie_link", {}) throttle = checks.get("throttling", {}) temp_str = f"{temp_c.get('value', '?')}C {temp_c.get('status', '')}" pwr_str = f"{pwr.get('value', 0):.0f}W {pwr.get('status', '')}" ecc_str = f"S:{ecc.get('single', 0)} D:{ecc.get('double', 0)}" pcie_str = f"Gen{pcie.get('gen', '?')}x{pcie.get('width', '?')}" throt_str = throttle.get("status", "?") status = gh.get("status", "?") lines.append(f"| {gh['index']} | {temp_str} | {pwr_str} | " f"{ecc_str} | {pcie_str} | {throt_str} | **{status}** |") lines.append("") # --- Memory Bandwidth --- mem_data = self._extract_memory_results(results) if mem_data and not mem_data.get("error"): lines.append("## Memory Bandwidth\n") lines.append(f"Source: {mem_data.get('source', 'unknown')}\n") lines.append("| Metric | Value | Peak | Efficiency |") lines.append("|--------|-------|------|------------|") d2d = mem_data.get("d2d_bandwidth_gbps") or 0 h2d = mem_data.get("h2d_bandwidth_gbps") or 0 d2h = mem_data.get("d2h_bandwidth_gbps") or 0 # New format with per-metric peaks h2d_peak = mem_data.get("h2d_peak_gbps") or 0 d2h_peak = mem_data.get("d2h_peak_gbps") or 0 d2d_peak = mem_data.get("d2d_peak_gbps") or 0 h2d_eff = mem_data.get("h2d_efficiency_pct") or 0 d2h_eff = mem_data.get("d2h_efficiency_pct") or 0 d2d_eff = mem_data.get("d2d_efficiency_pct") or 0 # Fallback for old format if not d2d_peak: d2d_peak = mem_data.get("peak_bandwidth_gbps") or 0 d2d_eff = mem_data.get("efficiency_pct") or 0 lines.append(f"| H2D (PCIe) | {h2d:.1f} GB/s | {h2d_peak:.0f} GB/s | {h2d_eff:.1f}% |") lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |") lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |") lines.append("") # PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_() # only reaches ~20% of HBM bandwidth). When fallback is used, report # the number but mark as WARN with a note instead of evaluating as FAIL. if mem_data.get("source") == "pytorch": lines.append( f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; " "nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n" ) else: # Tightened to match production acceptance: PASS >= 80%, WARN 60–80%, FAIL < 60%. verdict = "PASS" if d2d_eff >= 80 else ("WARN" if d2d_eff >= 60 else "FAIL") lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n") # --- Compute Throughput --- comp_data = self._extract_compute_results(results) if comp_data and not comp_data.get("error"): lines.append("## Compute Throughput\n") per_dtype = comp_data.get("per_dtype_tflops", {}) peak_tflops = comp_data.get("peak_tflops", {}) eff_pct = comp_data.get("efficiency_pct", {}) # Absolute PASS thresholds (TFLOPS) from gpu_specs.compute_pass_thresholds_tflops. # When present, override the legacy 80%-of-peak rule on a per-dtype basis. pass_thresholds = comp_data.get("pass_thresholds_tflops", {}) or {} use_abs = bool(pass_thresholds) if use_abs: lines.append("| DType | Achieved (TFLOPS) | Peak | Threshold | Status |") else: lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |") lines.append("|-------|-------------------|------|------------|--------|") worst_eff = 100.0 overall_status = "PASS" rank = {"PASS": 0, "WARN": 1, "FAIL": 2, "SKIP": 0} for dt, val in per_dtype.items(): if isinstance(val, str): # skipped or error lines.append(f"| {dt.upper()} | {val} | - | N/A | SKIP |") else: pk = peak_tflops.get(dt, 0) ef = eff_pct.get(dt, 0) if isinstance(ef, (int, float)) and ef > 0: worst_eff = min(worst_eff, ef) thr = pass_thresholds.get(dt) if use_abs and thr: if val >= thr: status = "PASS" else: status = "FAIL" lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | >= {thr} | {status} |") else: status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL") lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |") if rank.get(status, 0) > rank.get(overall_status, 0): overall_status = status lines.append("") if use_abs: if any(not row.get("passed", False) for row in (comp_data.get("consistency", {}) or {}).values()): overall_status = "FAIL" lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds; worst efficiency {worst_eff:.1f}%)\n") else: overall_status = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL") lines.append(f"**Verdict: {overall_status}** (worst efficiency {worst_eff:.1f}%)\n") consistency = comp_data.get("consistency", {}) or {} if consistency: lines.append("### Compute Consistency\n") lines.append("| DType | Min | Mean | Max | Spread | Limit | Status |") lines.append("|-------|-----|------|-----|--------|-------|--------|") for dt, row in consistency.items(): status = "PASS" if row.get("passed") else "FAIL" lines.append( f"| {dt.upper()} | {row.get('min_tflops', 0):.1f} | " f"{row.get('mean_tflops', 0):.1f} | {row.get('max_tflops', 0):.1f} | " f"{row.get('spread_pct', 0):.2f}% | <= {row.get('max_allowed_pct', 3)}% | {status} |" ) lines.append("") per_gpu = comp_data.get("per_gpu", []) or [] dtype_order = [dt for dt in per_dtype.keys() if not isinstance(per_dtype.get(dt), str)] if per_gpu and dtype_order: lines.append("### Compute Per-GPU TFLOPS\n") headers = ["GPU", *[dt.upper() for dt in dtype_order]] lines.append("| " + " | ".join(headers) + " |") lines.append("|" + "|".join(["---"] * len(headers)) + "|") for row in per_gpu: cells = [str(row.get("index", ""))] for dt in dtype_order: val = row.get(dt, "") cells.append(f"{val:.1f}" if isinstance(val, (int, float)) else str(val)) lines.append("| " + " | ".join(cells) + " |") lines.append("") # --- NCCL --- nvlink = results.get("nvlink") if nvlink and not nvlink.get("error"): lines.append("## NVLink/NVSwitch\n") lines.append(f"**Overall: {'PASS' if nvlink.get('passed') else 'FAIL'}**\n") lines.append("| GPU | Active Links | Issues |") lines.append("|-----|--------------|--------|") for g in nvlink.get("gpus", []): issues = [] if g.get("inactive_links"): issues.append("inactive=" + ",".join(g["inactive_links"])) if g.get("speed_issues"): issues.append(f"speed issues={len(g['speed_issues'])}") if g.get("error_issues"): issues.append(f"errors={len(g['error_issues'])}") lines.append(f"| {g.get('gpu')} | {g.get('active_links')}/{g.get('expected_links')} | {', '.join(issues) or 'OK'} |") lines.append("") elif nvlink and nvlink.get("error"): lines.append("## NVLink/NVSwitch\n") lines.append(f"**Overall: FAIL** ({nvlink.get('error')})\n") dcgm = results.get("dcgm") if dcgm and not dcgm.get("error"): lines.append("## DCGM Diagnostic\n") lines.append(f"**Overall: {'PASS' if dcgm.get('passed') else 'FAIL'}**\n") if dcgm.get("subtests"): lines.append("| Subtest | Status |") lines.append("|---------|--------|") for s in dcgm.get("subtests", []): lines.append(f"| {s.get('name', '')} | {s.get('status', '')} |") lines.append("") elif dcgm and dcgm.get("error"): lines.append("## DCGM Diagnostic\n") lines.append(f"**Overall: FAIL** ({dcgm.get('error')})\n") # --- NCCL --- nccl = results.get("nccl") if nccl and not nccl.get("error"): lines.append("## NCCL Multi-GPU\n") lines.append(f"Source: {nccl.get('source', 'unknown')} | " f"GPUs: {nccl.get('gpu_count', '?')}\n") if nccl.get("source") == "torchrun_fallback": lines.append("> Functional NCCL smoke only: nccl-tests bus bandwidth was not measured, so this does not satisfy production acceptance.\n") tests = nccl.get("tests", {}) if tests: lines.append("> Summary reports the best Bus BW observed for each operation. PASS/FAIL is evaluated across every tested message size and repeat run shown in the detail table below.\n") lines.append("| Operation | Best Bus BW (GB/s) | Failed Sizes | Threshold | Status |") lines.append("|-----------|--------------------|--------------|-----------|--------|") for op, data in tests.items(): if isinstance(data, dict) and not data.get("error"): bw = data.get("best_busbw_gbps", 0) req = data.get("min_required_gbps", 0) status = data.get("status", "?") failed_sizes = [ str(row.get("size", "?")) for row in data.get("by_size", []) if row.get("status") != "PASS" ] failed_sizes_text = ", ".join(failed_sizes) if failed_sizes else "-" lines.append(f"| {op} | {bw:.1f} | {failed_sizes_text} | >= {_format_gbps(req)} | {status} |") elif isinstance(data, dict) and data.get("error"): lines.append(f"| {op} | - | - | - | ERROR: {data['error']} |") lines.append("") for op, data in tests.items(): by_size = data.get("by_size", []) if isinstance(data, dict) else [] if not by_size: continue lines.append(f"### NCCL {op} by size\n") lines.append("| Size | Runs Bus BW (GB/s) | Worst | Mean | StdDev | Threshold | Status |") lines.append("|------|---------------------|-------|------|--------|-----------|--------|") for row in by_size: runs = ", ".join(str(v) for v in row.get("runs_busbw_gbps", [])) lines.append( f"| {row.get('size', '')} | {runs} | " f"{row.get('worst_busbw_gbps', 0):.1f} | " f"{row.get('mean_busbw_gbps', 0):.1f} | " f"{row.get('stddev_pct', 0):.2f}% | " f">= {_format_gbps(data.get('min_required_gbps', 0))} | " f"{row.get('status', '?')} |" ) lines.append("") passed = nccl.get("passed", False) lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n") multinode = results.get("multinode_nccl") if multinode and not multinode.get("error"): lines.append("## Multi-node NCCL / Cross Leaf\n") lines.append(f"Source: {multinode.get('source', 'unknown')} | Mode: {multinode.get('mode', 'unknown')}\n") hosts = multinode.get("hosts", []) if hosts: host_text = ", ".join(f"{h.get('name') or h.get('addr')}({h.get('addr')})" for h in hosts) lines.append(f"- **Hosts:** {host_text}") preflight = multinode.get("preflight", {}) if preflight.get("checks"): failed_checks = [c for c in preflight["checks"] if c.get("status") == "FAIL"] warn_checks = [c for c in preflight["checks"] if c.get("status") == "WARN"] lines.append(f"- **Preflight:** {'PASS' if not failed_checks else 'FAIL'}" f"{f' ({len(warn_checks)} warnings)' if warn_checks else ''}") lines.append("") for op, data in (multinode.get("tests") or {}).items(): lines.append(f"### Multi-node NCCL {op}\n") lines.append("| Topology | CUDA Visible Devices | Peak Bus BW | Peak Size | Avg Bus BW | Threshold | Status |") lines.append("|----------|----------------------|-------------|-----------|------------|-----------|--------|") for topo in data.get("topologies", []): threshold = topo.get("min_required_gbps", 0) or 0 threshold_text = f">= {_format_gbps(threshold)} GB/s" if threshold else "-" cuda_visible = topo.get("cuda_visible_devices") or "-" lines.append( f"| {topo.get('label', '')} | {cuda_visible} | {topo.get('peak_busbw_gbps', 0):.2f} GB/s | " f"{topo.get('peak_size', '')} | {topo.get('avg_busbw_gbps', 0):.2f} GB/s | " f"{threshold_text} | {topo.get('status', '?')} |" ) lines.append("") diag_rows = [] for topo in data.get("topologies", []): net = topo.get("network") or {} if net: diag_rows.append((topo, net)) if diag_rows: lines.append("| Topology | NCCL Network | GPU Direct RDMA | GDR Enabled HCAs | GDR Disabled HCAs |") lines.append("|----------|--------------|-----------------|------------------|-------------------|") for topo, net in diag_rows: networks = ", ".join(net.get("networks") or []) or "unknown" gdr = net.get("gpu_direct_rdma", "UNKNOWN") enabled = ", ".join(net.get("gdr_enabled_hcas") or []) or "-" disabled = ", ".join(net.get("gdr_disabled_hcas") or []) or "-" lines.append(f"| {topo.get('label', '')} | {networks} | {gdr} | {enabled} | {disabled} |") lines.append("") failed_topos = [topo for topo in data.get("topologies", []) if topo.get("status") == "FAIL"] if failed_topos: lines.append("| Topology | Return Code | Error / Output Tail |") lines.append("|----------|-------------|---------------------|") for topo in failed_topos: tail = topo.get("error") or topo.get("stderr_tail") or topo.get("stdout_tail") or "" tail = str(tail).replace("\n", " ").replace("|", "\\|")[-240:] lines.append(f"| {topo.get('label', '')} | {topo.get('returncode', '')} | {tail} |") lines.append("") lines.append(f"**Overall: {'PASS' if multinode.get('passed') else 'FAIL'}**\n") elif multinode and multinode.get("error"): lines.append("## Multi-node NCCL / Cross Leaf\n") lines.append(f"**Overall: FAIL** ({multinode.get('error')})\n") preflight = multinode.get("preflight", {}) if preflight.get("checks"): lines.append("| Check | Status | Detail |") lines.append("|-------|--------|--------|") for check in preflight["checks"]: detail = str(check.get("detail", "")).replace("\n", " ") lines.append(f"| {check.get('name', '')} | {check.get('status', '')} | {detail} |") lines.append("") # --- Stress Test --- stress = results.get("stress") if stress and not stress.get("error"): lines.append("## Stress Test\n") passed = stress.get("passed", False) duration = stress.get("duration_sec") or 0 elapsed = stress.get("elapsed_sec") or 0 source = stress.get("source", "unknown") lines.append(f"- **Source:** {source}") lines.append(f"- **Duration:** {elapsed:.0f}s (requested {duration}s)") telemetry = stress.get("telemetry") or {} if telemetry: lines.append(f"- **Telemetry samples:** {telemetry.get('samples', 0)}") lines.append(f"- **Max temp:** {telemetry.get('max_temp_c', {})}") lines.append(f"- **Avg power:** {telemetry.get('avg_power_w', {})}") lines.append(f"- **Temp delta:** {telemetry.get('temp_delta_c', 'N/A')} C") lines.append(f"- **TFLOPS jitter:** {telemetry.get('tflops_jitter_pct', 'N/A')}%") lines.append(f"- **Steady TFLOPS samples:** {telemetry.get('steady_tflops_samples', 0)}") lines.append(f"- **Throttle events:** {telemetry.get('throttle_event_count', len(telemetry.get('throttle_events', [])))}") lines.append(f"- **XID events:** {len(telemetry.get('xid_events', []))}") failures = telemetry.get("failures") or [] if failures: lines.append("- **Failure reasons:**") for reason in failures: lines.append(f" - {reason}") lines.append(f"- **Result: {'PASS' if passed else 'FAIL'}**") lines.append("") # --- RDMA --- rdma = results.get("rdma") if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"): lines.append("## RDMA/InfiniBand\n") lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n") elif rdma and not rdma.get("error"): lines.append("## RDMA/InfiniBand\n") rdma_legacy_note = self._rdma_legacy_note(rdma) if rdma_legacy_note: lines.append(f"> {rdma_legacy_note}\n") port_checks = rdma.get("port_checks", []) if port_checks: lines.append("### RDMA Port Checks\n") lines.append("| Device | Port | State | Rate | Required | Status |") lines.append("|--------|------|-------|------|----------|--------|") for p in port_checks: lines.append( f"| {p.get('device', '')} | {p.get('port', '')} | " f"{p.get('state', '')} | {p.get('rate', '')} | " f">= {p.get('min_rate_gbps', 400):.0f}Gbps ACTIVE | {p.get('status', '?')} |" ) lines.append("") bw_tests = rdma.get("bandwidth_tests", []) lat_tests = rdma.get("latency_tests", []) ibping_tests = rdma.get("ibping_tests", []) if bw_tests or lat_tests or ibping_tests: lines.append("| Test | Value | Threshold | Status |") lines.append("|------|-------|-----------|--------|") for bt in bw_tests: if bt.get("error"): lines.append(f"| {bt.get('test', 'ib_bw')} | {bt.get('error')} | required runnable test | {bt.get('status', 'FAIL')} |") else: threshold, status = self._rdma_bandwidth_verdict(bt) lines.append(f"| {bt['test']} | {bt.get('bandwidth_gbps', 0):.1f} GB/s | " f">= {threshold:g} GB/s | {status} |") for lt in lat_tests: if lt.get("error"): lines.append(f"| {lt.get('test', 'ib_lat')} | {lt.get('error')} | required runnable test | {lt.get('status', 'FAIL')} |") else: threshold, status = self._rdma_latency_verdict(lt) lines.append(f"| {lt['test']} | {lt.get('latency_us', 0):.2f} us | " f"<= {threshold:g} us | {status} |") for it in ibping_tests: direction = it.get("direction") or it.get("role", "N/A") if it.get("error"): lines.append(f"| {it.get('test', 'ibping')} | {it.get('error')} | bidirectional peer evidence | {it.get('status', 'FAIL')} |") else: lines.append(f"| {it['test']} | {direction} target={it.get('target', 'N/A')} count={it.get('count', 'N/A')} | " f"0% packet loss | {it.get('status', '?')} |") lines.append("") fabric = rdma.get("fabric_counters") or {} if fabric: counters = fabric.get("counters", {}) lines.append(f"- **PFC/ECN/CNP/congestion counters checked:** {len(counters)}") lines.append(f"- **PFC/ECN/CNP/congestion non-zero:** {'yes' if fabric.get('failed') else 'no'}") if not counters: lines.append("- **PFC/ECN/CNP/congestion evidence:** missing") failures = rdma.get("failures") or [] if not failures: failures = self._rdma_failure_reasons(rdma) if failures: lines.append("- **Failure reasons:**") for reason in failures: lines.append(f" - {reason}") passed = rdma.get("passed", False) lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n") # --- Training --- training = results.get("training") if training and not training.get("error"): training_status, training_detail, training_missing = self._training_verdict(training) lines.append("## Training Simulation\n") lines.append("| Metric | Value |") lines.append("|--------|-------|") lines.append(f"| Model | {training.get('model', 'N/A')} |") lines.append(f"| Params | {training.get('total_params_m', 0):.1f}M |") lines.append(f"| Throughput | {training.get('throughput_tokens_per_sec', 0):.0f} tokens/sec |") lines.append(f"| Avg Step Time | {training.get('avg_step_time_ms', 0):.1f} ms |") lines.append(f"| Warmup Steps | {training.get('warmup_steps', 'N/A')} |") lines.append(f"| Peak Memory | {training.get('peak_memory_gb', 0):.1f} GB |") lines.append(f"| Final Loss | {training.get('final_loss', 'N/A')} |") lines.append(f"| Step Jitter | {training.get('step_jitter_pct', 'N/A')}% |") lines.append(f"| Distributed Mode | {training.get('distributed_mode', 'N/A')} |") if training_missing: lines.append(f"| Acceptance Gaps | missing {', '.join(training_missing)} |") lines.append(f"| Verdict | {training_status} ({training_detail}) |") lines.append("") # --- Footer --- lines.append("---") lines.append(f"*Generated by GPU Test Suite v{__version__}*") content = "\n".join(lines) with open(output, "w") as f: f.write(content) self.console.print(f"[green]Markdown report saved to: {output}[/green]") return output def _extract_memory_results(self, results: dict) -> dict: """Extract memory benchmark data from either full-suite or single-test format.""" if "memory_bench" in results: data = results["memory_bench"] return data.get("memory", data) if isinstance(data, dict) else {} if "benchmark" in results: bench = results["benchmark"] if isinstance(bench, dict) and "memory" in bench: return bench["memory"] return {} def _extract_compute_results(self, results: dict) -> dict: """Extract compute benchmark data from either full-suite or single-test format.""" if "compute_bench" in results: data = results["compute_bench"] return data.get("compute", data) if isinstance(data, dict) else {} if "benchmark" in results: bench = results["benchmark"] if isinstance(bench, dict) and "compute" in bench: return bench["compute"] return {} @staticmethod def _training_verdict(training: dict) -> tuple[str, str, list[str]]: """Return report status for both current and legacy training result schemas.""" tps = float(training.get("throughput_tokens_per_sec", 0) or 0) if "passed" in training: status = "PASS" if training.get("passed") else "FAIL" return status, f"{tps:.0f} tokens/sec", [] required = ["passed", "step_jitter_pct", "distributed_mode", "loss_finite"] missing = [k for k in required if k not in training] return "UNVERIFIED", f"{tps:.0f} tokens/sec; legacy result lacks explicit acceptance verdict", missing def _rdma_cfg_value(self, key: str, default: float) -> float: try: return float((self.config.get("rdma", {}) or {}).get(key, default)) except (TypeError, ValueError): return default def _rdma_bandwidth_verdict(self, row: dict) -> tuple[float, str]: threshold = self._rdma_cfg_value("min_bandwidth_gbps", 47.0) value = float(row.get("bandwidth_gbps", 0) or 0) return threshold, "PASS" if value >= threshold else "FAIL" def _rdma_latency_verdict(self, row: dict) -> tuple[float, str]: name = row.get("test", "") if name == "ib_write_lat": threshold = self._rdma_cfg_value("max_write_latency_us", 2.0) elif name == "ib_read_lat": threshold = self._rdma_cfg_value("max_read_latency_us", 3.5) else: threshold = self._rdma_cfg_value("max_latency_us", 3.5) value = float(row.get("latency_us", 0) or 0) return threshold, "PASS" if 0 < value <= threshold else "FAIL" def _rdma_legacy_note(self, rdma: dict) -> str: """Flag old RDMA result schemas whose embedded thresholds were looser.""" for row in rdma.get("bandwidth_tests", []) or []: if row.get("min_required_gbps") != self._rdma_cfg_value("min_bandwidth_gbps", 47.0): return ( "Legacy RDMA result re-evaluated with current PDF acceptance thresholds; " "old WARN statuses and old 50GB/s/10us limits are not used for verdict." ) for row in rdma.get("latency_tests", []) or []: threshold, _ = self._rdma_latency_verdict(row) if row.get("max_allowed_us") != threshold: return ( "Legacy RDMA result re-evaluated with current PDF acceptance thresholds; " "old WARN statuses and old 50GB/s/10us limits are not used for verdict." ) return "" def _rdma_failure_reasons(self, rdma: dict) -> list[str]: failures = [] for row in rdma.get("bandwidth_tests", []) or []: threshold, status = self._rdma_bandwidth_verdict(row) if status != "PASS": failures.append( f"{row.get('test')} bandwidth {row.get('bandwidth_gbps', 0)}GB/s < {threshold:g}GB/s" ) for row in rdma.get("latency_tests", []) or []: threshold, status = self._rdma_latency_verdict(row) if status != "PASS": failures.append( f"{row.get('test')} latency {row.get('latency_us', 0)}us > {threshold:g}us" ) for row in rdma.get("ibping_tests", []) or []: if row.get("status") != "PASS": failures.append(f"{row.get('test')} failed") return failures @staticmethod def _overall_acceptance_verdict(summary_items: list[tuple[str, str]]) -> tuple[str, list[tuple[str, str]], list[str]]: """PDF-style verdict for the report scope. Full-suite reports require every single-node acceptance item. Standalone reports, such as `--test multinode-nccl`, should only judge the items that were actually requested instead of reporting unrelated evidence as missing. """ single_node_required = [ "GPU Info", "Health Check", "Memory Bandwidth", "Compute Throughput", "NVLink/NVSwitch", "NCCL", "Stress Test", "RDMA", "DCGM", "Training", ] status_by_name = dict(summary_items) present_single_node = [name for name in single_node_required if name in status_by_name] if len(present_single_node) >= 3: required = list(single_node_required) if "Multi-node NCCL" in status_by_name: required.append("Multi-node NCCL") else: required = list(status_by_name) missing = [name for name in required if name not in status_by_name] failures = [ (name, status) for name, status in summary_items if name in required and not str(status).startswith("PASS") ] verdict = "PASS" if not missing and not failures else "FAIL" return verdict, failures, missing def _build_summary(self, results: dict) -> list[tuple[str, str]]: """Build summary verdict list from results.""" items = [] # GPU Info if "gpu_info" in results: gi = results["gpu_info"] if gi.get("error"): items.append(("GPU Info", f"ERROR: {gi['error']}")) else: items.append(("GPU Info", f"PASS ({gi.get('gpu_count', '?')} GPUs detected)")) # Health if "health" in results: h = results["health"] if h.get("error"): items.append(("Health Check", f"ERROR: {h['error']}")) elif h.get("passed"): items.append(("Health Check", "PASS")) else: items.append(("Health Check", "FAIL")) # Memory Bandwidth mem = self._extract_memory_results(results) if mem: if mem.get("error"): items.append(("Memory Bandwidth", f"ERROR: {mem['error']}")) elif mem.get("source") == "pytorch": # PyTorch fallback can't reach HBM peak — report as WARN, not FAIL. d2d = mem.get("d2d_bandwidth_gbps") or 0 items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)")) else: eff = mem.get("d2d_efficiency_pct") or mem.get("efficiency_pct") or 0 verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 60 else "FAIL") items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)")) # Compute comp = self._extract_compute_results(results) if comp: if comp.get("error"): items.append(("Compute Throughput", f"ERROR: {comp['error']}")) else: per_dtype = comp.get("per_dtype_tflops", {}) eff_pct = comp.get("efficiency_pct", {}) pass_thresholds = comp.get("pass_thresholds_tflops", {}) or {} if pass_thresholds: # Absolute TFLOPS judgment, mirroring the per-dtype table above. rank = {"PASS": 0, "WARN": 1, "FAIL": 2} worst_status = "PASS" worst_dt = None lowest_margin = None for dt, thr in pass_thresholds.items(): val = per_dtype.get(dt) if not isinstance(val, (int, float)): continue if val >= thr: st = "PASS" else: st = "FAIL" margin = val / thr if thr else 0 if lowest_margin is None or margin < lowest_margin: lowest_margin = margin worst_dt = dt if rank[st] > rank[worst_status]: worst_status = st if worst_dt: consistency = comp.get("consistency", {}) or {} failed_consistency = [ (dt, row) for dt, row in consistency.items() if not row.get("passed", False) ] if failed_consistency: worst_status = "FAIL" fail_dt, fail_row = failed_consistency[0] items.append(( "Compute Throughput", f"FAIL ({fail_dt.upper()} spread " f"{fail_row.get('spread_pct', 0):.2f}% > " f"{fail_row.get('max_allowed_pct', 3)}%)" )) else: items.append(( "Compute Throughput", f"{worst_status} (worst {worst_dt.upper()} " f"{per_dtype[worst_dt]:.0f} vs >= {pass_thresholds[worst_dt]})" )) else: items.append(("Compute Throughput", f"{worst_status}")) else: valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0] if valid_effs: worst = min(valid_effs) verdict = "PASS" if worst >= 80 else ("WARN" if worst >= 50 else "FAIL") items.append(("Compute Throughput", f"{verdict} (worst {worst:.1f}%)")) else: items.append(("Compute Throughput", "N/A")) # NCCL if "nvlink" in results: nvl = results["nvlink"] if nvl.get("error"): items.append(("NVLink/NVSwitch", f"ERROR: {nvl['error']}")) elif nvl.get("passed"): items.append(("NVLink/NVSwitch", "PASS")) else: items.append(("NVLink/NVSwitch", "FAIL")) if "dcgm" in results: d = results["dcgm"] if d.get("error"): items.append(("DCGM", f"ERROR: {d['error']}")) elif d.get("passed"): items.append(("DCGM", "PASS")) else: items.append(("DCGM", "FAIL")) # NCCL if "nccl" in results: n = results["nccl"] if n.get("error"): items.append(("NCCL", f"ERROR: {n['error']}")) elif n.get("source") == "torchrun_fallback": items.append(("NCCL", "FAIL (no nccl-tests bus BW)")) elif n.get("passed"): items.append(("NCCL", "PASS")) else: items.append(("NCCL", "FAIL")) if "multinode_nccl" in results: mn = results["multinode_nccl"] if mn.get("error"): items.append(("Multi-node NCCL", f"ERROR: {mn['error']}")) elif mn.get("passed"): items.append(("Multi-node NCCL", "PASS")) else: items.append(("Multi-node NCCL", "FAIL")) # Stress if "stress" in results: s = results["stress"] if s.get("error"): items.append(("Stress Test", f"ERROR: {s['error']}")) elif s.get("passed"): items.append(("Stress Test", "PASS")) else: items.append(("Stress Test", "FAIL")) # RDMA if "rdma" in results: r = results["rdma"] if r.get("skipped") or r.get("status") == "SKIP": items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})")) elif r.get("error"): items.append(("RDMA", f"ERROR: {r['error']}")) elif r.get("passed"): items.append(("RDMA", "PASS")) else: items.append(("RDMA", "FAIL")) # Training if "training" in results: t = results["training"] if t.get("error"): items.append(("Training", f"ERROR: {t['error']}")) else: status, detail, _missing = self._training_verdict(t) items.append(("Training", f"{status} ({detail})")) return items def _format_gbps(value) -> str: try: numeric = float(value) except (TypeError, ValueError): return str(value) if numeric.is_integer(): return f"{numeric:.0f}" return f"{numeric:.2f}"