"""Report generation module - export test results to JSON/HTML/Markdown."""
import json
import os
from datetime import datetime
from pathlib import Path
from typing import Optional
# Resolve the version of the installed "gpu-server-test-suite" distribution.
# Any failure (importlib.metadata unavailable, distribution not installed, ...)
# falls back to the hard-coded version string below.
try:
from importlib.metadata import version as _pkg_version
__version__ = _pkg_version("gpu-server-test-suite")
except Exception:
__version__ = "0.2.0"
from rich.console import Console
from rich.panel import Panel
# Page skeleton for the HTML report, filled in with str.format() by
# ReportGenerator._generate_html. Only {timestamp} and {content} are referenced
# here; the extra `hostname` keyword passed at the call site is accepted by
# str.format() but is not rendered by this template.
# NOTE(review): the literal contains no HTML tags - the markup looks to have
# been stripped from this copy; confirm against the canonical source.
HTML_TEMPLATE = """
GPU Test Report - {timestamp}
{content}
"""
class ReportGenerator:
def __init__(self, config: dict):
    """Store the suite configuration and set up console output.

    Args:
        config: Suite configuration mapping. Its optional ``"report"``
            entry supplies the ``format`` and ``output_dir`` defaults
            that ``generate`` reads. ``None`` is treated as an empty
            configuration.
    """
    # Keep the caller's mapping object when one is given (even if empty);
    # only substitute a fresh dict for None.
    self.config = config if config is not None else {}
    self.console = Console()
    # A "report" key that is present but empty/None must still yield a
    # mapping, because generate() calls .get() on it.
    self.report_cfg = self.config.get("report") or {}
def generate(self, results: dict, fmt: Optional[str] = None, output: Optional[str] = None) -> str:
    """Write *results* as a report file and return the path that was written.

    Args:
        results: Test results to render.
        fmt: ``"json"``, ``"html"`` or ``"md"`` (case-insensitive;
            ``"markdown"`` is accepted as an alias for ``"md"``). Defaults
            to the ``format`` entry of the report config, then ``"json"``.
        output: Destination file path. When omitted, a file named
            ``gpu_report_<YYYYmmdd_HHMMSS>.<fmt>`` is created in the
            report config's ``output_dir`` (default ``./reports``).

    Returns:
        The path handed to the format-specific writer, or ``""`` when the
        format is not supported.
    """
    requested = fmt or self.report_cfg.get("format", "json")
    fmt_key = str(requested).strip().lower()
    fmt_key = {"markdown": "md"}.get(fmt_key, fmt_key)

    writers = {
        "json": self._generate_json,
        "html": self._generate_html,
        "md": self._generate_markdown,
    }
    writer = writers.get(fmt_key)
    if writer is None:
        # Reject before touching the filesystem so a typo in the format
        # does not leave an empty report directory behind.
        self.console.print(f"[red]Unsupported format: {requested}[/red]")
        return ""

    if output:
        # Explicit destination: make sure *its* directory exists instead of
        # creating the (unused) default report directory.
        target_dir = os.path.dirname(output)
    else:
        target_dir = self.report_cfg.get("output_dir", "./reports")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output = os.path.join(target_dir, f"gpu_report_{timestamp}.{fmt_key}")

    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    return writer(results, output)
def _generate_json(self, results: dict, output: str) -> str:
with open(output, "w") as f:
json.dump(results, f, indent=2, default=str)
self.console.print(f"[green]JSON report saved to: {output}[/green]")
return output
def _generate_html(self, results: dict, output: str) -> str:
"""Render *results* through HTML_TEMPLATE, write it to *output*, return *output*.

One text section is built for each of ``gpu_info``, ``health``,
``benchmark.memory``, ``benchmark.compute`` and ``training`` that is present
in *results*; the sections are joined with newlines and substituted into the
template together with the current timestamp.
"""
# NOTE(review): the string literals below contain no HTML tags and several of
# them break across physical lines, which a single-quoted literal cannot do.
# The markup looks to have been stripped from this copy - confirm against the
# canonical source before changing any of these strings.
# NOTE(review): result values are interpolated as-is (no escaping is applied
# in this method).
import socket
hostname = socket.gethostname()
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
sections = []
# GPU inventory: one row per GPU. The per-GPU keys are indexed directly, so a
# missing "index"/"name"/"vram_total_mb"/"temperature"/"clock_sm" raises KeyError.
# Unlike the Markdown writer, an "error" entry in gpu_info is not checked here.
if "gpu_info" in results:
gpus = results["gpu_info"].get("gpus", [])
rows = ""
for g in gpus:
rows += f"| GPU {g['index']} | {g['name']} | "
rows += f"{g['vram_total_mb']} MB | "
rows += f"{g['temperature']}°C | "
rows += f"{g['clock_sm']} MHz |
"
sections.append(
f'GPU Information
'
f'
Driver: {results["gpu_info"].get("driver_version", "N/A")} | '
f'CUDA: {results["gpu_info"].get("cuda_version", "N/A")} | '
f'Count: {len(gpus)}
'
f'
| GPU | Model | VRAM | Temp | SM Clock |
'
f'{rows}
'
)
# Health banner. `cls` is assigned but not referenced in the visible string;
# presumably it was a CSS class inside the stripped markup - TODO confirm.
if "health" in results:
h = results["health"]
passed = h.get("passed", False)
cls = "pass" if passed else "fail"
txt = "ALL PASSED" if passed else "SOME CHECKS FAILED"
sections.append(f'{txt}
')
# Memory bandwidth: uses the legacy keys d2d_bandwidth_gbps / efficiency_pct /
# peak_bandwidth_gbps, each defaulting to "N/A" when absent.
if "benchmark" in results and "memory" in results["benchmark"]:
mem = results["benchmark"]["memory"]
sections.append(
f'Memory Bandwidth
'
f'
{mem.get("d2d_bandwidth_gbps", "N/A")} GB/s
'
f'
D2D (HBM)
'
f'
{mem.get("efficiency_pct", "N/A")}%
'
f'
Efficiency vs Peak ({mem.get("peak_bandwidth_gbps", "N/A")} GB/s)
'
f'
'
)
# Compute throughput: rows are emitted only for numeric TFLOPS values, and the
# section is skipped entirely when no row was produced. `cls` (pass >= 80,
# warn >= 50, else fail) is again unused in the visible strings.
if "benchmark" in results and "compute" in results["benchmark"]:
comp = results["benchmark"]["compute"]
dtype_rows = ""
per_dtype = comp.get("per_dtype_tflops", {})
eff = comp.get("efficiency_pct", {})
for dt, tflops in per_dtype.items():
ef = eff.get(dt, 0)
cls = "pass" if ef >= 80 else ("warn" if ef >= 50 else "fail")
if isinstance(tflops, (int, float)):
dtype_rows += f'| {dt.upper()} | {tflops:.1f} TFLOPS | '
dtype_rows += f'{ef:.1f}% |
'
if dtype_rows:
sections.append(
f'Compute Throughput
'
f'
| DType | Achieved | Efficiency |
'
f'{dtype_rows}
'
)
# Training simulation: three headline metrics, each defaulting to "N/A".
if "training" in results:
t = results["training"]
sections.append(
f'Training Simulation
'
f'
{t.get("throughput_tokens_per_sec", "N/A")}
'
f'
Tokens/sec
'
f'
{t.get("avg_step_time_ms", "N/A")} ms
'
f'
Avg Step Time
'
f'
{t.get("peak_memory_gb", "N/A")} GB
'
f'
Peak Memory
'
f'
'
)
# Assemble the page. `hostname` is passed to format() although the visible
# template has no {hostname} placeholder (extra keywords are ignored).
content = "\n".join(sections)
html = HTML_TEMPLATE.format(timestamp=timestamp, hostname=hostname, content=content)
# NOTE(review): the file is opened without an explicit encoding while the
# content contains "°"; consider encoding="utf-8".
with open(output, "w") as f:
f.write(html)
self.console.print(f"[green]HTML report saved to: {output}[/green]")
return output
# ------------------------------------------------------------------
# Markdown report
# ------------------------------------------------------------------
def _generate_markdown(self, results: dict, output: str) -> str:
    """Render *results* as a Markdown report, write it to *output*, return *output*.

    The report consists of a header, a summary table (from
    ``_build_summary``) and one section each for GPU information, health
    check, memory bandwidth, compute throughput, NCCL, stress test,
    RDMA/InfiniBand and training. A section is emitted only when its entry
    exists in *results* and carries no ``"error"`` value (RDMA additionally
    renders a SKIP notice when flagged as skipped).

    Metrics that are rendered with a numeric format spec are coerced
    through ``_num`` so a missing, ``None`` or textual value prints as 0
    instead of aborting the whole report.

    Args:
        results: Test results keyed by section name.
        output: Destination file path; written as UTF-8.

    Returns:
        *output*, unchanged.
    """
    import socket

    def _num(value, default=0):
        """Return *value* when it is an int/float, otherwise *default*."""
        return value if isinstance(value, (int, float)) else default

    hostname = socket.gethostname()
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    lines: list[str] = []

    # --- Header ---
    lines.append("# GPU Test Report\n")
    lines.append(f"- **Date:** {timestamp}")
    lines.append(f"- **Host:** {hostname}")

    # GPU info is used both for the header and for its own section.
    gpu_info = results.get("gpu_info")
    gpu_info_ok = bool(gpu_info) and not gpu_info.get("error")
    gpus = (gpu_info.get("gpus") or []) if gpu_info_ok else []
    if gpu_info_ok:
        gpu_name = gpus[0].get("name", "Unknown") if gpus else "Unknown"
        lines.append(f"- **GPU:** {gpu_name} x{gpu_info.get('gpu_count', len(gpus))}")
        lines.append(f"- **Driver:** {gpu_info.get('driver_version', 'N/A')} | "
                     f"**CUDA:** {gpu_info.get('cuda_version', 'N/A')}")
    lines.append("")

    # --- Summary table ---
    summary_items = self._build_summary(results)
    if summary_items:
        lines.append("## Summary\n")
        lines.append("| Test | Result |")
        lines.append("|------|--------|")
        for name, verdict in summary_items:
            lines.append(f"| {name} | {verdict} |")
        lines.append("")

    # --- GPU Information ---
    if gpu_info_ok:
        lines.append("## GPU Information\n")
        lines.append("| GPU | Model | VRAM | Temp | Power | SM Clock |")
        lines.append("|-----|-------|------|------|-------|----------|")
        for g in gpus:
            vram = f"{g.get('vram_total_mb', 0)} MB"
            temp = f"{g.get('temperature', 'N/A')}C"
            power = f"{_num(g.get('power_draw')):.0f}/{_num(g.get('power_limit')):.0f}W"
            clock = f"{g.get('clock_sm', 0)} MHz"
            lines.append(f"| {g.get('index', '?')} | {g.get('name', 'Unknown')} | "
                         f"{vram} | {temp} | {power} | {clock} |")
        lines.append("")

    # --- Health Check ---
    health = results.get("health")
    if health and not health.get("error"):
        lines.append("## Health Check\n")
        passed = health.get("passed", False)
        lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")
        gpu_health = health.get("gpu_health", [])
        if gpu_health:
            lines.append("| GPU | Temp | Power | ECC | PCIe | Throttle | Status |")
            lines.append("|-----|------|-------|-----|------|----------|--------|")
            for gh in gpu_health:
                checks = gh.get("checks") or {}
                temp_c = checks.get("temperature") or {}
                pwr = checks.get("power") or {}
                ecc = checks.get("ecc_errors") or {}
                pcie = checks.get("pcie_link") or {}
                throttle = checks.get("throttling") or {}
                temp_str = f"{temp_c.get('value', '?')}C {temp_c.get('status', '')}"
                pwr_str = f"{_num(pwr.get('value')):.0f}W {pwr.get('status', '')}"
                ecc_str = f"S:{ecc.get('single', 0)} D:{ecc.get('double', 0)}"
                pcie_str = f"Gen{pcie.get('gen', '?')}x{pcie.get('width', '?')}"
                throt_str = throttle.get("status", "?")
                status = gh.get("status", "?")
                lines.append(f"| {gh.get('index', '?')} | {temp_str} | {pwr_str} | "
                             f"{ecc_str} | {pcie_str} | {throt_str} | **{status}** |")
            lines.append("")

    # --- Memory Bandwidth ---
    mem_data = self._extract_memory_results(results)
    if mem_data and not mem_data.get("error"):
        lines.append("## Memory Bandwidth\n")
        lines.append(f"Source: {mem_data.get('source', 'unknown')}\n")
        lines.append("| Metric | Value | Peak | Efficiency |")
        lines.append("|--------|-------|------|------------|")
        d2d = _num(mem_data.get("d2d_bandwidth_gbps"))
        h2d = _num(mem_data.get("h2d_bandwidth_gbps"))
        d2h = _num(mem_data.get("d2h_bandwidth_gbps"))
        # New format with per-metric peaks
        h2d_peak = _num(mem_data.get("h2d_peak_gbps"))
        d2h_peak = _num(mem_data.get("d2h_peak_gbps"))
        d2d_peak = _num(mem_data.get("d2d_peak_gbps"))
        h2d_eff = _num(mem_data.get("h2d_efficiency_pct"))
        d2h_eff = _num(mem_data.get("d2h_efficiency_pct"))
        d2d_eff = _num(mem_data.get("d2d_efficiency_pct"))
        # Fallback for old format (single peak/efficiency pair)
        if not d2d_peak:
            d2d_peak = _num(mem_data.get("peak_bandwidth_gbps"))
            d2d_eff = _num(mem_data.get("efficiency_pct"))
        lines.append(f"| H2D (PCIe) | {h2d:.1f} GB/s | {h2d_peak:.0f} GB/s | {h2d_eff:.1f}% |")
        lines.append(f"| D2H (PCIe) | {d2h:.1f} GB/s | {d2h_peak:.0f} GB/s | {d2h_eff:.1f}% |")
        lines.append(f"| D2D (NVLink) | {d2d:.1f} GB/s | {d2d_peak:.0f} GB/s | {d2d_eff:.1f}% |")
        lines.append("")
        # PyTorch fallback can't accurately measure HBM peak (intra-GPU copy_()
        # only reaches ~20% of HBM bandwidth). When fallback is used, report
        # the number but mark as WARN with a note instead of evaluating as FAIL.
        if mem_data.get("source") == "pytorch":
            lines.append(
                f"**Verdict: WARN** (D2D {d2d:.1f} GB/s via PyTorch fallback; "
                "nvbandwidth unavailable — figure is indicative only, not a true HBM peak)\n"
            )
        else:
            # Production acceptance: PASS >= 80%, WARN 60-80%, FAIL < 60%.
            verdict = "PASS" if d2d_eff >= 80 else ("WARN" if d2d_eff >= 60 else "FAIL")
            lines.append(f"**Verdict: {verdict}** (D2D efficiency {d2d_eff:.1f}%)\n")

    # --- Compute Throughput ---
    comp_data = self._extract_compute_results(results)
    if comp_data and not comp_data.get("error"):
        lines.append("## Compute Throughput\n")
        per_dtype = comp_data.get("per_dtype_tflops") or {}
        peak_tflops = comp_data.get("peak_tflops") or {}
        eff_pct = comp_data.get("efficiency_pct") or {}
        # Absolute PASS thresholds (TFLOPS) from gpu_specs.compute_pass_thresholds_tflops.
        # When present, override the legacy 80%-of-peak rule on a per-dtype basis.
        pass_thresholds = comp_data.get("pass_thresholds_tflops") or {}
        use_abs = bool(pass_thresholds)
        if use_abs:
            lines.append("| DType | Achieved (TFLOPS) | Peak | Threshold | Status |")
        else:
            lines.append("| DType | Achieved (TFLOPS) | Peak | Efficiency | Status |")
        lines.append("|-------|-------------------|------|------------|--------|")

        worst_eff = 100.0
        has_eff = False  # becomes True once a positive efficiency was seen
        overall_status = "PASS"
        rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
        for dt, val in per_dtype.items():
            if not isinstance(val, (int, float)):
                # skipped or error: show the stored text, no effect on the verdict
                lines.append(f"| {dt.upper()} | {val} | - | N/A | SKIP |")
                continue
            pk = _num(peak_tflops.get(dt))
            ef = _num(eff_pct.get(dt))
            if ef > 0:
                worst_eff = min(worst_eff, ef)
                has_eff = True
            thr = pass_thresholds.get(dt)
            if use_abs and isinstance(thr, (int, float)) and thr:
                if val >= thr:
                    status = "PASS"
                elif val >= thr * 0.9:
                    status = "WARN"
                else:
                    status = "FAIL"
                lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | >= {thr} | {status} |")
            else:
                status = "PASS" if ef >= 80 else ("WARN" if ef >= 50 else "FAIL")
                lines.append(f"| {dt.upper()} | {val:.1f} | {pk:.0f} | {ef:.1f}% | {status} |")
            if rank[status] > rank[overall_status]:
                overall_status = status
        lines.append("")

        if use_abs:
            if has_eff:
                lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds; "
                             f"worst efficiency {worst_eff:.1f}%)\n")
            else:
                lines.append(f"**Verdict: {overall_status}** (absolute TFLOPS thresholds)\n")
        elif has_eff:
            overall_status = "PASS" if worst_eff >= 80 else ("WARN" if worst_eff >= 50 else "FAIL")
            lines.append(f"**Verdict: {overall_status}** (worst efficiency {worst_eff:.1f}%)\n")
        else:
            # Without any efficiency figure there is nothing to judge; do not
            # report a made-up "PASS (100.0%)". Matches the summary's "N/A".
            lines.append("**Verdict: N/A** (no efficiency data)\n")

    # --- NCCL ---
    nccl = results.get("nccl")
    if nccl and not nccl.get("error"):
        lines.append("## NCCL Multi-GPU\n")
        lines.append(f"Source: {nccl.get('source', 'unknown')} | "
                     f"GPUs: {nccl.get('gpu_count', '?')}\n")
        tests = nccl.get("tests", {})
        if tests:
            lines.append("| Operation | Bus BW (GB/s) | Threshold | Status |")
            lines.append("|-----------|---------------|-----------|--------|")
            for op, data in tests.items():
                if not isinstance(data, dict):
                    continue
                if data.get("error"):
                    lines.append(f"| {op} | - | - | ERROR: {data['error']} |")
                else:
                    bw = _num(data.get("best_busbw_gbps"))
                    req = _num(data.get("min_required_gbps"))
                    status = data.get("status", "?")
                    lines.append(f"| {op} | {bw:.1f} | >= {req:.0f} | {status} |")
            lines.append("")
        passed = nccl.get("passed", False)
        lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")

    # --- Stress Test ---
    stress = results.get("stress")
    if stress and not stress.get("error"):
        lines.append("## Stress Test\n")
        passed = stress.get("passed", False)
        duration = stress.get("duration_sec") or 0
        elapsed = _num(stress.get("elapsed_sec"))
        source = stress.get("source", "unknown")
        lines.append(f"- **Source:** {source}")
        lines.append(f"- **Duration:** {elapsed:.0f}s (requested {duration}s)")
        lines.append(f"- **Result: {'PASS' if passed else 'FAIL'}**")
        lines.append("")

    # --- RDMA ---
    rdma = results.get("rdma")
    if rdma and (rdma.get("skipped") or rdma.get("status") == "SKIP"):
        lines.append("## RDMA/InfiniBand\n")
        lines.append(f"**Overall: SKIP** [{rdma.get('reason', 'no IB hardware detected')}]\n")
    elif rdma and not rdma.get("error"):
        lines.append("## RDMA/InfiniBand\n")
        bw_tests = rdma.get("bandwidth_tests", [])
        lat_tests = rdma.get("latency_tests", [])
        if bw_tests or lat_tests:
            lines.append("| Test | Value | Threshold | Status |")
            lines.append("|------|-------|-----------|--------|")
            for bt in bw_tests:
                if not bt.get("error"):
                    lines.append(f"| {bt.get('test', '?')} | {_num(bt.get('bandwidth_gbps')):.1f} GB/s | "
                                 f">= {bt.get('min_required_gbps', 0)} GB/s | {bt.get('status', '?')} |")
            for lt in lat_tests:
                if not lt.get("error"):
                    lines.append(f"| {lt.get('test', '?')} | {_num(lt.get('latency_us')):.2f} us | "
                                 f"<= {lt.get('max_allowed_us', 0)} us | {lt.get('status', '?')} |")
            lines.append("")
        passed = rdma.get("passed", False)
        lines.append(f"**Overall: {'PASS' if passed else 'FAIL'}**\n")

    # --- Training ---
    training = results.get("training")
    if training and not training.get("error"):
        lines.append("## Training Simulation\n")
        lines.append("| Metric | Value |")
        lines.append("|--------|-------|")
        lines.append(f"| Model | {training.get('model', 'N/A')} |")
        lines.append(f"| Params | {_num(training.get('total_params_m')):.1f}M |")
        lines.append(f"| Throughput | {_num(training.get('throughput_tokens_per_sec')):.0f} tokens/sec |")
        lines.append(f"| Avg Step Time | {_num(training.get('avg_step_time_ms')):.1f} ms |")
        lines.append(f"| Peak Memory | {_num(training.get('peak_memory_gb')):.1f} GB |")
        lines.append(f"| Final Loss | {training.get('final_loss', 'N/A')} |")
        lines.append("")

    # --- Footer ---
    lines.append("---")
    lines.append(f"*Generated by GPU Test Suite v{__version__}*")

    content = "\n".join(lines)
    # The report contains non-ASCII text (the em dash in the PyTorch
    # fallback note), so do not depend on the locale's default encoding.
    with open(output, "w", encoding="utf-8") as f:
        f.write(content)
    self.console.print(f"[green]Markdown report saved to: {output}[/green]")
    return output
def _extract_memory_results(self, results: dict) -> dict:
"""Extract memory benchmark data from either full-suite or single-test format."""
if "memory_bench" in results:
data = results["memory_bench"]
return data.get("memory", data) if isinstance(data, dict) else {}
if "benchmark" in results:
bench = results["benchmark"]
if isinstance(bench, dict) and "memory" in bench:
return bench["memory"]
return {}
def _extract_compute_results(self, results: dict) -> dict:
"""Extract compute benchmark data from either full-suite or single-test format."""
if "compute_bench" in results:
data = results["compute_bench"]
return data.get("compute", data) if isinstance(data, dict) else {}
if "benchmark" in results:
bench = results["benchmark"]
if isinstance(bench, dict) and "compute" in bench:
return bench["compute"]
return {}
def _build_summary(self, results: dict) -> list[tuple[str, str]]:
"""Build summary verdict list from results."""
items = []
# GPU Info
if "gpu_info" in results:
gi = results["gpu_info"]
if gi.get("error"):
items.append(("GPU Info", f"ERROR: {gi['error']}"))
else:
items.append(("GPU Info", f"PASS ({gi.get('gpu_count', '?')} GPUs detected)"))
# Health
if "health" in results:
h = results["health"]
if h.get("error"):
items.append(("Health Check", f"ERROR: {h['error']}"))
elif h.get("passed"):
items.append(("Health Check", "PASS"))
else:
items.append(("Health Check", "FAIL"))
# Memory Bandwidth
mem = self._extract_memory_results(results)
if mem:
if mem.get("error"):
items.append(("Memory Bandwidth", f"ERROR: {mem['error']}"))
elif mem.get("source") == "pytorch":
# PyTorch fallback can't reach HBM peak — report as WARN, not FAIL.
d2d = mem.get("d2d_bandwidth_gbps") or 0
items.append(("Memory Bandwidth", f"WARN ({d2d:.0f} GB/s via PyTorch fallback)"))
else:
eff = mem.get("efficiency_pct") or 0
verdict = "PASS" if eff >= 80 else ("WARN" if eff >= 60 else "FAIL")
items.append(("Memory Bandwidth", f"{verdict} ({eff:.1f}%)"))
# Compute
comp = self._extract_compute_results(results)
if comp:
if comp.get("error"):
items.append(("Compute Throughput", f"ERROR: {comp['error']}"))
else:
per_dtype = comp.get("per_dtype_tflops", {})
eff_pct = comp.get("efficiency_pct", {})
pass_thresholds = comp.get("pass_thresholds_tflops", {}) or {}
if pass_thresholds:
# Absolute TFLOPS judgment, mirroring the per-dtype table above.
rank = {"PASS": 0, "WARN": 1, "FAIL": 2}
worst_status = "PASS"
worst_dt = None
for dt, thr in pass_thresholds.items():
val = per_dtype.get(dt)
if not isinstance(val, (int, float)):
continue
if val >= thr:
st = "PASS"
elif val >= thr * 0.9:
st = "WARN"
else:
st = "FAIL"
if rank[st] > rank[worst_status]:
worst_status = st
worst_dt = dt
if worst_dt:
items.append((
"Compute Throughput",
f"{worst_status} (worst {worst_dt.upper()} "
f"{per_dtype[worst_dt]:.0f} vs >= {pass_thresholds[worst_dt]})"
))
else:
items.append(("Compute Throughput", f"{worst_status}"))
else:
valid_effs = [v for v in eff_pct.values() if isinstance(v, (int, float)) and v > 0]
if valid_effs:
worst = min(valid_effs)
verdict = "PASS" if worst >= 80 else ("WARN" if worst >= 50 else "FAIL")
items.append(("Compute Throughput", f"{verdict} (worst {worst:.1f}%)"))
else:
items.append(("Compute Throughput", "N/A"))
# NCCL
if "nccl" in results:
n = results["nccl"]
if n.get("error"):
items.append(("NCCL", f"ERROR: {n['error']}"))
elif n.get("passed"):
items.append(("NCCL", "PASS"))
else:
items.append(("NCCL", "FAIL"))
# Stress
if "stress" in results:
s = results["stress"]
if s.get("error"):
items.append(("Stress Test", f"ERROR: {s['error']}"))
elif s.get("passed"):
items.append(("Stress Test", "PASS"))
else:
items.append(("Stress Test", "FAIL"))
# RDMA
if "rdma" in results:
r = results["rdma"]
if r.get("skipped") or r.get("status") == "SKIP":
items.append(("RDMA", f"SKIP ({r.get('reason', 'no IB hardware')})"))
elif r.get("error"):
items.append(("RDMA", f"ERROR: {r['error']}"))
elif r.get("passed"):
items.append(("RDMA", "PASS"))
else:
items.append(("RDMA", "FAIL"))
# Training
if "training" in results:
t = results["training"]
if t.get("error"):
items.append(("Training", f"ERROR: {t['error']}"))
else:
tps = t.get("throughput_tokens_per_sec", 0)
items.append(("Training", f"PASS ({tps:.0f} tokens/sec)"))
return items