"""Report generation module - export test results to JSON/HTML/Markdown."""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
# Resolve the installed distribution's version for the report footer.
# The lookup is deliberately best-effort: when importlib.metadata is not
# importable or the "gpu-server-test-suite" distribution is not installed
# (e.g. running from a source checkout), fall back to a hard-coded version
# instead of failing at import time.
try:
    from importlib.metadata import version as _pkg_version

    __version__ = _pkg_version("gpu-server-test-suite")
except Exception:
    __version__ = "0.2.0"
from rich.console import Console
from rich.panel import Panel
# Page skeleton for the HTML report. It is filled with str.format using the
# ``timestamp``, ``hostname`` and ``content`` fields, so literal CSS braces
# are doubled to survive formatting. The ``pass`` / ``warn`` / ``fail``
# classes match the status classes computed by the HTML report writer.
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>GPU Test Report - {timestamp}</title>
<style>
body {{ font-family: sans-serif; margin: 2em; }}
table {{ border-collapse: collapse; margin: 0.5em 0 1.5em; }}
th, td {{ border: 1px solid #ccc; padding: 4px 10px; text-align: left; }}
.pass {{ color: #1a7f37; font-weight: bold; }}
.warn {{ color: #9a6700; font-weight: bold; }}
.fail {{ color: #cf222e; font-weight: bold; }}
</style>
</head>
<body>
<h1>GPU Test Report</h1>
<p>Host: {hostname} | Generated: {timestamp}</p>
{content}
</body>
</html>
"""
class ReportGenerator:
    """Render a test-suite ``results`` dict as a JSON, HTML or Markdown report.

    The generator reads the ``"report"`` section of the configuration for the
    default format and output directory, and the ``"rdma"`` section for the
    RDMA acceptance thresholds used when re-evaluating RDMA results.
    """

    def __init__(self, config: dict):
        """Keep ``config``, create a console and cache ``config["report"]``."""
        self.config = config
        self.console = Console()
        self.report_cfg = config.get("report", {})

    def generate(self, results: dict, fmt: Optional[str] = None, output: Optional[str] = None) -> str:
        """Write ``results`` as a report and return the path of the written file.

        Args:
            results: Test results keyed by test name.
            fmt: ``"json"``, ``"html"`` or ``"md"`` (case-insensitive). Defaults
                to ``report.format`` from the config, then ``"json"``.
            output: Destination file. When omitted, a timestamped file name is
                created inside ``report.output_dir`` (default ``./reports``).

        Returns:
            The output path, or ``""`` when the format is not supported.
        """
        fmt = str(fmt or self.report_cfg.get("format", "json")).strip().lower()
        writers = {
            "json": self._generate_json,
            "html": self._generate_html,
            "md": self._generate_markdown,
        }
        writer = writers.get(fmt)
        if writer is None:
            # Reject unknown formats before touching the filesystem.
            self.console.print(f"[red]Unsupported format: {fmt}[/red]")
            return ""

        if not output:
            output_dir = self.report_cfg.get("output_dir", "./reports")
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output = os.path.join(output_dir, f"gpu_report_{timestamp}.{fmt}")

        # Create the directory of the file that is actually written, whether it
        # is the configured output directory or the parent of an explicit path.
        parent = os.path.dirname(output)
        if parent:
            os.makedirs(parent, exist_ok=True)
        return writer(results, output)

    def _generate_json(self, results: dict, output: str) -> str:
        """Dump ``results`` as indented JSON; non-serializable values use ``str()``."""
        with open(output, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2, default=str)
        self.console.print(f"[green]JSON report saved to: {output}[/green]")
        return output

    def _generate_html(self, results: dict, output: str) -> str:
        """Write an HTML report built from ``HTML_TEMPLATE`` and return ``output``.

        Sections are emitted for ``gpu_info``, ``health``, ``benchmark.memory``,
        ``benchmark.compute`` and ``training`` when present in ``results``.
        Every interpolated value is HTML-escaped.
        """
        import socket
        from html import escape

        def esc(value) -> str:
            return escape(str(value))

        hostname = results.get("hostname") or socket.gethostname()
        timestamp = results.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sections = []

        if "gpu_info" in results:
            gpu_info = results["gpu_info"]
            gpus = gpu_info.get("gpus", [])
            rows = "\n".join(
                "<tr>"
                f"<td>GPU {esc(g.get('index', 'N/A'))}</td>"
                f"<td>{esc(g.get('name', 'N/A'))}</td>"
                f"<td>{esc(g.get('vram_total_mb', 'N/A'))} MB</td>"
                f"<td>{esc(g.get('temperature', 'N/A'))}&deg;C</td>"
                f"<td>{esc(g.get('clock_sm', 'N/A'))} MHz</td>"
                "</tr>"
                for g in gpus
            )
            sections.append(
                "<h2>GPU Information</h2>\n"
                f"<p>Driver: {esc(gpu_info.get('driver_version', 'N/A'))} | "
                f"CUDA: {esc(gpu_info.get('cuda_version', 'N/A'))} | "
                f"Count: {len(gpus)}</p>\n"
                "<table>\n"
                "<tr><th>GPU</th><th>Model</th><th>VRAM</th><th>Temp</th><th>SM Clock</th></tr>\n"
                f"{rows}\n"
                "</table>"
            )

        if "health" in results:
            passed = results["health"].get("passed", False)
            cls = "pass" if passed else "fail"
            txt = "ALL PASSED" if passed else "SOME CHECKS FAILED"
            sections.append(f'<h2>Health Check</h2>\n<p class="{cls}">{txt}</p>')

        benchmark = results.get("benchmark")
        if not isinstance(benchmark, dict):
            benchmark = {}

        if "memory" in benchmark:
            mem = benchmark["memory"]
            sections.append(
                "<h2>Memory Bandwidth</h2>\n"
                "<table>\n"
                f"<tr><th>D2D (HBM)</th><td>{esc(mem.get('d2d_bandwidth_gbps', 'N/A'))} GB/s</td></tr>\n"
                f"<tr><th>Efficiency vs Peak ({esc(mem.get('peak_bandwidth_gbps', 'N/A'))} GB/s)</th>"
                f"<td>{esc(mem.get('efficiency_pct', 'N/A'))}%</td></tr>\n"
                "</table>"
            )

        if "compute" in benchmark:
            comp = benchmark["compute"]
            per_dtype = comp.get("per_dtype_tflops", {})
            eff = comp.get("efficiency_pct", {})
            dtype_rows = []
            for dt, tflops in per_dtype.items():
                # Skipped/errored dtypes carry a message instead of a number.
                if not isinstance(tflops, (int, float)):
                    continue
                ef = eff.get(dt, 0)
                if isinstance(ef, (int, float)):
                    cls = "pass" if ef >= 80 else ("warn" if ef >= 50 else "fail")
                    eff_cell = f'<td class="{cls}">{ef:.1f}%</td>'
                else:
                    eff_cell = "<td>N/A</td>"
                dtype_rows.append(
                    f"<tr><td>{esc(str(dt).upper())}</td><td>{tflops:.1f} TFLOPS</td>{eff_cell}</tr>"
                )
            if dtype_rows:
                sections.append(
                    "<h2>Compute Throughput</h2>\n"
                    "<table>\n"
                    "<tr><th>DType</th><th>Achieved</th><th>Efficiency</th></tr>\n"
                    + "\n".join(dtype_rows)
                    + "\n</table>"
                )

        if "training" in results:
            t = results["training"]
            sections.append(
                "<h2>Training Simulation</h2>\n"
                "<table>\n"
                f"<tr><th>Tokens/sec</th><td>{esc(t.get('throughput_tokens_per_sec', 'N/A'))}</td></tr>\n"
                f"<tr><th>Avg Step Time</th><td>{esc(t.get('avg_step_time_ms', 'N/A'))} ms</td></tr>\n"
                f"<tr><th>Peak Memory</th><td>{esc(t.get('peak_memory_gb', 'N/A'))} GB</td></tr>\n"
                "</table>"
            )

        content = "\n".join(sections)
        document = HTML_TEMPLATE.format(
            timestamp=esc(timestamp), hostname=esc(hostname), content=content
        )
        with open(output, "w", encoding="utf-8") as f:
            f.write(document)
        self.console.print(f"[green]HTML report saved to: {output}[/green]")
        return output

    # ------------------------------------------------------------------
    # Markdown report
    # ------------------------------------------------------------------
    def _generate_markdown(self, results: dict, output: str) -> str:
        """Write the Markdown report to ``output`` (UTF-8) and return ``output``.

        The report starts with a header and the overall acceptance verdict /
        summary table, followed by one section per result entry that is present
        and did not record an ``error``.
        """
        import socket

        hostname = results.get("hostname") or socket.gethostname()
        timestamp = results.get("timestamp") or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        lines: list[str] = []

        # --- Header ---
        lines.append("# GPU Test Report\n")
        lines.append(f"- **Date:** {timestamp}")
        lines.append(f"- **Host:** {hostname}")
        # Extract GPU info for header
        gpu_info = results.get("gpu_info")
        if gpu_info and not gpu_info.get("error"):
            gpus = gpu_info.get("gpus", [])
            gpu_name = gpus[0]["name"] if gpus else "Unknown"
            lines.append(f"- **GPU:** {gpu_name} x{gpu_info.get('gpu_count', len(gpus))}")
            lines.append(f"- **Driver:** {gpu_info.get('driver_version', 'N/A')} | "
                         f"**CUDA:** {gpu_info.get('cuda_version', 'N/A')}")
        lines.append("")

        # --- Summary table ---
        summary_items = self._build_summary(results)
        if summary_items:
            verdict, failures, missing = self._overall_acceptance_verdict(summary_items)
            lines.append("## Overall Acceptance Verdict\n")
            lines.append(f"**Result: {verdict}**")
            lines.append("")
            if failures:
                lines.append("Failed or unverified items:")
                for name, status in failures:
                    lines.append(f"- {name}: {status}")
                lines.append("")
            if missing:
                lines.append("Missing required evidence:")
                for name in missing:
                    lines.append(f"- {name}")
                lines.append("")
            lines.append("## Summary\n")
            lines.append("| Test | Result |")
            lines.append("|------|--------|")
            for name, item_verdict in summary_items:
                lines.append(f"| {name} | {item_verdict} |")
            lines.append("")

        # --- GPU Information ---
        if gpu_info and not gpu_info.get("error"):
            lines.append("## GPU Information\n")
            gpus = gpu_info.get("gpus", [])
            lines.append("| GPU | Model | VRAM | Temp | Power | SM Clock |")
            lines.append("|-----|-------|------|------|-------|----------|")
            for g in gpus:
                vram = f"{g.get('vram_total_mb', 0)} MB"
                temp = f"{g.get('temperature', 'N/A')}C"
                power = f"{g.get('power_draw', 0):.0f}/{g.get('power_limit', 0):.0f}W"
                clock = f"{g.get('clock_sm', 0)} MHz"
                lines.append(f"| {g['index']} | {g['name']} | {vram} | {temp} | {power} | {clock} |")
            lines.append("")

        # --- Health Check ---
        health = results.get("health")
        if health and not health.get("error"):
            lines.append("## Health Check\n")
            passed = health.get("passed", False)
            lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")
            gpu_health = health.get("gpu_health", [])
            if gpu_health:
                lines.append("| GPU | Temp | Power | ECC | PCIe | Throttle | Status |")
                lines.append("|-----|------|-------|-----|------|----------|--------|")
                for gh in gpu_health:
                    checks = gh.get("checks", {})
                    temp_c = checks.get("temperature", {})
                    pwr = checks.get("power", {})
                    ecc = checks.get("ecc_errors", {})
                    pcie = checks.get("pcie_link", {})
                    throttle = checks.get("throttling", {})
                    temp_str = f"{temp_c.get('value', '?')}C {temp_c.get('status', '')}"
                    pwr_str = f"{pwr.get('value', 0):.0f}W {pwr.get('status', '')}"
                    ecc_str = f"S:{ecc.get('single', 0)} D:{ecc.get('double', 0)}"
                    pcie_str = f"Gen{pcie.get('gen', '?')}x{pcie.get('width', '?')}"
                    throt_str = throttle.get("status", "?")
                    status = gh.get("status", "?")
                    lines.append(f"| {gh['index']} | {temp_str} | {pwr_str} | "
                                 f"{ecc_str} | {pcie_str} | {throt_str} | **{status}** |")
                lines.append("")

        # --- Memory Bandwidth ---
        mem_data = self._extract_memory_results(results)
        if mem_data and not mem_data.get("error"):
            lines.append("## Memory Bandwidth\n")
            lines.append(f"Source: {mem_data.get('source', 'unknown')}\n")
            lines.append("| Metric | Value | Peak | Efficiency |")
            lines.append("|--------|-------|------|------------|")
            d2d = mem_data.get("d2d_bandwidth_gbps") or 0
            h2d = mem_data.get("h2d_bandwidth_gbps") or 0
            d2h = mem_data.get("d2h_bandwidth_gbps") or 0
            # New format with per-metric peaks
            h2d_peak = mem_data.get("h2d_peak_gbps") or 0
            d2h_peak = mem_data.get("d2h_peak_gbps") or 0
            d2d_peak = mem_data.get("d2d_peak_gbps") or 0
            h2d_eff = mem_data.get("h2d_efficiency_pct") or 0
            d2h_eff = mem_data.get("d2h_efficiency_pct") or 0
            d2d_eff = mem_data.get("d2d_efficiency_pct") or 0
            # Fallback for old format
            if not d2d_peak:
                d2d_peak = mem_data.get("peak_bandwidth_gbps") or 0
                d2d_eff = mem_data.get("efficiency_pct") or 0
            lines.append(f"| H2D (PCIe) | {h2d:.1f} GB/s | {h2d_peak:.0f} GB/s | {h2d_eff:.1f}% |")
            lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |")
            lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |")
            lines.append("")
            # PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_()
            # only reaches ~20% of HBM bandwidth). When fallback is used, report
            # the number but mark as WARN with a note instead of evaluating as FAIL.
            if mem_data.get("source") == "pytorch":
                lines.append(
                    f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; "
                    "nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n"
                )
            else:
                # Tightened to match production acceptance: PASS >= 80%, WARN 60–80%, FAIL < 60%.
                verdict = "PASS" if d2d_eff >= 80 else ("WARN" if d2d_eff >= 60 else "FAIL")
                lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")

        # --- Compute Throughput ---
        comp_data = self._extract_compute_results(results)
        if comp_data and not comp_data.get("error"):
            lines.append("## Compute Throughput\n")
            per_dtype = comp_data.get("per_dtype_tflops", {})
            peak_tflops = comp_data.get("peak_tflops", {})
            eff_pct = comp_data.get("efficiency_pct", {})
            # Absolute PASS thresholds (TFLOPS) from gpu_specs.compute_pass_thresholds_tflops.
            # When present, override the legacy 80%-of-peak rule on a per-dtype basis.
            pass_thresholds = comp_data.get("pass_thresholds_tflops", {}) or {}
            use_abs = bool(pass_thresholds)
            if use_abs:
                lines.append("| DType | Achieved (TFLOPS) | Peak | Threshold | Status |")
            else:
                lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
            lines.append("|-------|-------------------|------|------------|--------|")
            worst_eff = 100.0
            overall_status = "PASS"
            rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
            for dt, val in per_dtype.items():
                if isinstance(val, str):
                    # skipped or error: no status is evaluated for this row
                    lines.append(f"| {dt.upper()} | {val} | - | N/A | SKIP |")
                    continue
                pk = peak_tflops.get(dt, 0)
                ef = eff_pct.get(dt, 0)
                if isinstance(ef, (int, float)) and ef > 0:
                    worst_eff = min(worst_eff, ef)
                thr = pass_thresholds.get(dt)
                if use_abs and thr:
                    status = "PASS" if val >= thr else "FAIL"
                    lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | >= {thr} | {status} |")
                else:
                    status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
                    lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
                # Only rows that were actually evaluated may worsen the verdict.
                if rank.get(status, 0) > rank.get(overall_status, 0):
                    overall_status = status
            lines.append("")
            consistency = comp_data.get("consistency", {}) or {}
            if use_abs:
                if any(not row.get("passed", False) for row in consistency.values()):
                    overall_status = "FAIL"
                lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds; worst efficiency {worst_eff:.1f}%)\n")
            else:
                overall_status = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
                lines.append(f"**Verdict: {overall_status}** (worst efficiency {worst_eff:.1f}%)\n")
            if consistency:
                lines.append("### Compute Consistency\n")
                lines.append("| DType | Min | Mean | Max | Spread | Limit | Status |")
                lines.append("|-------|-----|------|-----|--------|-------|--------|")
                for dt, row in consistency.items():
                    status = "PASS" if row.get("passed") else "FAIL"
                    lines.append(
                        f"| {dt.upper()} | {row.get('min_tflops', 0):.1f} | "
                        f"{row.get('mean_tflops', 0):.1f} | {row.get('max_tflops', 0):.1f} | "
                        f"{row.get('spread_pct', 0):.2f}% | <= {row.get('max_allowed_pct', 3)}% | {status} |"
                    )
                lines.append("")
            per_gpu = comp_data.get("per_gpu", []) or []
            dtype_order = [dt for dt, val in per_dtype.items() if not isinstance(val, str)]
            if per_gpu and dtype_order:
                lines.append("### Compute Per-GPU TFLOPS\n")
                headers = ["GPU", *[dt.upper() for dt in dtype_order]]
                lines.append("| " + " | ".join(headers) + " |")
                lines.append("|" + "|".join(["---"] * len(headers)) + "|")
                for row in per_gpu:
                    cells = [str(row.get("index", ""))]
                    for dt in dtype_order:
                        val = row.get(dt, "")
                        cells.append(f"{val:.1f}" if isinstance(val, (int, float)) else str(val))
                    lines.append("| " + " | ".join(cells) + " |")
                lines.append("")

        # --- NVLink / NVSwitch ---
        nvlink = results.get("nvlink")
        if nvlink and not nvlink.get("error"):
            lines.append("## NVLink/NVSwitch\n")
            lines.append(f"**Overall: {'PASS' if nvlink.get('passed') else 'FAIL'}**\n")
            lines.append("| GPU | Active Links | Issues |")
            lines.append("|-----|--------------|--------|")
            for g in nvlink.get("gpus", []):
                issues = []
                if g.get("inactive_links"):
                    issues.append("inactive=" + ",".join(g["inactive_links"]))
                if g.get("speed_issues"):
                    issues.append(f"speed issues={len(g['speed_issues'])}")
                if g.get("error_issues"):
                    issues.append(f"errors={len(g['error_issues'])}")
                lines.append(f"| {g.get('gpu')} | {g.get('active_links')}/{g.get('expected_links')} | {', '.join(issues) or 'OK'} |")
            lines.append("")
        elif nvlink and nvlink.get("error"):
            lines.append("## NVLink/NVSwitch\n")
            lines.append(f"**Overall: FAIL** ({nvlink.get('error')})\n")

        # --- DCGM ---
        dcgm = results.get("dcgm")
        if dcgm and not dcgm.get("error"):
            lines.append("## DCGM Diagnostic\n")
            lines.append(f"**Overall: {'PASS' if dcgm.get('passed') else 'FAIL'}**\n")
            if dcgm.get("subtests"):
                lines.append("| Subtest | Status |")
                lines.append("|---------|--------|")
                for s in dcgm.get("subtests", []):
                    lines.append(f"| {s.get('name', '')} | {s.get('status', '')} |")
                lines.append("")
        elif dcgm and dcgm.get("error"):
            lines.append("## DCGM Diagnostic\n")
            lines.append(f"**Overall: FAIL** ({dcgm.get('error')})\n")

        # --- NCCL ---
        nccl = results.get("nccl")
        if nccl and not nccl.get("error"):
            lines.append("## NCCL Multi-GPU\n")
            lines.append(f"Source: {nccl.get('source', 'unknown')} | "
                         f"GPUs: {nccl.get('gpu_count', '?')}\n")
            if nccl.get("source") == "torchrun_fallback":
                lines.append("> Functional NCCL smoke only: nccl-tests bus bandwidth was not measured, so this does not satisfy production acceptance.\n")
            tests = nccl.get("tests", {})
            if tests:
                lines.append("> Summary reports the best Bus BW observed for each operation. PASS/FAIL is evaluated across every tested message size and repeat run shown in the detail table below.\n")
                lines.append("| Operation | Best Bus BW (GB/s) | Failed Sizes | Threshold | Status |")
                lines.append("|-----------|--------------------|--------------|-----------|--------|")
                for op, data in tests.items():
                    if isinstance(data, dict) and not data.get("error"):
                        bw = data.get("best_busbw_gbps", 0)
                        req = data.get("min_required_gbps", 0)
                        status = data.get("status", "?")
                        failed_sizes = [
                            str(row.get("size", "?"))
                            for row in data.get("by_size", [])
                            if row.get("status") != "PASS"
                        ]
                        failed_sizes_text = ", ".join(failed_sizes) if failed_sizes else "-"
                        lines.append(f"| {op} | {bw:.1f} | {failed_sizes_text} | >= {req:.0f} | {status} |")
                    elif isinstance(data, dict) and data.get("error"):
                        lines.append(f"| {op} | - | - | - | ERROR: {data['error']} |")
                lines.append("")
                for op, data in tests.items():
                    by_size = data.get("by_size", []) if isinstance(data, dict) else []
                    if not by_size:
                        continue
                    lines.append(f"### NCCL {op} by size\n")
                    lines.append("| Size | Runs Bus BW (GB/s) | Worst | Mean | StdDev | Threshold | Status |")
                    lines.append("|------|---------------------|-------|------|--------|-----------|--------|")
                    for row in by_size:
                        runs = ", ".join(str(v) for v in row.get("runs_busbw_gbps", []))
                        lines.append(
                            f"| {row.get('size', '')} | {runs} | "
                            f"{row.get('worst_busbw_gbps', 0):.1f} | "
                            f"{row.get('mean_busbw_gbps', 0):.1f} | "
                            f"{row.get('stddev_pct', 0):.2f}% | "
                            f">= {data.get('min_required_gbps', 0):.0f} | "
                            f"{row.get('status', '?')} |"
                        )
                    lines.append("")
            passed = nccl.get("passed", False)
            lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")

        # --- Multi-node NCCL ---
        multinode = results.get("multinode_nccl")
        if multinode and not multinode.get("error"):
            lines.append("## Multi-node NCCL / Cross Leaf\n")
            lines.append(f"Source: {multinode.get('source', 'unknown')} | Mode: {multinode.get('mode', 'unknown')}\n")
            hosts = multinode.get("hosts", [])
            if hosts:
                host_text = ", ".join(f"{h.get('name') or h.get('addr')}({h.get('addr')})" for h in hosts)
                lines.append(f"- **Hosts:** {host_text}")
            preflight = multinode.get("preflight", {})
            if preflight.get("checks"):
                failed_checks = [c for c in preflight["checks"] if c.get("status") == "FAIL"]
                warn_checks = [c for c in preflight["checks"] if c.get("status") == "WARN"]
                warn_note = f" ({len(warn_checks)} warnings)" if warn_checks else ""
                lines.append(f"- **Preflight:** {'PASS' if not failed_checks else 'FAIL'}{warn_note}")
            lines.append("")
            for op, data in (multinode.get("tests") or {}).items():
                topologies = data.get("topologies", [])
                lines.append(f"### Multi-node NCCL {op}\n")
                lines.append("| Topology | Peak Bus BW | Peak Size | Avg Bus BW | Threshold | Status |")
                lines.append("|----------|-------------|-----------|------------|-----------|--------|")
                for topo in topologies:
                    threshold = topo.get("min_required_gbps", 0) or 0
                    threshold_text = f">= {threshold:.0f} GB/s" if threshold else "-"
                    lines.append(
                        f"| {topo.get('label', '')} | {topo.get('peak_busbw_gbps', 0):.2f} GB/s | "
                        f"{topo.get('peak_size', '')} | {topo.get('avg_busbw_gbps', 0):.2f} GB/s | "
                        f"{threshold_text} | {topo.get('status', '?')} |"
                    )
                lines.append("")
                # Network diagnostics are shown only for topologies that recorded them.
                diag_rows = [(topo, topo["network"]) for topo in topologies if topo.get("network")]
                if diag_rows:
                    lines.append("| Topology | NCCL Network | GPU Direct RDMA | GDR Disabled HCAs |")
                    lines.append("|----------|--------------|-----------------|-------------------|")
                    for topo, net in diag_rows:
                        networks = ", ".join(net.get("networks") or []) or "unknown"
                        gdr = net.get("gpu_direct_rdma", "UNKNOWN")
                        disabled = ", ".join(net.get("gdr_disabled_hcas") or []) or "-"
                        lines.append(f"| {topo.get('label', '')} | {networks} | {gdr} | {disabled} |")
                    lines.append("")
                failed_topos = [topo for topo in topologies if topo.get("status") == "FAIL"]
                if failed_topos:
                    lines.append("| Topology | Return Code | Error / Output Tail |")
                    lines.append("|----------|-------------|---------------------|")
                    for topo in failed_topos:
                        tail = topo.get("error") or topo.get("stderr_tail") or topo.get("stdout_tail") or ""
                        # Flatten to one table cell, escape pipes, keep the last 240 chars.
                        tail = str(tail).replace("\n", " ").replace("|", "\\|")[-240:]
                        lines.append(f"| {topo.get('label', '')} | {topo.get('returncode', '')} | {tail} |")
                    lines.append("")
            lines.append(f"**Overall: {'PASS' if multinode.get('passed') else 'FAIL'}**\n")
        elif multinode and multinode.get("error"):
            lines.append("## Multi-node NCCL / Cross Leaf\n")
            lines.append(f"**Overall: FAIL** ({multinode.get('error')})\n")
            preflight = multinode.get("preflight", {})
            if preflight.get("checks"):
                lines.append("| Check | Status | Detail |")
                lines.append("|-------|--------|--------|")
                for check in preflight["checks"]:
                    detail = str(check.get("detail", "")).replace("\n", " ")
                    lines.append(f"| {check.get('name', '')} | {check.get('status', '')} | {detail} |")
                lines.append("")

        # --- Stress Test ---
        stress = results.get("stress")
        if stress and not stress.get("error"):
            lines.append("## Stress Test\n")
            passed = stress.get("passed", False)
            duration = stress.get("duration_sec") or 0
            elapsed = stress.get("elapsed_sec") or 0
            source = stress.get("source", "unknown")
            lines.append(f"- **Source:** {source}")
            lines.append(f"- **Duration:** {elapsed:.0f}s (requested {duration}s)")
            telemetry = stress.get("telemetry") or {}
            if telemetry:
                lines.append(f"- **Telemetry samples:** {telemetry.get('samples', 0)}")
                lines.append(f"- **Max temp:** {telemetry.get('max_temp_c', {})}")
                lines.append(f"- **Avg power:** {telemetry.get('avg_power_w', {})}")
                lines.append(f"- **Temp delta:** {telemetry.get('temp_delta_c', 'N/A')} C")
                lines.append(f"- **TFLOPS jitter:** {telemetry.get('tflops_jitter_pct', 'N/A')}%")
                lines.append(f"- **Steady TFLOPS samples:** {telemetry.get('steady_tflops_samples', 0)}")
                lines.append(f"- **Throttle events:** {telemetry.get('throttle_event_count', len(telemetry.get('throttle_events', [])))}")
                lines.append(f"- **XID events:** {len(telemetry.get('xid_events', []))}")
                failures = telemetry.get("failures") or []
                if failures:
                    lines.append("- **Failure reasons:**")
                    for reason in failures:
                        lines.append(f"  - {reason}")
            lines.append(f"- **Result: {'PASS' if passed else 'FAIL'}**")
            lines.append("")

        # --- RDMA ---
        rdma = results.get("rdma")
        if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"):
            lines.append("## RDMA/InfiniBand\n")
            lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n")
        elif rdma and not rdma.get("error"):
            lines.append("## RDMA/InfiniBand\n")
            rdma_legacy_note = self._rdma_legacy_note(rdma)
            if rdma_legacy_note:
                lines.append(f"> {rdma_legacy_note}\n")
            port_checks = rdma.get("port_checks", [])
            if port_checks:
                lines.append("### RDMA Port Checks\n")
                lines.append("| Device | Port | State | Rate | Required | Status |")
                lines.append("|--------|------|-------|------|----------|--------|")
                for p in port_checks:
                    lines.append(
                        f"| {p.get('device', '')} | {p.get('port', '')} | "
                        f"{p.get('state', '')} | {p.get('rate', '')} | "
                        f">= {p.get('min_rate_gbps', 400):.0f}Gbps ACTIVE | {p.get('status', '?')} |"
                    )
                lines.append("")
            bw_tests = rdma.get("bandwidth_tests", [])
            lat_tests = rdma.get("latency_tests", [])
            ibping_tests = rdma.get("ibping_tests", [])
            if bw_tests or lat_tests or ibping_tests:
                lines.append("| Test | Value | Threshold | Status |")
                lines.append("|------|-------|-----------|--------|")
                for bt in bw_tests:
                    if bt.get("error"):
                        lines.append(f"| {bt.get('test', 'ib_bw')} | {bt.get('error')} | required runnable test | {bt.get('status', 'FAIL')} |")
                    else:
                        threshold, status = self._rdma_bandwidth_verdict(bt)
                        lines.append(f"| {bt['test']} | {bt.get('bandwidth_gbps', 0):.1f} GB/s | "
                                     f">= {threshold:g} GB/s | {status} |")
                for lt in lat_tests:
                    if lt.get("error"):
                        lines.append(f"| {lt.get('test', 'ib_lat')} | {lt.get('error')} | required runnable test | {lt.get('status', 'FAIL')} |")
                    else:
                        threshold, status = self._rdma_latency_verdict(lt)
                        lines.append(f"| {lt['test']} | {lt.get('latency_us', 0):.2f} us | "
                                     f"<= {threshold:g} us | {status} |")
                for it in ibping_tests:
                    direction = it.get("direction") or it.get("role", "N/A")
                    if it.get("error"):
                        lines.append(f"| {it.get('test', 'ibping')} | {it.get('error')} | bidirectional peer evidence | {it.get('status', 'FAIL')} |")
                    else:
                        lines.append(f"| {it['test']} | {direction} target={it.get('target', 'N/A')} count={it.get('count', 'N/A')} | "
                                     f"0% packet loss | {it.get('status', '?')} |")
                lines.append("")
            fabric = rdma.get("fabric_counters") or {}
            if fabric:
                counters = fabric.get("counters", {})
                lines.append(f"- **PFC/ECN/CNP/congestion counters checked:** {len(counters)}")
                lines.append(f"- **PFC/ECN/CNP/congestion non-zero:** {'yes' if fabric.get('failed') else 'no'}")
                if not counters:
                    lines.append("- **PFC/ECN/CNP/congestion evidence:** missing")
            # Prefer the failure reasons recorded with the result; otherwise
            # derive them by re-evaluating against the configured thresholds.
            failures = rdma.get("failures") or []
            if not failures:
                failures = self._rdma_failure_reasons(rdma)
            if failures:
                lines.append("- **Failure reasons:**")
                for reason in failures:
                    lines.append(f"  - {reason}")
            passed = rdma.get("passed", False)
            lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")

        # --- Training ---
        training = results.get("training")
        if training and not training.get("error"):
            training_status, training_detail, training_missing = self._training_verdict(training)
            lines.append("## Training Simulation\n")
            lines.append("| Metric | Value |")
            lines.append("|--------|-------|")
            lines.append(f"| Model | {training.get('model', 'N/A')} |")
            lines.append(f"| Params | {training.get('total_params_m', 0):.1f}M |")
            lines.append(f"| Throughput | {training.get('throughput_tokens_per_sec', 0):.0f} tokens/sec |")
            lines.append(f"| Avg Step Time | {training.get('avg_step_time_ms', 0):.1f} ms |")
            lines.append(f"| Warmup Steps | {training.get('warmup_steps', 'N/A')} |")
            lines.append(f"| Peak Memory | {training.get('peak_memory_gb', 0):.1f} GB |")
            lines.append(f"| Final Loss | {training.get('final_loss', 'N/A')} |")
            lines.append(f"| Step Jitter | {training.get('step_jitter_pct', 'N/A')}% |")
            lines.append(f"| Distributed Mode | {training.get('distributed_mode', 'N/A')} |")
            if training_missing:
                lines.append(f"| Acceptance Gaps | missing {', '.join(training_missing)} |")
            lines.append(f"| Verdict | {training_status} ({training_detail}) |")
            lines.append("")

        # --- Footer ---
        lines.append("---")
        lines.append(f"*Generated by GPU Test Suite v{__version__}*")

        content = "\n".join(lines)
        # The report contains non-ASCII punctuation, so the encoding is pinned
        # instead of relying on the platform default.
        with open(output, "w", encoding="utf-8") as f:
            f.write(content)
        self.console.print(f"[green]Markdown report saved to: {output}[/green]")
        return output

    def _extract_memory_results(self, results: dict) -> dict:
        """Extract memory benchmark data from either full-suite or single-test format.

        ``results["memory_bench"]`` takes precedence (its ``"memory"`` entry if
        present, otherwise the dict itself); otherwise
        ``results["benchmark"]["memory"]`` is used. Returns ``{}`` when neither
        is available.
        """
        if "memory_bench" in results:
            data = results["memory_bench"]
            return data.get("memory", data) if isinstance(data, dict) else {}
        bench = results.get("benchmark")
        if isinstance(bench, dict) and "memory" in bench:
            return bench["memory"]
        return {}

    def _extract_compute_results(self, results: dict) -> dict:
        """Extract compute benchmark data from either full-suite or single-test format.

        ``results["compute_bench"]`` takes precedence (its ``"compute"`` entry
        if present, otherwise the dict itself); otherwise
        ``results["benchmark"]["compute"]`` is used. Returns ``{}`` when neither
        is available.
        """
        if "compute_bench" in results:
            data = results["compute_bench"]
            return data.get("compute", data) if isinstance(data, dict) else {}
        bench = results.get("benchmark")
        if isinstance(bench, dict) and "compute" in bench:
            return bench["compute"]
        return {}

    @staticmethod
    def _training_verdict(training: dict) -> tuple[str, str, list[str]]:
        """Return report status for both current and legacy training result schemas.

        Returns ``(status, detail, missing)``. Results carrying a ``"passed"``
        key map to PASS/FAIL with no missing fields; results without it are
        reported as UNVERIFIED together with the acceptance fields they lack.
        """
        tps = float(training.get("throughput_tokens_per_sec", 0) or 0)
        if "passed" in training:
            status = "PASS" if training.get("passed") else "FAIL"
            return status, f"{tps:.0f} tokens/sec", []
        required = ["passed", "step_jitter_pct", "distributed_mode", "loss_finite"]
        missing = [k for k in required if k not in training]
        return "UNVERIFIED", f"{tps:.0f} tokens/sec; legacy result lacks explicit acceptance verdict", missing

    def _rdma_cfg_value(self, key: str, default: float) -> float:
        """Return ``config["rdma"][key]`` as a float, or ``default`` if absent or not numeric."""
        try:
            return float((self.config.get("rdma", {}) or {}).get(key, default))
        except (TypeError, ValueError):
            return default

    def _rdma_bandwidth_verdict(self, row: dict) -> tuple[float, str]:
        """Return ``(threshold, status)``; PASS when bandwidth >= ``rdma.min_bandwidth_gbps``."""
        threshold = self._rdma_cfg_value("min_bandwidth_gbps", 47.0)
        value = float(row.get("bandwidth_gbps", 0) or 0)
        return threshold, "PASS" if value >= threshold else "FAIL"

    def _rdma_latency_verdict(self, row: dict) -> tuple[float, str]:
        """Return ``(threshold, status)`` for a latency row.

        The threshold depends on the test name (write, read, or generic). A
        latency of zero or below is treated as FAIL, i.e. as a missing
        measurement rather than a perfect one.
        """
        name = row.get("test", "")
        if name == "ib_write_lat":
            threshold = self._rdma_cfg_value("max_write_latency_us", 2.0)
        elif name == "ib_read_lat":
            threshold = self._rdma_cfg_value("max_read_latency_us", 3.5)
        else:
            threshold = self._rdma_cfg_value("max_latency_us", 3.5)
        value = float(row.get("latency_us", 0) or 0)
        return threshold, "PASS" if 0 < value <= threshold else "FAIL"

    def _rdma_legacy_note(self, rdma: dict) -> str:
        """Flag old RDMA result schemas whose embedded thresholds were looser.

        Returns a note when any bandwidth or latency row embeds a threshold
        different from the currently configured one, otherwise ``""``.
        """
        note = (
            "Legacy RDMA result re-evaluated with current PDF acceptance thresholds; "
            "old WARN statuses and old 50GB/s/10us limits are not used for verdict."
        )
        min_bandwidth = self._rdma_cfg_value("min_bandwidth_gbps", 47.0)
        for row in rdma.get("bandwidth_tests", []) or []:
            if row.get("min_required_gbps") != min_bandwidth:
                return note
        for row in rdma.get("latency_tests", []) or []:
            threshold, _ = self._rdma_latency_verdict(row)
            if row.get("max_allowed_us") != threshold:
                return note
        return ""

    def _rdma_failure_reasons(self, rdma: dict) -> list[str]:
        """Derive failure messages by re-evaluating RDMA rows against current thresholds."""
        failures = []
        for row in rdma.get("bandwidth_tests", []) or []:
            threshold, status = self._rdma_bandwidth_verdict(row)
            if status != "PASS":
                failures.append(
                    f"{row.get('test')} bandwidth {row.get('bandwidth_gbps', 0)}GB/s < {threshold:g}GB/s"
                )
        for row in rdma.get("latency_tests", []) or []:
            threshold, status = self._rdma_latency_verdict(row)
            if status != "PASS":
                failures.append(
                    f"{row.get('test')} latency {row.get('latency_us', 0)}us > {threshold:g}us"
                )
        for row in rdma.get("ibping_tests", []) or []:
            if row.get("status") != "PASS":
                failures.append(f"{row.get('test')} failed")
        return failures

    @staticmethod
    def _overall_acceptance_verdict(summary_items: list[tuple[str, str]]) -> tuple[str, list[tuple[str, str]], list[str]]:
        """PDF-style machine verdict: every required item must be present and PASS.

        Returns ``(verdict, failures, missing)`` where ``failures`` lists the
        required items whose status does not start with ``"PASS"`` and
        ``missing`` lists required items absent from ``summary_items``.
        """
        required = [
            "GPU Info",
            "Health Check",
            "Memory Bandwidth",
            "Compute Throughput",
            "NVLink/NVSwitch",
            "NCCL",
            "Stress Test",
            "RDMA",
            "DCGM",
            "Training",
        ]
        status_by_name = dict(summary_items)
        missing = [name for name in required if name not in status_by_name]
        failures = [
            (name, status)
            for name, status in summary_items
            if name in required and not str(status).startswith("PASS")
        ]
        verdict = "PASS" if not missing and not failures else "FAIL"
        return verdict, failures, missing

    def _build_summary(self, results: dict) -> list[tuple[str, str]]:
        """Build the ``(test name, verdict text)`` list for the tests present in ``results``."""
        items = []

        # GPU Info
        if "gpu_info" in results:
            gi = results["gpu_info"]
            if gi.get("error"):
                items.append(("GPU Info", f"ERROR: {gi['error']}"))
            else:
                items.append(("GPU Info", f"PASS ({gi.get('gpu_count', '?')} GPUs detected)"))

        # Health
        if "health" in results:
            h = results["health"]
            if h.get("error"):
                items.append(("Health Check", f"ERROR: {h['error']}"))
            elif h.get("passed"):
                items.append(("Health Check", "PASS"))
            else:
                items.append(("Health Check", "FAIL"))

        # Memory Bandwidth
        mem = self._extract_memory_results(results)
        if mem:
            if mem.get("error"):
                items.append(("Memory Bandwidth", f"ERROR: {mem['error']}"))
            elif mem.get("source") == "pytorch":
                # PyTorch fallback can't reach HBM peak — report as WARN, not FAIL.
                d2d = mem.get("d2d_bandwidth_gbps") or 0
                items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)"))
            else:
                eff = mem.get("d2d_efficiency_pct") or mem.get("efficiency_pct") or 0
                verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 60 else "FAIL")
                items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)"))

        # Compute
        comp = self._extract_compute_results(results)
        if comp:
            if comp.get("error"):
                items.append(("Compute Throughput", f"ERROR: {comp['error']}"))
            else:
                per_dtype = comp.get("per_dtype_tflops", {})
                eff_pct = comp.get("efficiency_pct", {})
                pass_thresholds = comp.get("pass_thresholds_tflops", {}) or {}
                if pass_thresholds:
                    # Absolute TFLOPS judgment, mirroring the per-dtype table
                    # in the Markdown report.
                    rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
                    worst_status = "PASS"
                    worst_dt = None
                    lowest_margin = None
                    for dt, thr in pass_thresholds.items():
                        val = per_dtype.get(dt)
                        if not isinstance(val, (int, float)):
                            continue
                        # A missing/None threshold cannot be compared against.
                        if not isinstance(thr, (int, float)):
                            continue
                        st = "PASS" if val >= thr else "FAIL"
                        margin = val / thr if thr else 0
                        if lowest_margin is None or margin < lowest_margin:
                            lowest_margin = margin
                            worst_dt = dt
                        if rank[st] > rank[worst_status]:
                            worst_status = st
                    if worst_dt:
                        consistency = comp.get("consistency", {}) or {}
                        failed_consistency = [
                            (dt, row)
                            for dt, row in consistency.items()
                            if not row.get("passed", False)
                        ]
                        if failed_consistency:
                            # A consistency failure overrides the threshold verdict.
                            fail_dt, fail_row = failed_consistency[0]
                            items.append((
                                "Compute Throughput",
                                f"FAIL ({fail_dt.upper()} spread "
                                f"{fail_row.get('spread_pct', 0):.2f}% > "
                                f"{fail_row.get('max_allowed_pct', 3)}%)"
                            ))
                        else:
                            items.append((
                                "Compute Throughput",
                                f"{worst_status} (worst {worst_dt.upper()} "
                                f"{per_dtype[worst_dt]:.0f} vs >= {pass_thresholds[worst_dt]})"
                            ))
                    else:
                        items.append(("Compute Throughput", f"{worst_status}"))
                else:
                    valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
                    if valid_effs:
                        worst = min(valid_effs)
                        verdict = "PASS" if worst >= 80 else ("WARN" if worst >= 50 else "FAIL")
                        items.append(("Compute Throughput", f"{verdict} (worst {worst:.1f}%)"))
                    else:
                        items.append(("Compute Throughput", "N/A"))

        # NVLink / NVSwitch
        if "nvlink" in results:
            nvl = results["nvlink"]
            if nvl.get("error"):
                items.append(("NVLink/NVSwitch", f"ERROR: {nvl['error']}"))
            elif nvl.get("passed"):
                items.append(("NVLink/NVSwitch", "PASS"))
            else:
                items.append(("NVLink/NVSwitch", "FAIL"))

        # DCGM
        if "dcgm" in results:
            d = results["dcgm"]
            if d.get("error"):
                items.append(("DCGM", f"ERROR: {d['error']}"))
            elif d.get("passed"):
                items.append(("DCGM", "PASS"))
            else:
                items.append(("DCGM", "FAIL"))

        # NCCL
        if "nccl" in results:
            n = results["nccl"]
            if n.get("error"):
                items.append(("NCCL", f"ERROR: {n['error']}"))
            elif n.get("source") == "torchrun_fallback":
                # The fallback measures no bus bandwidth, so it never passes.
                items.append(("NCCL", "FAIL (no nccl-tests bus BW)"))
            elif n.get("passed"):
                items.append(("NCCL", "PASS"))
            else:
                items.append(("NCCL", "FAIL"))

        # Multi-node NCCL
        if "multinode_nccl" in results:
            mn = results["multinode_nccl"]
            if mn.get("error"):
                items.append(("Multi-node NCCL", f"ERROR: {mn['error']}"))
            elif mn.get("passed"):
                items.append(("Multi-node NCCL", "PASS"))
            else:
                items.append(("Multi-node NCCL", "FAIL"))

        # Stress
        if "stress" in results:
            s = results["stress"]
            if s.get("error"):
                items.append(("Stress Test", f"ERROR: {s['error']}"))
            elif s.get("passed"):
                items.append(("Stress Test", "PASS"))
            else:
                items.append(("Stress Test", "FAIL"))

        # RDMA
        if "rdma" in results:
            r = results["rdma"]
            if r.get("skipped") or r.get("status") == "SKIP":
                items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})"))
            elif r.get("error"):
                items.append(("RDMA", f"ERROR: {r['error']}"))
            elif r.get("passed"):
                items.append(("RDMA", "PASS"))
            else:
                items.append(("RDMA", "FAIL"))

        # Training
        if "training" in results:
            t = results["training"]
            if t.get("error"):
                items.append(("Training", f"ERROR: {t['error']}"))
            else:
                status, detail, _missing = self._training_verdict(t)
                items.append(("Training", f"{status} ({detail})"))

        return items