#!/usr/bin/env python3
"""
BPU Model Performance Benchmark Tool
Runs hrt_model_exec perf for each model at thread counts 1,2,3,4
"""

import argparse
import json
import re
import subprocess
import sys
from pathlib import Path

HRT_MODEL_EXEC = "hrt_model_exec"
THREAD_COUNTS = [1, 2, 3, 4]

# Frame count used when the task file does not specify "frame_count".
DEFAULT_FRAME_COUNT = 200

# File extensions accepted for model paths in the task file.
MODEL_EXTENSIONS = (".hbm", ".bin")

# (result key, pattern) pairs applied to the combined stdout/stderr of the
# perf tool. Compiled once at import time; the order fixes the key order of
# the parsed result.
_METRIC_PATTERNS = (
    ("run_time_ms", re.compile(r"Program run time:\s*([\d.]+)\s*ms")),
    ("total_latency_ms", re.compile(r"Frame totally latency is:\s*([\d.]+)\s*ms")),
    ("avg_latency_ms", re.compile(r"Average\s+latency\s+is:\s*([\d.]+)\s*ms")),
    ("fps", re.compile(r"Frame\s+rate\s+is:\s*([\d.]+)\s*FPS")),
)


def _to_float(text: str):
    """Return *text* as a float, or None when it is not a valid number."""
    try:
        return float(text)
    except ValueError:
        # "[\d.]+" also matches strings such as "1.2.3" or "."; treat those
        # as "metric not available" instead of crashing the benchmark.
        return None


def _format_metric(value, spec: str) -> str:
    """Format a possibly-missing metric, using "N/A" for None."""
    return format(value, spec) if value is not None else "N/A"


def _fail(messages: list) -> None:
    """Print every message to stderr with an [ERROR] prefix and exit(1)."""
    for msg in messages:
        print(f"[ERROR] {msg}", file=sys.stderr)
    sys.exit(1)


def parse_perf_output(output: str) -> dict:
    """Extract the perf metrics from the tool's textual output.

    Args:
        output: Combined stdout/stderr text of one perf run.

    Returns:
        A dict with the keys ``run_time_ms``, ``total_latency_ms``,
        ``avg_latency_ms`` and ``fps`` (in that order). A value is a float
        when the corresponding line was found and numeric, otherwise None.
    """
    metrics = {}
    for key, pattern in _METRIC_PATTERNS:
        found = pattern.search(output)
        metrics[key] = _to_float(found.group(1)) if found else None
    return metrics


def run_perf(model_path: str, thread_num: int, frame_count: int = 200) -> dict:
    """Run hrt_model_exec perf and return parsed results.

    Args:
        model_path: Path of the model file passed as ``--model_file``.
        thread_num: Value passed as ``--thread_num``.
        frame_count: Value passed as ``--frame_count``.

    Returns:
        A dict with ``thread_num``, ``frame_count``, the four parsed metrics
        (None when not found), ``raw_output`` (stdout followed by stderr) and
        ``returncode``. When the executable cannot be launched at all,
        ``returncode`` is None and ``raw_output`` holds the launch error, so
        the caller sees an ordinary failed measurement.
    """
    cmd = [
        HRT_MODEL_EXEC, "perf",
        "--model_file", model_path,
        "--thread_num", str(thread_num),
        "--frame_count", str(frame_count),
    ]
    try:
        # errors="replace" keeps undecodable bytes in the tool output from
        # aborting the run.
        result = subprocess.run(
            cmd, capture_output=True, text=True, errors="replace"
        )
    except OSError as exc:
        # Executable missing or not runnable: report as a failed run.
        output = f"failed to launch {HRT_MODEL_EXEC}: {exc}"
        returncode = None
    else:
        output = result.stdout + result.stderr
        returncode = result.returncode

    perf = {"thread_num": thread_num, "frame_count": frame_count}
    perf.update(parse_perf_output(output))
    perf["raw_output"] = output
    perf["returncode"] = returncode
    return perf


def print_table(results: list):
    """Print results as a human-readable table.

    Args:
        results: List of dicts, each with ``model_name`` and ``perf_results``
            (a list of dicts as returned by :func:`run_perf`). An empty list
            prints nothing.
    """
    # Size the model column dynamically to the longest model name.
    max_name_len = max((len(e["model_name"]) for e in results), default=0)
    col_widths = [max(max_name_len, 10), 9, 16, 10]
    headers = ["Model", "Threads", "Avg Latency(ms)", "FPS"]

    sep = "+" + "+".join("-" * (w + 2) for w in col_widths) + "+"
    header_row = "|" + "|".join(
        f" {h:<{w}} " for h, w in zip(headers, col_widths)
    ) + "|"

    for entry in results:
        name = entry["model_name"]
        # Each model gets its own header.
        print(sep)
        print(header_row)
        print(sep)
        for p in entry["perf_results"]:
            row = [
                name,
                str(p["thread_num"]),
                _format_metric(p["avg_latency_ms"], ".3f"),
                _format_metric(p["fps"], ".2f"),
            ]
            print("|" + "|".join(
                f" {v:<{w}} " for v, w in zip(row, col_widths)
            ) + "|")
            # Show the model name only on the first row of its group.
            name = ""
        print(sep)


def validate_config(config: dict, input_dir: Path) -> list:
    """Print and validate task.json, return normalized model list. Exit on error.

    Args:
        config: Parsed content of the task file.
        input_dir: Directory against which relative model paths are resolved.

    Returns:
        A list of ``{"name": str, "path": str}`` dicts, one per model.

    Raises:
        SystemExit: With status 1 after printing every problem to stderr,
            when the config is malformed or a model file is missing.
    """
    print("=" * 60)
    print("task.json content:")
    print(json.dumps(config, indent=2, ensure_ascii=False))
    print("=" * 60)

    if not isinstance(config, dict):
        _fail([" task.json must contain a JSON object"])

    errors = []

    # --- frame_count ---
    if "frame_count" in config:
        fc = config["frame_count"]
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(fc, bool) or not isinstance(fc, int) or fc <= 0:
            errors.append(" [frame_count] must be a positive integer")

    # --- determine which format is used ---
    has_single = "model_relative_path" in config
    has_multi = "models" in config

    if not has_single and not has_multi:
        errors.append(" missing required field: 'model_relative_path' or 'models'")
    if has_single and has_multi:
        errors.append(" ambiguous: both 'model_relative_path' and 'models' are present, use one")

    models = []

    if has_single:
        rel = config["model_relative_path"]
        if not isinstance(rel, str) or not rel.strip():
            errors.append(" [model_relative_path] must be a non-empty string")
        elif not rel.endswith(MODEL_EXTENSIONS):
            errors.append(f" [model_relative_path] unsupported extension: '{rel}' (expected .hbm or .bin)")
        else:
            models = [{"name": Path(rel).name, "path": rel}]

    if has_multi:
        if not isinstance(config["models"], list) or len(config["models"]) == 0:
            errors.append(" [models] must be a non-empty list")
        else:
            for i, m in enumerate(config["models"]):
                prefix = f" [models[{i}]]"
                if not isinstance(m, dict):
                    errors.append(f"{prefix} each entry must be an object")
                    continue
                if "path" not in m:
                    errors.append(f"{prefix} missing required field 'path'")
                elif not isinstance(m["path"], str) or not m["path"].strip():
                    errors.append(f"{prefix} 'path' must be a non-empty string")
                elif not m["path"].endswith(MODEL_EXTENSIONS):
                    errors.append(f"{prefix} unsupported extension: '{m['path']}' (expected .hbm or .bin)")
                elif "name" in m and not isinstance(m["name"], str):
                    # The name is later measured with len() and printed in
                    # the table, so it has to be text.
                    errors.append(f"{prefix} 'name' must be a string")
                else:
                    models.append({"name": m.get("name", Path(m["path"]).name), "path": m["path"]})

    if errors:
        _fail(errors)

    # --- check that every model file exists ---
    missing = [
        f" model file not found: {input_dir / m['path']}"
        for m in models
        if not (input_dir / m["path"]).is_file()
    ]
    if missing:
        _fail(missing)

    print(f"[OK] {len(models)} model(s) validated, frame_count={config.get('frame_count', DEFAULT_FRAME_COUNT)}\n")
    return models


def main():
    """Parse CLI arguments, benchmark every model, print and save results."""
    parser = argparse.ArgumentParser(description="BPU model perf benchmark")
    parser.add_argument("--input", required=True, help="Input JSON file")
    parser.add_argument("--output", required=True, help="Output JSON file")
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        print(f"[ERROR] input file not found: {args.input}", file=sys.stderr)
        sys.exit(1)

    try:
        with open(input_path, encoding="utf-8") as f:
            config = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        print(f"[ERROR] input file is not valid JSON: {exc}", file=sys.stderr)
        sys.exit(1)

    input_dir = input_path.parent
    models = validate_config(config, input_dir)
    # Read frame_count only after validation has confirmed config is a dict.
    frame_count = config.get("frame_count", DEFAULT_FRAME_COUNT)

    if not models:
        print("[ERROR] no models specified in input JSON", file=sys.stderr)
        sys.exit(1)

    output_results = []

    for model in models:
        rel_path = model["path"]
        model_path = str(input_dir / rel_path)
        model_name = model.get("name", Path(rel_path).name)

        print(f"\n[Benchmarking] {model_name} ({model_path})")
        perf_results = []

        for t in THREAD_COUNTS:
            print(f" thread_num={t} ...", end=" ", flush=True)
            p = run_perf(model_path, t, frame_count)
            perf_results.append(p)
            if p["fps"] is not None:
                # The latency line may be absent even when FPS was parsed.
                if p["avg_latency_ms"] is not None:
                    latency = f"{p['avg_latency_ms']:.3f}ms"
                else:
                    latency = "N/A"
                print(f"FPS={p['fps']:.2f} avg_latency={latency}")
            else:
                print("FAILED (check raw_output in result JSON)")

        output_results.append({
            "model_name": model_name,
            "model_path": model_path,
            "perf_results": perf_results,
        })

    print("\n" + "=" * 80)
    print_table(output_results)

    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(output_results, f, indent=2)
    print(f"\nResults saved to: {args.output}")


if __name__ == "__main__":
    main()