test_gpu_scripts/modules/rdma_test.py
qinyusen 1c6ba4809a add: stress test (gpu-burn) and RDMA/IB test modules
Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-04-25 17:23:57 +08:00

241 lines
9.5 KiB
Python

"""RDMA / InfiniBand bandwidth and latency test module."""
import os
import shutil
import subprocess
from datetime import datetime
from typing import Optional, List
from rich.console import Console
from rich.table import Table
class RDMATest:
def __init__(self, config: dict):
self.config = config
self.console = Console()
self.rdma_cfg = config.get("rdma", {})
def _find_tool(self, name: str) -> Optional[str]:
p = shutil.which(name)
if p:
return p
return None
def _get_ib_devices(self) -> List[str]:
devices = []
ib_path = "/sys/class/infiniband"
if os.path.isdir(ib_path):
devices = sorted(os.listdir(ib_path))
return devices
def _get_ib_ports(self, device: str) -> List[str]:
ports = []
ports_dir = f"/sys/class/infiniband/{device}/ports"
if os.path.isdir(ports_dir):
ports = sorted(os.listdir(ports_dir))
return ports
def run(self) -> dict:
devices = self._get_ib_devices()
if not devices:
self.console.print("[yellow]No InfiniBand devices found[/yellow]")
return {"error": "no_ib_devices", "passed": False}
self.console.print(f"[cyan]RDMA Test - Devices: {', '.join(devices)}[/cyan]")
device_info = self._collect_device_info(devices)
bw_results = self._run_bandwidth_tests(devices)
latency_results = self._run_latency_tests(devices)
all_passed = all(
r.get("status") == "PASS"
for r in bw_results + latency_results
if isinstance(r, dict)
)
return {
"passed": all_passed,
"devices": device_info,
"bandwidth_tests": bw_results,
"latency_tests": latency_results,
"timestamp": datetime.now().isoformat(),
}
def _collect_device_info(self, devices: List[str]) -> List[dict]:
info = []
for dev in devices:
dev_info = {"name": dev, "ports": []}
ports = self._get_ib_ports(dev)
for port in ports:
port_info = {"port": port}
rate_path = f"/sys/class/infiniband/{dev}/ports/{port}/rate"
state_path = f"/sys/class/infiniband/{dev}/ports/{port}/state"
phys_state_path = f"/sys/class/infiniband/{dev}/ports/{port}/phys_state"
gid_path = f"/sys/class/infiniband/{dev}/ports/{port}/gids/0"
for label, path in [("rate", rate_path), ("state", state_path),
("phys_state", phys_state_path), ("gid", gid_path)]:
try:
with open(path) as f:
port_info[label] = f.read().strip()
except (FileNotFoundError, PermissionError):
port_info[label] = "N/A"
dev_info["ports"].append(port_info)
info.append(dev_info)
return info
def _run_ib_command(self, cmd: List[str], timeout: int = 60) -> dict:
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
if r.returncode == 0:
return {"status": "PASS", "output": r.stdout.strip()}
return {"status": "FAIL", "error": r.stderr.strip()[:200]}
except subprocess.TimeoutExpired:
return {"status": "FAIL", "error": "timeout"}
except FileNotFoundError:
return {"status": "SKIP", "error": "tool not found"}
except Exception as e:
return {"status": "FAIL", "error": str(e)}
def _run_bandwidth_tests(self, devices: List[str]) -> List[dict]:
results = []
ib_write_bw = self._find_tool("ib_write_bw")
ib_read_bw = self._find_tool("ib_read_bw")
min_bw = self.rdma_cfg.get("min_bandwidth_gbps", 50)
msg_size = self.rdma_cfg.get("msg_size", 65536)
iters = self.rdma_cfg.get("ib_iterations", 1000)
dx = self.rdma_cfg.get("ib_device", None)
port = self.rdma_cfg.get("ib_port", 1)
for tool, label in [(ib_write_bw, "ib_write_bw"), (ib_read_bw, "ib_read_bw")]:
if not tool:
results.append({"test": label, "status": "SKIP", "error": "not installed"})
continue
server_cmd = [tool, "-d", dx or devices[0], "-i", str(port), "-s", str(msg_size)]
client_cmd = server_cmd + ["localhost"]
server = subprocess.Popen(server_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
import time
time.sleep(1)
try:
client = subprocess.run(client_cmd, capture_output=True, text=True, timeout=60)
server.wait(timeout=10)
output = client.stdout + server.stdout.read() if server.stdout else ""
bw_mbps = 0
for line in output.split("\n"):
line = line.strip()
if not line:
continue
parts = line.split()
try:
bw_mbps = max(bw_mbps, float(parts[-1]))
except (ValueError, IndexError):
continue
bw_gbps = bw_mbps / 1000 if bw_mbps else 0
status = "PASS" if bw_gbps >= min_bw else "WARN"
results.append({
"test": label,
"status": status,
"bandwidth_gbps": round(bw_gbps, 2),
"min_required_gbps": min_bw,
})
except Exception as e:
server.kill()
results.append({"test": label, "status": "FAIL", "error": str(e)})
return results
def _run_latency_tests(self, devices: List[str]) -> List[dict]:
results = []
ib_write_lat = self._find_tool("ib_write_lat")
ib_read_lat = self._find_tool("ib_read_lat")
max_lat_us = self.rdma_cfg.get("max_latency_us", 10)
dx = self.rdma_cfg.get("ib_device", None)
port = self.rdma_cfg.get("ib_port", 1)
for tool, label in [(ib_write_lat, "ib_write_lat"), (ib_read_lat, "ib_read_lat")]:
if not tool:
results.append({"test": label, "status": "SKIP", "error": "not installed"})
continue
server_cmd = [tool, "-d", dx or devices[0], "-i", str(port)]
client_cmd = server_cmd + ["localhost"]
server = subprocess.Popen(server_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
import time
time.sleep(1)
try:
client = subprocess.run(client_cmd, capture_output=True, text=True, timeout=60)
server.wait(timeout=10)
output = client.stdout + server.stdout.read() if server.stdout else ""
lat_us = 0
for line in output.split("\n"):
parts = line.strip().split()
try:
lat_us = max(lat_us, float(parts[-1]))
except (ValueError, IndexError):
continue
status = "PASS" if 0 < lat_us <= max_lat_us else ("WARN" if lat_us > 0 else "FAIL")
results.append({
"test": label,
"status": status,
"latency_us": round(lat_us, 2),
"max_allowed_us": max_lat_us,
})
except Exception as e:
server.kill()
results.append({"test": label, "status": "FAIL", "error": str(e)})
return results
@staticmethod
def print_results(results: dict, console: Console = None):
c = console or Console()
if "error" in results:
c.print(f"[bold red]Error: {results['error']}[/bold red]")
return
devices = results.get("devices", [])
c.print(f"\n[bold cyan]RDMA/InfiniBand Test Results[/bold cyan]")
c.print(f" Devices found: {len(devices)}")
for dev in devices:
c.print(f"\n [bold]{dev['name']}[/bold]")
for p in dev.get("ports", []):
state = p.get("state", "N/A")
color = "green" if "Active" in state else "red"
c.print(f" Port {p['port']}: [{color}]{state}[/{color}] | "
f"Rate: {p.get('rate', 'N/A')} | GID: {p.get('gid', 'N/A')[:20]}")
bw_tests = results.get("bandwidth_tests", [])
if bw_tests:
c.print("\n [bold]Bandwidth Tests[/bold]")
for t in bw_tests:
status = t.get("status", "SKIP")
sc = "green" if status == "PASS" else ("yellow" if status == "WARN" else "red")
bw = t.get("bandwidth_gbps", 0)
c.print(f" {t['test']}: [{sc}]{status}[/{sc}] "
f"({bw:.2f} GB/s, min: {t.get('min_required_gbps', 'N/A')} GB/s)" if status != "SKIP"
else f" {t['test']}: [dim]SKIPPED[/dim]")
lat_tests = results.get("latency_tests", [])
if lat_tests:
c.print("\n [bold]Latency Tests[/bold]")
for t in lat_tests:
status = t.get("status", "SKIP")
sc = "green" if status == "PASS" else ("yellow" if status == "WARN" else "red")
lat = t.get("latency_us", 0)
c.print(f" {t['test']}: [{sc}]{status}[/{sc}] "
f"({lat:.2f} us, max: {t.get('max_allowed_us', 'N/A')} us)" if status != "SKIP"
else f" {t['test']}: [dim]SKIPPED[/dim]")