# Source: SmolVLA_Tools/export_tools/export_smolvla.py
# Snapshot: 2026-03-03 16:24:02 +08:00 (743 lines, 25 KiB, Python)
"""
SmolVLA Model Export Tool
This module exports SmolVLA models for BPU deployment, including:
- Vision encoder with connector
- VLM expert model (KV cache generation)
- Action expert model (denoising)
- Calibration data preparation
- Configuration file generation
"""
# Standard library
import argparse
import copy
import os
import random
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import numpy as np
import torch
import torch.nn as nn
from tqdm import tqdm

# Project
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.smolvla.modeling_smolvla import (
    SmolVLAPolicy,
    make_att_2d_masks,
)
# Constants
DEFAULT_REPO_ID = "adjust_bottle_clean_50_fps15_1instruction"
DEFAULT_DEVICE = "cuda:0"
DEFAULT_NUM_CALIBRATION_SAMPLES = 10
# Model component names
ALL_OUTPUT_NAME = "board_outputs_all"
TEST_DATA_NAME = "e2e_test_datas"
STATE_NORM_NAME = "state_normalize_unnormalize"
STATE_PROJ_NAME = "state_proj"
VISION_ENCODER_NAME = "vlm_vision_encoder_with_connecter"
VLM_EXPERT_MODEL_NAME = "vlm_expert"
ACTION_EXPERT_MODEL_NAME = "action_expert"
VLM_EXPERT_EMBEDDING = "language_embedding_matrix"
def parse_arguments() -> argparse.Namespace:
"""Parse command line arguments."""
repo_id = "adjust_bottle_clean_50_fps15_1instruction"
parser = argparse.ArgumentParser(description="Export SmolVLA models for BPU deployment")
parser.add_argument("--repo-id", type=str, default=repo_id, help="Repository ID for the model and dataset")
parser.add_argument("--smolvla-model-path", type=str, default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/train_result_rtw15fps_1instruction/{repo_id}/checkpoints/040000/pretrained_model", help="Path to pretrained SmolVLA model")
parser.add_argument("--lerobot-dataset-path", type=str, default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/huggingface/lerobot/rtw15fps_1instruction/{repo_id}", help="Path to LeRobot dataset")
parser.add_argument("--export-path", type=str, default="export_dir", help="Output directory for exported models")
parser.add_argument("--jobs", type=int, default=32, help="Number of parallel jobs for compilation")
parser.add_argument("--march", type=str, default="nash-m", help="Target architecture for BPU")
parser.add_argument("--debug", type=str, default="False", choices=["True", "False"], help="Enable debug mode")
parser.add_argument("--device", type=str, default=DEFAULT_DEVICE, help="Device to use (e.g., 'cuda:0', 'cpu')")
parser.add_argument("--num-calibration-samples", type=int, default=DEFAULT_NUM_CALIBRATION_SAMPLES, help="Number of calibration samples to use")
return parser.parse_args([])
def main():
"""Main entry point."""
# Set environment variables
os.environ["CUDA_VISIBLE_DEVICES"] = "4"
os.environ["HTTPS_PROXY"] = "http://192.168.16.68:18000"
args = parse_arguments()
# Create exporter and run
exporter = SmolVLAExporter(
model_path=args.smolvla_model_path,
dataset_path=args.lerobot_dataset_path,
export_path=args.export_path,
device=args.device,
num_calibration_samples=args.num_calibration_samples,
march=args.march,
jobs=args.jobs,
debug=args.debug,
)
exporter.export_all()
class BPUKVCache(nn.Module):
"""Wrapper for KV cache generation model."""
def __init__(self, policy: SmolVLAPolicy):
super().__init__()
self.policy = policy
def forward(
self,
prefix_att_2d_masks: torch.Tensor,
prefix_position_ids: torch.Tensor,
prefix_embs: torch.Tensor,
) -> List[torch.Tensor]:
"""Generate KV cache from prefix embeddings."""
_, past_key_values = self.policy.model.vlm_with_expert.forward(
attention_mask=prefix_att_2d_masks,
position_ids=prefix_position_ids,
past_key_values=None,
inputs_embeds=[prefix_embs, None],
use_cache=self.policy.model.config.use_cache,
fill_kv_cache=True,
)
results = []
for i in range(len(past_key_values)):
results.append(past_key_values[i]['key_states'])
results.append(past_key_values[i]['value_states'])
return results
class BPUDenoise(nn.Module):
"""Wrapper for denoising model."""
def __init__(self, policy: SmolVLAPolicy):
super().__init__()
self.policy = policy
def forward(
self,
prefix_pad_masks: torch.Tensor,
x_t: torch.Tensor,
expanded_time: torch.Tensor,
k_0: torch.Tensor,
v_0: torch.Tensor,
k_1: torch.Tensor,
v_1: torch.Tensor,
k_2: torch.Tensor,
v_2: torch.Tensor,
k_3: torch.Tensor,
v_3: torch.Tensor,
k_4: torch.Tensor,
v_4: torch.Tensor,
k_5: torch.Tensor,
v_5: torch.Tensor,
k_6: torch.Tensor,
v_6: torch.Tensor,
k_7: torch.Tensor,
v_7: torch.Tensor,
k_8: torch.Tensor,
v_8: torch.Tensor,
k_9: torch.Tensor,
v_9: torch.Tensor,
k_10: torch.Tensor,
v_10: torch.Tensor,
k_11: torch.Tensor,
v_11: torch.Tensor,
k_12: torch.Tensor,
v_12: torch.Tensor,
k_13: torch.Tensor,
v_13: torch.Tensor,
k_14: torch.Tensor,
v_14: torch.Tensor,
k_15: torch.Tensor,
v_15: torch.Tensor,
) -> torch.Tensor:
"""Perform one denoising step."""
key_states = [
k_0,
k_1,
k_2,
k_3,
k_4,
k_5,
k_6,
k_7,
k_8,
k_9,
k_10,
k_11,
k_12,
k_13,
k_14,
k_15,
]
value_states = [
v_0,
v_1,
v_2,
v_3,
v_4,
v_5,
v_6,
v_7,
v_8,
v_9,
v_10,
v_11,
v_12,
v_13,
v_14,
v_15,
]
past_key_values = {
i: {"key_states": key_states[i], "value_states": value_states[i]}
for i in range(len(self.policy.model.vlm_with_expert.lm_expert.layers))
}
v_t = self.policy.model.denoise_step(
prefix_pad_masks,
past_key_values,
x_t,
expanded_time,
)
return v_t
class DirectoryManager:
"""Manages export directory structure."""
def __init__(self, export_path: str, num_layers: int):
self.export_path = Path(export_path)
self.num_layers = num_layers
self._create_directories()
def _create_directories(self):
"""Create all necessary directories."""
# Main directories
(self.export_path / ALL_OUTPUT_NAME).mkdir(parents=True, exist_ok=True)
(self.export_path / TEST_DATA_NAME).mkdir(parents=True, exist_ok=True)
# Create workspace directories for each model
self._create_model_workspace(STATE_PROJ_NAME, ["state"], ["state_output"])
kv_output_names = []
for i in range(self.num_layers):
kv_output_names.append(f"k_{i}")
kv_output_names.append(f"v_{i}")
self._create_model_workspace(
VLM_EXPERT_MODEL_NAME,
["prefix_att_2d_masks", "prefix_position_ids", "prefix_embs"],
kv_output_names,
)
# Action expert inputs
action_inputs = ["prefix_pad_masks", "x_t", "expanded_time"]
action_inputs.extend(kv_output_names)
self._create_model_workspace(
ACTION_EXPERT_MODEL_NAME, action_inputs, ["x_t_output"]
)
self._create_model_workspace(
VISION_ENCODER_NAME, ["pixel_values"], ["hidden_state"]
)
def _create_model_workspace(
self, model_name: str, input_names: List[str], output_names: List[str]
):
"""Create workspace structure for a model."""
ws_path = self.export_path / f"{model_name}_ws"
ws_path.mkdir(parents=True, exist_ok=True)
# Calibration directories
cal_path = ws_path / "cal"
cal_path.mkdir(exist_ok=True)
for name in input_names:
(cal_path / name).mkdir(exist_ok=True)
# Output directories
output_path = ws_path / "output"
output_path.mkdir(exist_ok=True)
for name in output_names:
(output_path / name).mkdir(exist_ok=True)
class ConfigGenerator:
"""Generates YAML configuration and bash scripts."""
@staticmethod
def generate_yaml_config(
model_name: str,
input_names: List[str],
march: str,
jobs: int,
debug: str,
extra_node_config: Dict[str, Dict] = None,
) -> str:
"""Generate YAML configuration for model compilation."""
input_name_str = ";".join(input_names) + ";"
layout_str = "NCHW;" * len(input_names)
type_str = "featuremap;" * len(input_names)
norm_type_str = "no_preprocess;" * len(input_names)
cal_data_dir_str = ";".join([f"./cal/{name}" for name in input_names]) + ";"
node_config_str = ""
if extra_node_config:
node_config_items = ",\n".join(
f' "{node}": {{"qtype": "float32"}}'
for node in extra_node_config
)
node_config_str = f"""
"node_config": {{
{node_config_items}
}}"""
yaml_content = f"""model_parameters:
onnx_model: {model_name}.onnx
march: {march}
layer_out_dump: False
working_dir: bpu_output
output_model_file_prefix: {model_name}_featuremaps
enable_vpu: True
input_parameters:
input_name: {input_name_str}
input_layout_rt: {layout_str}
input_layout_train: {layout_str}
input_type_rt: {type_str}
input_type_train: {type_str}
norm_type: {norm_type_str}
calibration_parameters:
cal_data_dir: '{cal_data_dir_str}'
quant_config: {{
"model_config": {{
"all_node_type": "int16",
"model_output_type": "float32",
"activation": {{
"calibration_type": ["max"],
"num_bin": [1024, 2048, 4096],
"max_num_bin": 16384,
"max_percentile": 1.0,
"per_channel": true,
"asymmetric": [true]
}},
"weight": {{
"bias_correction": {{
"metric": "mae"
}}
}},
"modelwise_search": {{
"metric": "mae"
}}
}},
"model_config": {{
"all_node_type": "int16",
"model_output_type": "int16",
}},
"op_config": {{
"ReduceMean": {{"qtype": "int16"}},
"Sub": {{"qtype": "int16"}},
"Softmax": {{"qtype": "int16"}}
}},{node_config_str}
}}
compiler_parameters:
extra_params: {{'input_no_padding': True, 'output_no_padding': True}}
jobs: {jobs}
compile_mode: 'latency'
debug: {debug}
advice: 1
optimize_level: 'O2'
"""
return yaml_content
@staticmethod
def generate_bash_script(model_name: str) -> str:
"""Generate bash build script."""
return f"""hb_compile --config config.yaml
chmod 777 ./*
chmod 777 ./*/*
chmod 777 ./*/*/*
cp bpu_output/{model_name}_featuremaps.hbm ../{ALL_OUTPUT_NAME}
"""
class SmolVLAExporter:
    """Main exporter class for SmolVLA models.

    Orchestrates the full BPU export pipeline:
      1. language embedding matrix      -> .npy
      2. state (un)normalization stats  -> .pt
      3. denoising (action expert)      -> .onnx
      4. calibration inputs/outputs     -> .npy trees per model workspace
      5. YAML configs + build scripts for hb_compile
    """

    def __init__(
        self,
        model_path: str,
        dataset_path: str,
        export_path: str,
        device: str = DEFAULT_DEVICE,
        num_calibration_samples: int = DEFAULT_NUM_CALIBRATION_SAMPLES,
        march: str = "nash-m",
        jobs: int = 32,
        debug: str = "False",
    ):
        self.model_path = model_path
        self.dataset_path = dataset_path
        self.export_path = Path(export_path)
        self.device = torch.device(device)
        self.num_calibration_samples = num_calibration_samples
        self.march = march  # BPU target architecture
        self.jobs = jobs  # parallel compile jobs
        self.debug = debug  # "True"/"False" string consumed verbatim by the YAML config
        # Load policy and dataset
        self.policy = self._load_policy()
        self.dataset = self._load_dataset()
        self.data_indices = self._select_calibration_samples()
        # Setup directories
        num_layers = len(self.policy.model.vlm_with_expert.lm_expert.layers)
        self.dir_manager = DirectoryManager(export_path, num_layers)
        # Define input/output names
        self.input_names_kv = [
            "prefix_att_2d_masks",
            "prefix_position_ids",
            "prefix_embs",
        ]
        # KV-cache outputs are interleaved per layer: k_0, v_0, k_1, v_1, ...
        self.output_names_kv = []
        for i in range(num_layers):
            self.output_names_kv.append(f"k_{i}")
            self.output_names_kv.append(f"v_{i}")
        # The denoising model consumes the KV cache as additional inputs.
        self.input_names_denoise = ["prefix_pad_masks", "x_t", "expanded_time"]
        self.input_names_denoise.extend(self.output_names_kv)
        self.output_names_denoise = ["x_t_output"]

    def _load_policy(self) -> SmolVLAPolicy:
        """Load pretrained policy model (on `device`, fp32, eval mode)."""
        policy = SmolVLAPolicy.from_pretrained(self.model_path)
        return policy.to(self.device).float().eval()

    def _load_dataset(self) -> LeRobotDataset:
        """Load LeRobot dataset.

        NOTE(review): ``repo_id="Foo/Bar"`` looks like a placeholder; the
        dataset appears to be resolved from the local ``root`` path — confirm
        no hub lookup is triggered for this repo_id.
        """
        dataset = LeRobotDataset(repo_id="Foo/Bar", root=self.dataset_path)
        print(f"Dataset loaded: {len(dataset)} samples")
        return dataset

    def _select_calibration_samples(self) -> List[int]:
        """Randomly select calibration sample indices (without replacement)."""
        return random.sample(range(len(self.dataset)), self.num_calibration_samples)

    def export_language_embedding(self) -> None:
        """Export the VLM token-embedding matrix as a float32 .npy file."""
        embedding_matrix = (
            self.policy.model.vlm_with_expert.vlm.model.text_model.embed_tokens.weight.detach()
            .cpu()
            .float()
            .numpy()
        )
        print(
            f"Language embedding shape: {embedding_matrix.shape}, dtype: {embedding_matrix.dtype}"
        )
        output_path = self.export_path / ALL_OUTPUT_NAME / f"{VLM_EXPERT_EMBEDDING}.npy"
        np.save(output_path, embedding_matrix)

    def export_normalization_params(self) -> None:
        """Export state normalization/unnormalization parameters as a .pt dict."""
        params = {
            "normalize_inputs.mean": self.policy.normalize_inputs.buffer_observation_state.mean.data.detach().cpu(),
            "normalize_inputs.std": self.policy.normalize_inputs.buffer_observation_state.std.data.detach().cpu(),
            "unnormalize_outputs.mean": self.policy.unnormalize_outputs.buffer_action.mean.data.detach().cpu(),
            "unnormalize_outputs.std": self.policy.unnormalize_outputs.buffer_action.std.data.detach().cpu(),
        }
        output_path = self.export_path / ALL_OUTPUT_NAME / f"{STATE_NORM_NAME}.pt"
        torch.save(params, output_path)

    def export_denoise_model(self) -> None:
        """Export denoising model to ONNX.

        Runs one full denoising loop on a real sample first so the tensors
        handed to ``torch.onnx.export`` carry realistic values; the exported
        graph itself is a single denoising step.
        """
        # Prepare sample data
        sample_data = self.dataset[0]
        obs = self._prepare_observation(sample_data)
        batch = self.policy.normalize_inputs(copy.deepcopy(obs))
        # Prepare inputs
        images, img_masks = self.policy.prepare_images(batch)
        state = self.policy.prepare_state(batch)
        lang_tokens, lang_masks = self.policy.prepare_language(batch)
        # Generate prefix embeddings
        prefix_embs, prefix_pad_masks, prefix_att_masks = (
            self.policy.model.embed_prefix(
                images, img_masks, lang_tokens, lang_masks, state=state
            )
        )
        prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks)
        prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1
        # Generate KV cache
        m_kv = BPUKVCache(self.policy)
        with torch.no_grad():
            kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)
        # Prepare denoising inputs
        noise = self._generate_noise(batch_size=1)
        m_denoise = BPUDenoise(self.policy)
        m_denoise.eval()
        # Integrate from t=1 down to 0 in num_steps equal steps (dt < 0).
        dt = -1.0 / self.policy.model.config.num_steps
        dt = torch.tensor(dt, dtype=torch.float32, device=self.device)
        x_t = noise  # NOTE: x_t aliases `noise` and is updated in place below
        time = torch.tensor(1.0, dtype=torch.float32, device=self.device)
        while time >= -dt / 2:
            expanded_time = time.expand(1)
            input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
            with torch.no_grad():
                v_t = m_denoise(*input_tensors)
            x_t += dt * v_t
            time += dt
        # `input_tensors` retains the final loop iteration's inputs; they are
        # the example inputs used for tracing below.
        onnx_path = (
            self.export_path
            / f"{ACTION_EXPERT_MODEL_NAME}_ws"
            / f"{ACTION_EXPERT_MODEL_NAME}.onnx"
        )
        torch.onnx.export(
            m_denoise,
            input_tensors,
            onnx_path,
            export_params=True,
            opset_version=19,
            do_constant_folding=True,
            input_names=self.input_names_denoise,
            output_names=self.output_names_denoise,
            dynamic_axes=None,  # fixed shapes only — presumably a BPU requirement
            dynamo=False,  # use the legacy TorchScript exporter
        )
        print(f"Denoising model exported to {onnx_path}")

    def prepare_calibration_data(self) -> None:
        """Prepare calibration data for all models.

        For every selected sample the KV-cache model inputs/outputs are saved.
        Denoising-step tensors are saved for every step on every 5th sample,
        and only for every 3rd step on the other samples.
        """
        m_kv = BPUKVCache(self.policy)
        m_denoise = BPUDenoise(self.policy)
        kv_cnt = 0
        denoise_cnt = 0
        for idx in tqdm(self.data_indices, desc="Preparing calibration data"):
            sample_data = self.dataset[idx]
            obs = self._prepare_observation(sample_data)
            batch = self.policy.normalize_inputs(copy.deepcopy(obs))
            # Prepare inputs
            images, img_masks = self.policy.prepare_images(batch)
            state = self.policy.prepare_state(batch)
            lang_tokens, lang_masks = self.policy.prepare_language(batch)
            # Generate prefix embeddings
            prefix_embs, prefix_pad_masks, prefix_att_masks = (
                self.policy.model.embed_prefix(
                    images, img_masks, lang_tokens, lang_masks, state=state
                )
            )
            prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks)
            prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1
            # Save KV cache inputs and outputs
            self._save_calibration_tensors(
                [prefix_att_2d_masks, prefix_position_ids, prefix_embs],
                self.input_names_kv,
                VLM_EXPERT_MODEL_NAME,
                "cal",
                kv_cnt,
            )
            with torch.no_grad():
                kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)
            self._save_calibration_tensors(
                kv_caches, self.output_names_kv, VLM_EXPERT_MODEL_NAME, "output", kv_cnt
            )
            kv_cnt += 1
            # Diffusion loop for denoising calibration
            noise = self._generate_noise(batch_size=1)
            dt = -1.0 / self.policy.model.config.num_steps
            dt = torch.tensor(dt, dtype=torch.float32, device=self.device)
            x_t = noise  # x_t aliases `noise`; updated in place each step
            time = torch.tensor(1.0, dtype=torch.float32, device=self.device)
            if kv_cnt % 5 == 0:  # keep every denoising step as calibration data
                while time >= -dt / 2:
                    expanded_time = time.expand(1)
                    input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
                    with torch.no_grad():
                        v_t = m_denoise(*input_tensors)
                    self._save_calibration_tensors(
                        input_tensors, self.input_names_denoise,
                        ACTION_EXPERT_MODEL_NAME, "cal", denoise_cnt,
                    )
                    self._save_calibration_tensors(
                        [v_t], self.output_names_denoise,
                        ACTION_EXPERT_MODEL_NAME, "output", denoise_cnt,
                    )
                    x_t += dt * v_t
                    time += dt
                    denoise_cnt += 1
            else:  # keep a subset of steps — NOTE(review): the original
                   # comment said "half", but the code keeps every 3rd step
                cnt_ls = 0
                while time >= -dt / 2:
                    expanded_time = time.expand(1)
                    input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
                    with torch.no_grad():
                        v_t = m_denoise(*input_tensors)
                    cnt_ls += 1
                    if cnt_ls % 3 == 0:
                        self._save_calibration_tensors(
                            input_tensors, self.input_names_denoise,
                            ACTION_EXPERT_MODEL_NAME, "cal", denoise_cnt,
                        )
                        self._save_calibration_tensors(
                            [v_t], self.output_names_denoise,
                            ACTION_EXPERT_MODEL_NAME, "output", denoise_cnt,
                        )
                        denoise_cnt += 1
                    x_t += dt * v_t
                    time += dt
        print(
            f"Calibration data prepared: {kv_cnt} KV samples, {denoise_cnt} denoise samples"
        )

    def generate_config_files(self) -> None:
        """Generate YAML configs and bash scripts for compilation."""
        config_gen = ConfigGenerator()
        # VLM expert config
        yaml_content = config_gen.generate_yaml_config(
            VLM_EXPERT_MODEL_NAME,
            self.input_names_kv,
            self.march,
            self.jobs,
            self.debug,
        )
        self._write_config_files(VLM_EXPERT_MODEL_NAME, yaml_content)
        # Action expert config with extra node configuration: these ONNX
        # nodes are pinned to float32 in the generated quant config.
        extra_node_config = [
            "/Unsqueeze",
            "/Mul",
            "/Cos",
            "/Sin",
            "/Concat",
            "/Cast",
            "/Unsqueeze_1",
        ]
        yaml_content = config_gen.generate_yaml_config(
            ACTION_EXPERT_MODEL_NAME,
            self.input_names_denoise,
            self.march,
            self.jobs,
            self.debug,
            extra_node_config,
        )
        self._write_config_files(ACTION_EXPERT_MODEL_NAME, yaml_content)

    def _prepare_observation(self, data: Dict) -> Dict[str, Any]:
        """Prepare observation dictionary from dataset sample.

        NOTE(review): ``observation.state`` is filled from the sample's
        ``action`` field — confirm this mirrors the training-time pipeline.
        """
        obs = {
            "instruction": data["task"],
            "task": data["task"],
            "observation.images.cam_high": data["observation.images.cam_high"]
            .unsqueeze(0)
            .to(self.device),
            "observation.images.cam_left_wrist": data[
                "observation.images.cam_left_wrist"
            ]
            .unsqueeze(0)
            .to(self.device),
            "observation.images.cam_right_wrist": data[
                "observation.images.cam_right_wrist"
            ]
            .unsqueeze(0)
            .to(self.device),
            "observation.state": data["action"].unsqueeze(0).to(self.device),
        }
        return obs

    def _generate_noise(self, batch_size: int) -> torch.Tensor:
        """Generate standard-normal noise of shape (batch, chunk, action_dim)."""
        actions_shape = (
            batch_size,
            self.policy.model.config.chunk_size,
            self.policy.model.config.max_action_dim,
        )
        return torch.normal(
            mean=0.0,
            std=1.0,
            size=actions_shape,
            dtype=torch.float32,
            device=self.device,
        )

    def _save_calibration_tensors(
        self,
        tensors: List[torch.Tensor],
        names: List[str],
        model_name: str,
        subdir: str,
        index: int,
    ) -> None:
        """Save each tensor as <export>/<model>_ws/<subdir>/<name>/<index>.npy."""
        for tensor, name in zip(tensors, names):
            output_path = (
                self.export_path / f"{model_name}_ws" / subdir / name / f"{index}.npy"
            )
            np.save(output_path, tensor.detach().cpu().numpy())

    def _write_config_files(self, model_name: str, yaml_content: str) -> None:
        """Write YAML config and bash build script into the model's workspace."""
        ws_path = self.export_path / f"{model_name}_ws"
        # Write YAML config
        yaml_path = ws_path / "config.yaml"
        with open(yaml_path, "w", encoding="utf-8") as f:
            f.write(yaml_content)
        # Write bash script
        bash_content = ConfigGenerator.generate_bash_script(model_name)
        bash_path = ws_path / "build.bash"
        with open(bash_path, "w", encoding="utf-8") as f:
            f.write(bash_content)
        print(f"Config files written for {model_name}")

    def export_all(self) -> None:
        """Run complete export pipeline in order: embedding, norm params,
        ONNX export, calibration data, and compile configs."""
        print("Starting SmolVLA export...")
        print(f"Export path: {self.export_path}")
        print("\n[1/5] Exporting language embedding...")
        self.export_language_embedding()
        print("\n[2/5] Exporting normalization parameters...")
        self.export_normalization_params()
        print("\n[3/5] Exporting denoising model...")
        self.export_denoise_model()
        print("\n[4/5] Preparing calibration data...")
        self.prepare_calibration_data()
        print("\n[5/5] Generating configuration files...")
        self.generate_config_files()
        print("\n✓ Export completed successfully!")
# Script entry point.
if __name__ == "__main__":
    main()