"""
SmolVLA Model Export Tool

This module exports SmolVLA models for BPU deployment, including:
- Vision encoder with connector
- VLM expert model (KV cache generation)
- Action expert model (denoising)
- Calibration data preparation
- Configuration file generation
"""

import os
import copy
import random
import argparse
from pathlib import Path
from typing import Dict, Iterable, Iterator, List, Optional, Sequence, Tuple, Any

import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm

from lerobot.policies.smolvla.modeling_smolvla import (
    SmolVLAPolicy,
    make_att_2d_masks,
)
from lerobot.datasets.lerobot_dataset import LeRobotDataset

# Constants
DEFAULT_REPO_ID = "adjust_bottle_clean_50_fps15_1instruction"
DEFAULT_DEVICE = "cuda:0"
DEFAULT_NUM_CALIBRATION_SAMPLES = 10

# Model component names
ALL_OUTPUT_NAME = "board_outputs_all"
TEST_DATA_NAME = "e2e_test_datas"
STATE_NORM_NAME = "state_normalize_unnormalize"
STATE_PROJ_NAME = "state_proj"
VISION_ENCODER_NAME = "vlm_vision_encoder_with_connecter"
VLM_EXPERT_MODEL_NAME = "vlm_expert"
ACTION_EXPERT_MODEL_NAME = "action_expert"
VLM_EXPERT_EMBEDDING = "language_embedding_matrix"


def _kv_cache_names(num_layers: int) -> List[str]:
    """Return the interleaved KV tensor names ``k_0, v_0, k_1, v_1, ...``."""
    names: List[str] = []
    for layer_idx in range(num_layers):
        names.extend((f"k_{layer_idx}", f"v_{layer_idx}"))
    return names


def parse_arguments(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so flags given on the command line are
            honoured. Pass ``[]`` to obtain the pure defaults.

    Returns:
        The parsed argument namespace.
    """
    parser = argparse.ArgumentParser(
        description="Export SmolVLA models for BPU deployment"
    )
    # NOTE(review): --repo-id is parsed but not consumed by main(); it only
    # influences the default paths below through DEFAULT_REPO_ID.
    parser.add_argument(
        "--repo-id",
        type=str,
        default=DEFAULT_REPO_ID,
        help="Repository ID for the model and dataset",
    )
    parser.add_argument(
        "--smolvla-model-path",
        type=str,
        default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/train_result_rtw15fps_1instruction/{DEFAULT_REPO_ID}/checkpoints/040000/pretrained_model",
        help="Path to pretrained SmolVLA model",
    )
    parser.add_argument(
        "--lerobot-dataset-path",
        type=str,
        default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/huggingface/lerobot/rtw15fps_1instruction/{DEFAULT_REPO_ID}",
        help="Path to LeRobot dataset",
    )
    parser.add_argument(
        "--export-path",
        type=str,
        default="export_dir",
        help="Output directory for exported models",
    )
    parser.add_argument(
        "--jobs",
        type=int,
        default=32,
        help="Number of parallel jobs for compilation",
    )
    parser.add_argument(
        "--march",
        type=str,
        default="nash-m",
        help="Target architecture for BPU",
    )
    parser.add_argument(
        "--debug",
        type=str,
        default="False",
        choices=["True", "False"],
        help="Enable debug mode",
    )
    parser.add_argument(
        "--device",
        type=str,
        default=DEFAULT_DEVICE,
        help="Device to use (e.g., 'cuda:0', 'cpu')",
    )
    parser.add_argument(
        "--num-calibration-samples",
        type=int,
        default=DEFAULT_NUM_CALIBRATION_SAMPLES,
        help="Number of calibration samples to use",
    )
    # Passing argv through (instead of a hard-coded empty list) lets the
    # command line actually override the defaults.
    return parser.parse_args(argv)


def main():
    """Main entry point: parse the CLI, build the exporter and run it."""
    # Provide fallbacks only; values already present in the caller's
    # environment take precedence over these machine-specific defaults.
    os.environ.setdefault("CUDA_VISIBLE_DEVICES", "4")
    os.environ.setdefault("HTTPS_PROXY", "http://192.168.16.68:18000")

    args = parse_arguments()

    # Create exporter and run
    exporter = SmolVLAExporter(
        model_path=args.smolvla_model_path,
        dataset_path=args.lerobot_dataset_path,
        export_path=args.export_path,
        device=args.device,
        num_calibration_samples=args.num_calibration_samples,
        march=args.march,
        jobs=args.jobs,
        debug=args.debug,
    )
    exporter.export_all()


class BPUKVCache(nn.Module):
    """Wrapper for KV cache generation model."""

    def __init__(self, policy: SmolVLAPolicy):
        super().__init__()
        self.policy = policy

    def forward(
        self,
        prefix_att_2d_masks: torch.Tensor,
        prefix_position_ids: torch.Tensor,
        prefix_embs: torch.Tensor,
    ) -> List[torch.Tensor]:
        """Generate KV cache from prefix embeddings.

        Returns:
            A flat list ``[k_0, v_0, k_1, v_1, ...]`` taken from the
            ``key_states`` / ``value_states`` entries of each cached layer.
        """
        _, past_key_values = self.policy.model.vlm_with_expert.forward(
            attention_mask=prefix_att_2d_masks,
            position_ids=prefix_position_ids,
            past_key_values=None,
            inputs_embeds=[prefix_embs, None],
            use_cache=self.policy.model.config.use_cache,
            fill_kv_cache=True,
        )

        # Index by layer number (rather than iterating the container) so the
        # output order is k_i, v_i for i = 0..N-1.
        results = []
        for layer_idx in range(len(past_key_values)):
            layer_cache = past_key_values[layer_idx]
            results.append(layer_cache["key_states"])
            results.append(layer_cache["value_states"])
        return results


class BPUDenoise(nn.Module):
    """Wrapper for denoising model.

    ``forward`` takes the KV cache as 16 explicit key/value tensor pairs so
    every cache tensor is a separately named model input.
    """

    def __init__(self, policy: SmolVLAPolicy):
        super().__init__()
        self.policy = policy

    def forward(
        self,
        prefix_pad_masks: torch.Tensor,
        x_t: torch.Tensor,
        expanded_time: torch.Tensor,
        k_0: torch.Tensor,
        v_0: torch.Tensor,
        k_1: torch.Tensor,
        v_1: torch.Tensor,
        k_2: torch.Tensor,
        v_2: torch.Tensor,
        k_3: torch.Tensor,
        v_3: torch.Tensor,
        k_4: torch.Tensor,
        v_4: torch.Tensor,
        k_5: torch.Tensor,
        v_5: torch.Tensor,
        k_6: torch.Tensor,
        v_6: torch.Tensor,
        k_7: torch.Tensor,
        v_7: torch.Tensor,
        k_8: torch.Tensor,
        v_8: torch.Tensor,
        k_9: torch.Tensor,
        v_9: torch.Tensor,
        k_10: torch.Tensor,
        v_10: torch.Tensor,
        k_11: torch.Tensor,
        v_11: torch.Tensor,
        k_12: torch.Tensor,
        v_12: torch.Tensor,
        k_13: torch.Tensor,
        v_13: torch.Tensor,
        k_14: torch.Tensor,
        v_14: torch.Tensor,
        k_15: torch.Tensor,
        v_15: torch.Tensor,
    ) -> torch.Tensor:
        """Perform one denoising step.

        Returns:
            The result of ``policy.model.denoise_step`` for the given noisy
            actions ``x_t`` and time ``expanded_time``.
        """
        key_states = [
            k_0, k_1, k_2, k_3, k_4, k_5, k_6, k_7,
            k_8, k_9, k_10, k_11, k_12, k_13, k_14, k_15,
        ]
        value_states = [
            v_0, v_1, v_2, v_3, v_4, v_5, v_6, v_7,
            v_8, v_9, v_10, v_11, v_12, v_13, v_14, v_15,
        ]

        # Rebuild the per-layer cache mapping expected by denoise_step.
        # NOTE(review): only 16 pairs are accepted, so this presumably assumes
        # lm_expert has at most 16 layers -- confirm for other checkpoints.
        num_layers = len(self.policy.model.vlm_with_expert.lm_expert.layers)
        past_key_values = {
            layer_idx: {
                "key_states": key_states[layer_idx],
                "value_states": value_states[layer_idx],
            }
            for layer_idx in range(num_layers)
        }

        v_t = self.policy.model.denoise_step(
            prefix_pad_masks,
            past_key_values,
            x_t,
            expanded_time,
        )
        return v_t


class DirectoryManager:
    """Manages export directory structure."""

    def __init__(self, export_path: str, num_layers: int):
        self.export_path = Path(export_path)
        self.num_layers = num_layers
        self._create_directories()

    def _create_directories(self):
        """Create all necessary directories."""
        # Main directories
        (self.export_path / ALL_OUTPUT_NAME).mkdir(parents=True, exist_ok=True)
        (self.export_path / TEST_DATA_NAME).mkdir(parents=True, exist_ok=True)

        # Create workspace directories for each model
        self._create_model_workspace(STATE_PROJ_NAME, ["state"], ["state_output"])

        kv_output_names = _kv_cache_names(self.num_layers)
        self._create_model_workspace(
            VLM_EXPERT_MODEL_NAME,
            ["prefix_att_2d_masks", "prefix_position_ids", "prefix_embs"],
            kv_output_names,
        )

        # Action expert inputs
        action_inputs = ["prefix_pad_masks", "x_t", "expanded_time"]
        action_inputs.extend(kv_output_names)
        self._create_model_workspace(
            ACTION_EXPERT_MODEL_NAME, action_inputs, ["x_t_output"]
        )

        self._create_model_workspace(
            VISION_ENCODER_NAME, ["pixel_values"], ["hidden_state"]
        )

    def _create_model_workspace(
        self, model_name: str, input_names: List[str], output_names: List[str]
    ):
        """Create workspace structure for a model.

        Creates ``<export_path>/<model_name>_ws`` with a ``cal/<input>``
        directory per input name and an ``output/<output>`` directory per
        output name.
        """
        ws_path = self.export_path / f"{model_name}_ws"
        ws_path.mkdir(parents=True, exist_ok=True)

        # Calibration directories
        cal_path = ws_path / "cal"
        cal_path.mkdir(exist_ok=True)
        for name in input_names:
            (cal_path / name).mkdir(exist_ok=True)

        # Output directories
        output_path = ws_path / "output"
        output_path.mkdir(exist_ok=True)
        for name in output_names:
            (output_path / name).mkdir(exist_ok=True)


class ConfigGenerator:
    """Generates YAML configuration and bash scripts."""

    @staticmethod
    def generate_yaml_config(
        model_name: str,
        input_names: List[str],
        march: str,
        jobs: int,
        debug: str,
        extra_node_config: Optional[Iterable[str]] = None,
    ) -> str:
        """Generate YAML configuration for model compilation.

        Args:
            model_name: Base name of the ONNX model and of the output prefix.
            input_names: Model input names; each gets a ``./cal/<name>``
                calibration directory entry.
            march: Value written to ``march``.
            jobs: Value written to ``jobs``.
            debug: Value written to ``debug`` (inserted verbatim).
            extra_node_config: Optional node names that get a
                ``{"qtype": "float32"}`` entry under ``node_config``. Only the
                names are used, so a mapping keyed by node name also works.

        Returns:
            The YAML document as a string.
        """
        input_name_str = ";".join(input_names) + ";"
        layout_str = "NCHW;" * len(input_names)
        type_str = "featuremap;" * len(input_names)
        norm_type_str = "no_preprocess;" * len(input_names)
        cal_data_dir_str = ";".join([f"./cal/{name}" for name in input_names]) + ";"

        node_config_str = ""
        if extra_node_config:
            node_config_items = ",\n".join(
                f'      "{node}": {{"qtype": "float32"}}'
                for node in extra_node_config
            )
            node_config_str = f"""
    "node_config": {{
{node_config_items}
    }}"""

        # NOTE(review): "model_config" appears twice inside quant_config below.
        # A loader that keeps the last duplicate key would drop the first
        # (activation / weight / modelwise_search) entry -- confirm which one
        # the compiler is meant to see before changing it.
        yaml_content = f"""model_parameters:
  onnx_model: {model_name}.onnx
  march: {march}
  layer_out_dump: False
  working_dir: bpu_output
  output_model_file_prefix: {model_name}_featuremaps
  enable_vpu: True
input_parameters:
  input_name: {input_name_str}
  input_layout_rt: {layout_str}
  input_layout_train: {layout_str}
  input_type_rt: {type_str}
  input_type_train: {type_str}
  norm_type: {norm_type_str}
calibration_parameters:
  cal_data_dir: '{cal_data_dir_str}'
  quant_config: {{
    "model_config": {{
      "all_node_type": "int16",
      "model_output_type": "float32",
      "activation": {{
        "calibration_type": ["max"],
        "num_bin": [1024, 2048, 4096],
        "max_num_bin": 16384,
        "max_percentile": 1.0,
        "per_channel": true,
        "asymmetric": [true]
      }},
      "weight": {{
        "bias_correction": {{
          "metric": "mae"
        }}
      }},
      "modelwise_search": {{
        "metric": "mae"
      }}
    }},
    "model_config": {{
      "all_node_type": "int16",
      "model_output_type": "int16",
    }},
    "op_config": {{
      "ReduceMean": {{"qtype": "int16"}},
      "Sub": {{"qtype": "int16"}},
      "Softmax": {{"qtype": "int16"}}
    }},{node_config_str}
    }}
compiler_parameters:
  extra_params: {{'input_no_padding': True, 'output_no_padding': True}}
  jobs: {jobs}
  compile_mode: 'latency'
  debug: {debug}
  advice: 1
  optimize_level: 'O2'
"""
        return yaml_content

    @staticmethod
    def generate_bash_script(model_name: str) -> str:
        """Generate bash build script.

        The script runs ``hb_compile`` on ``config.yaml`` and copies the
        resulting ``.hbm`` file into the shared output directory.
        """
        return f"""hb_compile --config config.yaml
chmod 777 ./*
chmod 777 ./*/*
chmod 777 ./*/*/*
cp bpu_output/{model_name}_featuremaps.hbm ../{ALL_OUTPUT_NAME}
"""


class SmolVLAExporter:
    """Main exporter class for SmolVLA models."""

    def __init__(
        self,
        model_path: str,
        dataset_path: str,
        export_path: str,
        device: str = DEFAULT_DEVICE,
        num_calibration_samples: int = DEFAULT_NUM_CALIBRATION_SAMPLES,
        march: str = "nash-m",
        jobs: int = 32,
        debug: str = "False",
    ):
        """Load the policy and dataset and create the export directory tree.

        Args:
            model_path: Path handed to ``SmolVLAPolicy.from_pretrained``.
            dataset_path: Root directory handed to ``LeRobotDataset``.
            export_path: Directory that receives all exported artifacts.
            device: Torch device string the policy and tensors are moved to.
            num_calibration_samples: Number of dataset samples drawn for
                calibration.
            march: Forwarded to the generated YAML configs.
            jobs: Forwarded to the generated YAML configs.
            debug: Forwarded to the generated YAML configs.
        """
        self.model_path = model_path
        self.dataset_path = dataset_path
        self.export_path = Path(export_path)
        self.device = torch.device(device)
        self.num_calibration_samples = num_calibration_samples
        self.march = march
        self.jobs = jobs
        self.debug = debug

        # Load policy and dataset
        self.policy = self._load_policy()
        self.dataset = self._load_dataset()
        self.data_indices = self._select_calibration_samples()

        # Setup directories
        num_layers = len(self.policy.model.vlm_with_expert.lm_expert.layers)
        self.dir_manager = DirectoryManager(export_path, num_layers)

        # Define input/output names
        self.input_names_kv = [
            "prefix_att_2d_masks",
            "prefix_position_ids",
            "prefix_embs",
        ]
        self.output_names_kv = _kv_cache_names(num_layers)

        self.input_names_denoise = ["prefix_pad_masks", "x_t", "expanded_time"]
        self.input_names_denoise.extend(self.output_names_kv)
        self.output_names_denoise = ["x_t_output"]

    def _load_policy(self) -> SmolVLAPolicy:
        """Load pretrained policy model (float32, eval mode, on ``self.device``)."""
        policy = SmolVLAPolicy.from_pretrained(self.model_path)
        return policy.to(self.device).float().eval()

    def _load_dataset(self) -> LeRobotDataset:
        """Load LeRobot dataset."""
        # NOTE(review): repo_id is a placeholder here; presumably only `root`
        # matters for a local dataset -- confirm against LeRobotDataset.
        dataset = LeRobotDataset(repo_id="Foo/Bar", root=self.dataset_path)
        print(f"Dataset loaded: {len(dataset)} samples")
        return dataset

    def _select_calibration_samples(self) -> List[int]:
        """Randomly select calibration sample indices without replacement.

        The requested count is clamped to the dataset size so a small dataset
        does not make ``random.sample`` raise ``ValueError``.
        """
        dataset_size = len(self.dataset)
        sample_count = min(self.num_calibration_samples, dataset_size)
        if sample_count < self.num_calibration_samples:
            print(
                f"Requested {self.num_calibration_samples} calibration samples "
                f"but dataset has only {dataset_size}; using {sample_count}"
            )
        return random.sample(range(dataset_size), sample_count)

    def export_language_embedding(self):
        """Export language embedding matrix as a float32 ``.npy`` file."""
        text_model = self.policy.model.vlm_with_expert.vlm.model.text_model
        embedding_matrix = (
            text_model.embed_tokens.weight.detach().cpu().float().numpy()
        )

        print(
            f"Language embedding shape: {embedding_matrix.shape}, dtype: {embedding_matrix.dtype}"
        )

        output_path = self.export_path / ALL_OUTPUT_NAME / f"{VLM_EXPERT_EMBEDDING}.npy"
        np.save(output_path, embedding_matrix)

    def export_normalization_params(self):
        """Export state normalization/unnormalization parameters.

        Saves the mean/std buffers of the input-state normalizer and of the
        action unnormalizer as CPU tensors in a single ``.pt`` file.
        """
        state_buffer = self.policy.normalize_inputs.buffer_observation_state
        action_buffer = self.policy.unnormalize_outputs.buffer_action
        params = {
            "normalize_inputs.mean": state_buffer.mean.data.detach().cpu(),
            "normalize_inputs.std": state_buffer.std.data.detach().cpu(),
            "unnormalize_outputs.mean": action_buffer.mean.data.detach().cpu(),
            "unnormalize_outputs.std": action_buffer.std.data.detach().cpu(),
        }

        output_path = self.export_path / ALL_OUTPUT_NAME / f"{STATE_NORM_NAME}.pt"
        torch.save(params, output_path)

    def _prepare_prefix_inputs(
        self, sample_data: Dict
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Turn one dataset sample into the prefix tensors used by both models.

        Returns:
            ``(prefix_att_2d_masks, prefix_position_ids, prefix_embs,
            prefix_pad_masks)``.
        """
        obs = self._prepare_observation(sample_data)
        batch = self.policy.normalize_inputs(copy.deepcopy(obs))

        # Prepare inputs
        images, img_masks = self.policy.prepare_images(batch)
        state = self.policy.prepare_state(batch)
        lang_tokens, lang_masks = self.policy.prepare_language(batch)

        # Generate prefix embeddings
        prefix_embs, prefix_pad_masks, prefix_att_masks = (
            self.policy.model.embed_prefix(
                images, img_masks, lang_tokens, lang_masks, state=state
            )
        )
        prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks)
        prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1
        return prefix_att_2d_masks, prefix_position_ids, prefix_embs, prefix_pad_masks

    def _iter_denoise_steps(
        self,
        m_denoise: BPUDenoise,
        prefix_pad_masks: torch.Tensor,
        kv_caches: List[torch.Tensor],
    ) -> Iterator[Tuple[Tuple[torch.Tensor, ...], torch.Tensor]]:
        """Run the denoising loop, yielding ``(input_tensors, v_t)`` per step.

        ``x_t`` and ``time`` are updated in place *after* each yield, so a
        caller that wants the inputs of a step must consume (e.g. save) them
        before advancing the iterator.
        """
        x_t = self._generate_noise(batch_size=1)
        dt = torch.tensor(
            -1.0 / self.policy.model.config.num_steps,
            dtype=torch.float32,
            device=self.device,
        )
        time = torch.tensor(1.0, dtype=torch.float32, device=self.device)

        # dt is negative: time walks from 1.0 down towards 0; the -dt/2
        # threshold tolerates float accumulation error on the last step.
        while time >= -dt / 2:
            expanded_time = time.expand(1)
            input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
            with torch.no_grad():
                v_t = m_denoise(*input_tensors)
            yield input_tensors, v_t
            x_t += dt * v_t
            time += dt

    def export_denoise_model(self):
        """Export denoising model to ONNX.

        Uses dataset sample 0 to build example inputs, runs the complete
        denoising loop once, and traces the model with the tensors of the
        final step.
        """
        # Prepare sample data
        prefix_att_2d_masks, prefix_position_ids, prefix_embs, prefix_pad_masks = (
            self._prepare_prefix_inputs(self.dataset[0])
        )

        # Generate KV cache
        m_kv = BPUKVCache(self.policy)
        with torch.no_grad():
            kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)

        # Run the full denoising loop; only the last step's inputs are kept
        # as example inputs for the export.
        m_denoise = BPUDenoise(self.policy)
        m_denoise.eval()
        input_tensors = None
        for input_tensors, _ in self._iter_denoise_steps(
            m_denoise, prefix_pad_masks, kv_caches
        ):
            pass

        onnx_path = (
            self.export_path
            / f"{ACTION_EXPERT_MODEL_NAME}_ws"
            / f"{ACTION_EXPERT_MODEL_NAME}.onnx"
        )
        torch.onnx.export(
            m_denoise,
            input_tensors,
            onnx_path,
            export_params=True,
            opset_version=19,
            do_constant_folding=True,
            input_names=self.input_names_denoise,
            output_names=self.output_names_denoise,
            dynamic_axes=None,
            dynamo=False,
        )
        print(f"Denoising model exported to {onnx_path}")

    def prepare_calibration_data(self):
        """Prepare calibration data for all models.

        For every selected sample the KV-cache model inputs and outputs are
        saved. For the denoising model, every fifth sample contributes all of
        its denoising steps and the other samples contribute every third step.
        """
        m_kv = BPUKVCache(self.policy)
        m_denoise = BPUDenoise(self.policy)

        kv_cnt = 0
        denoise_cnt = 0

        for idx in tqdm(self.data_indices, desc="Preparing calibration data"):
            (
                prefix_att_2d_masks,
                prefix_position_ids,
                prefix_embs,
                prefix_pad_masks,
            ) = self._prepare_prefix_inputs(self.dataset[idx])

            # Save KV cache inputs and outputs
            self._save_calibration_tensors(
                [prefix_att_2d_masks, prefix_position_ids, prefix_embs],
                self.input_names_kv,
                VLM_EXPERT_MODEL_NAME,
                "cal",
                kv_cnt,
            )

            with torch.no_grad():
                kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)

            self._save_calibration_tensors(
                kv_caches, self.output_names_kv, VLM_EXPERT_MODEL_NAME, "output", kv_cnt
            )
            kv_cnt += 1

            # Diffusion loop for denoising calibration.
            # Every fifth sample (counted from 1) keeps the whole trace; the
            # remaining samples keep only every third step.
            save_stride = 1 if kv_cnt % 5 == 0 else 3
            denoise_steps = self._iter_denoise_steps(
                m_denoise, prefix_pad_masks, kv_caches
            )
            for step, (input_tensors, v_t) in enumerate(denoise_steps, start=1):
                if step % save_stride != 0:
                    continue
                # Save before the iterator advances and mutates x_t in place.
                self._save_calibration_tensors(
                    input_tensors,
                    self.input_names_denoise,
                    ACTION_EXPERT_MODEL_NAME,
                    "cal",
                    denoise_cnt,
                )
                self._save_calibration_tensors(
                    [v_t],
                    self.output_names_denoise,
                    ACTION_EXPERT_MODEL_NAME,
                    "output",
                    denoise_cnt,
                )
                denoise_cnt += 1

        print(
            f"Calibration data prepared: {kv_cnt} KV samples, {denoise_cnt} denoise samples"
        )

    def generate_config_files(self):
        """Generate YAML configs and bash scripts for compilation."""
        # VLM expert config
        yaml_content = ConfigGenerator.generate_yaml_config(
            VLM_EXPERT_MODEL_NAME,
            self.input_names_kv,
            self.march,
            self.jobs,
            self.debug,
        )
        self._write_config_files(VLM_EXPERT_MODEL_NAME, yaml_content)

        # Action expert config with extra node configuration: these nodes are
        # emitted with qtype float32 in the generated node_config.
        extra_node_config = [
            "/Unsqueeze",
            "/Mul",
            "/Cos",
            "/Sin",
            "/Concat",
            "/Cast",
            "/Unsqueeze_1",
        ]
        yaml_content = ConfigGenerator.generate_yaml_config(
            ACTION_EXPERT_MODEL_NAME,
            self.input_names_denoise,
            self.march,
            self.jobs,
            self.debug,
            extra_node_config,
        )
        self._write_config_files(ACTION_EXPERT_MODEL_NAME, yaml_content)

    def _prepare_observation(self, data: Dict) -> Dict[str, Any]:
        """Prepare observation dictionary from dataset sample.

        Adds a leading batch dimension to each tensor and moves it to
        ``self.device``.
        """
        camera_keys = (
            "observation.images.cam_high",
            "observation.images.cam_left_wrist",
            "observation.images.cam_right_wrist",
        )
        obs: Dict[str, Any] = {
            "instruction": data["task"],
            "task": data["task"],
        }
        for key in camera_keys:
            obs[key] = data[key].unsqueeze(0).to(self.device)
        # NOTE(review): the state is populated from data["action"], not
        # data["observation.state"] -- confirm this is intentional.
        obs["observation.state"] = data["action"].unsqueeze(0).to(self.device)
        return obs

    def _generate_noise(self, batch_size: int) -> torch.Tensor:
        """Generate a standard-normal noise tensor for diffusion.

        The shape is ``(batch_size, chunk_size, max_action_dim)`` as read from
        the policy's model config.
        """
        actions_shape = (
            batch_size,
            self.policy.model.config.chunk_size,
            self.policy.model.config.max_action_dim,
        )
        return torch.normal(
            mean=0.0,
            std=1.0,
            size=actions_shape,
            dtype=torch.float32,
            device=self.device,
        )

    def _save_calibration_tensors(
        self,
        tensors: List[torch.Tensor],
        names: List[str],
        model_name: str,
        subdir: str,
        index: int,
    ):
        """Save calibration tensors to disk.

        Each tensor is written to
        ``<export_path>/<model_name>_ws/<subdir>/<name>/<index>.npy``.
        """
        base_path = self.export_path / f"{model_name}_ws" / subdir
        for tensor, name in zip(tensors, names):
            output_path = base_path / name / f"{index}.npy"
            np.save(output_path, tensor.detach().cpu().numpy())

    def _write_config_files(self, model_name: str, yaml_content: str):
        """Write YAML config and bash script for a model."""
        ws_path = self.export_path / f"{model_name}_ws"

        # Write YAML config
        yaml_path = ws_path / "config.yaml"
        with open(yaml_path, "w", encoding="utf-8") as f:
            f.write(yaml_content)

        # Write bash script
        bash_content = ConfigGenerator.generate_bash_script(model_name)
        bash_path = ws_path / "build.bash"
        with open(bash_path, "w", encoding="utf-8") as f:
            f.write(bash_content)

        print(f"Config files written for {model_name}")

    def export_all(self):
        """Run complete export pipeline.

        NOTE(review): a compile config is generated for the VLM expert, but no
        step visible here writes its ONNX file -- presumably exported
        elsewhere; confirm.
        """
        print("Starting SmolVLA export...")
        print(f"Export path: {self.export_path}")

        print("\n[1/5] Exporting language embedding...")
        self.export_language_embedding()

        print("\n[2/5] Exporting normalization parameters...")
        self.export_normalization_params()

        print("\n[3/5] Exporting denoising model...")
        self.export_denoise_model()

        print("\n[4/5] Preparing calibration data...")
        self.prepare_calibration_data()

        print("\n[5/5] Generating configuration files...")
        self.generate_config_files()

        print("\n✓ Export completed successfully!")


if __name__ == "__main__":
    main()