"""NVLink / NVSwitch production acceptance checks.""" import re import shutil import subprocess from datetime import datetime from typing import Optional from rich.console import Console from rich.table import Table class NVLinkTest: def __init__(self, config: dict): self.config = config self.console = Console() self.cfg = config.get("nvlink", {}) def _run(self, args: list[str], timeout: int = 60) -> tuple[int, str, str]: if not shutil.which("nvidia-smi"): return 127, "", "nvidia-smi not found" r = subprocess.run(["nvidia-smi", *args], capture_output=True, text=True, timeout=timeout) return r.returncode, r.stdout, r.stderr def run(self) -> dict: expected_links = int(self.cfg.get("expected_links_per_gpu", 18)) expected_speed = float(self.cfg.get("expected_link_speed_gbps", 25)) require_zero_errors = bool(self.cfg.get("require_zero_errors", True)) rc_s, out_s, err_s = self._run(["nvlink", "-s"]) rc_c, out_c, err_c = self._run(["nvlink", "-c"]) rc_e, out_e, err_e = self._run(["nvlink", "-e"]) if rc_s != 0: return { "passed": False, "error": (err_s or out_s or "nvidia-smi nvlink -s failed")[:1000], "timestamp": datetime.now().isoformat(), } links = self._parse_status(out_s) if not links: return { "passed": False, "error": "no NVLink status entries parsed from nvidia-smi nvlink -s", "raw_status": out_s[-4000:], "timestamp": datetime.now().isoformat(), } speeds = self._parse_speeds(out_c) if rc_c == 0 else {} status_speeds = self._parse_speeds(out_s) for gpu, gpu_speeds in status_speeds.items(): speeds.setdefault(gpu, {}).update({k: v for k, v in gpu_speeds.items() if k not in speeds.get(gpu, {})}) errors = self._parse_errors(out_e) if rc_e == 0 else {} gpu_results = [] overall = True for gpu, gpu_links in sorted(links.items(), key=lambda x: int(x[0])): active = sum(1 for l in gpu_links.values() if l.get("active")) inactive = [lid for lid, l in gpu_links.items() if not l.get("active")] speed_bad = [] for lid in gpu_links: speed = speeds.get(gpu, {}).get(lid) if speed is not None and speed < expected_speed: speed_bad.append({"link": lid, "speed_gbps": speed}) err_bad = [] if require_zero_errors: for lid, counters in errors.get(gpu, {}).items(): total = sum(v for v in counters.values() if isinstance(v, int)) if total: err_bad.append({"link": lid, "counters": counters}) passed = active == expected_links and not inactive and not speed_bad and not err_bad if not passed: overall = False gpu_results.append({ "gpu": int(gpu), "active_links": active, "expected_links": expected_links, "inactive_links": inactive, "speed_issues": speed_bad, "error_issues": err_bad, "passed": passed, }) return { "passed": overall, "expected_links_per_gpu": expected_links, "expected_link_speed_gbps": expected_speed, "require_zero_errors": require_zero_errors, "gpus": gpu_results, "raw_status": out_s[-4000:], "raw_speed": out_c[-4000:] if out_c else "", "raw_errors": out_e[-4000:] if out_e else "", "timestamp": datetime.now().isoformat(), } @staticmethod def _parse_status(text: str) -> dict[str, dict[str, dict]]: result: dict[str, dict[str, dict]] = {} gpu = None for line in text.splitlines(): m_gpu = re.search(r"GPU\s+(\d+)", line, re.I) if m_gpu: gpu = m_gpu.group(1) result.setdefault(gpu, {}) continue if gpu is None: continue m_link = re.search(r"Link\s+(\d+).*?(Active|Inactive|Disabled|Off|Down)", line, re.I) if m_link: state = m_link.group(2) result[gpu][m_link.group(1)] = { "state": state, "active": state.lower() == "active", "raw": line.strip(), } continue m_speed = re.search(r"Link\s+(\d+).*?([0-9.]+)\s*GB/s", line, re.I) if m_speed: result[gpu][m_speed.group(1)] = { "state": "Active", "active": True, "raw": line.strip(), } return result @staticmethod def _parse_speeds(text: str) -> dict[str, dict[str, float]]: result: dict[str, dict[str, float]] = {} gpu = None for line in text.splitlines(): m_gpu = re.search(r"GPU\s+(\d+)", line, re.I) if m_gpu: gpu = m_gpu.group(1) result.setdefault(gpu, {}) continue if gpu is None: continue m_link = re.search(r"Link\s+(\d+).*?([0-9.]+)\s*GB/s", line, re.I) if m_link: result[gpu][m_link.group(1)] = float(m_link.group(2)) return result @staticmethod def _parse_errors(text: str) -> dict[str, dict[str, dict[str, int]]]: result: dict[str, dict[str, dict[str, int]]] = {} gpu = None link = None for line in text.splitlines(): m_gpu = re.search(r"GPU\s+(\d+)", line, re.I) if m_gpu: gpu = m_gpu.group(1) result.setdefault(gpu, {}) continue m_link = re.search(r"Link\s+(\d+)", line, re.I) if m_link and gpu is not None: link = m_link.group(1) result[gpu].setdefault(link, {}) if gpu is None or link is None: continue for name in ("CRC", "Replay", "Recovery"): m = re.search(rf"{name}[^0-9]*(\d+)", line, re.I) if m: result[gpu][link][name.lower()] = int(m.group(1)) return result @staticmethod def print_results(results: dict, console: Optional[Console] = None): c = console or Console() if results.get("error"): c.print(f"[bold red]NVLink error: {results['error']}[/bold red]") return passed = results.get("passed", False) c.print("[bold green]✓ NVLink PASSED[/bold green]" if passed else "[bold red]✗ NVLink FAILED[/bold red]") table = Table(box=None, padding=(0, 1)) table.add_column("GPU", style="bold") table.add_column("Active Links", justify="right") table.add_column("Issues") for g in results.get("gpus", []): issues = [] if g.get("inactive_links"): issues.append("inactive=" + ",".join(g["inactive_links"])) if g.get("speed_issues"): issues.append(f"speed={len(g['speed_issues'])}") if g.get("error_issues"): issues.append(f"errors={len(g['error_issues'])}") table.add_row(str(g["gpu"]), f"{g['active_links']}/{g['expected_links']}", "; ".join(issues) or "OK") c.print(table)