"""
|
|
SmolVLA Model Export Tool
|
|
|
|
This module exports SmolVLA models for BPU deployment, including:
|
|
- Vision encoder with connector
|
|
- VLM expert model (KV cache generation)
|
|
- Action expert model (denoising)
|
|
- Calibration data preparation
|
|
- Configuration file generation
|
|
"""
|
|
|
|
import os
|
|
import copy
|
|
import random
|
|
import argparse
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple, Any
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import numpy as np
|
|
from tqdm import tqdm
|
|
|
|
from lerobot.policies.smolvla.modeling_smolvla import (
|
|
SmolVLAPolicy,
|
|
make_att_2d_masks,
|
|
)
|
|
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
|
|
|
|
|
# Constants
DEFAULT_REPO_ID = "adjust_bottle_clean_50_fps15_1instruction"  # default model/dataset repository id
DEFAULT_DEVICE = "cuda:0"  # default torch device string
DEFAULT_NUM_CALIBRATION_SAMPLES = 10  # dataset samples drawn for quantization calibration

# Model component names: used as workspace directory prefixes ("<name>_ws")
# and as file stems for exported artifacts.
ALL_OUTPUT_NAME = "board_outputs_all"  # directory collecting all board-ready artifacts
TEST_DATA_NAME = "e2e_test_datas"  # directory reserved for end-to-end test data
STATE_NORM_NAME = "state_normalize_unnormalize"  # .pt file stem for normalization params
STATE_PROJ_NAME = "state_proj"  # state projection model workspace stem
VISION_ENCODER_NAME = "vlm_vision_encoder_with_connecter"  # vision encoder + connector stem
VLM_EXPERT_MODEL_NAME = "vlm_expert"  # VLM expert (KV-cache generation) model stem
ACTION_EXPERT_MODEL_NAME = "action_expert"  # action expert (denoising) model stem
VLM_EXPERT_EMBEDDING = "language_embedding_matrix"  # .npy file stem for token embeddings
|
|
|
|
|
|
def parse_arguments(argv: List[str] | None = None) -> argparse.Namespace:
    """Parse command line arguments.

    Args:
        argv: Optional explicit argument list. Defaults to ``None``, which
            makes argparse read ``sys.argv[1:]``. (The previous implementation
            passed a hard-coded ``[]`` to ``parse_args``, which silently
            ignored every real command-line argument and always returned the
            defaults.)

    Returns:
        Parsed argument namespace.
    """
    # Reuse the module-level constant instead of re-spelling the literal.
    repo_id = DEFAULT_REPO_ID
    parser = argparse.ArgumentParser(description="Export SmolVLA models for BPU deployment")
    parser.add_argument("--repo-id", type=str, default=repo_id, help="Repository ID for the model and dataset")
    parser.add_argument("--smolvla-model-path", type=str, default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/train_result_rtw15fps_1instruction/{repo_id}/checkpoints/040000/pretrained_model", help="Path to pretrained SmolVLA model")
    parser.add_argument("--lerobot-dataset-path", type=str, default=f"/home/chao.wu/SmolVLA_RoboTwin2_BPU/huggingface/lerobot/rtw15fps_1instruction/{repo_id}", help="Path to LeRobot dataset")
    parser.add_argument("--export-path", type=str, default="export_dir", help="Output directory for exported models")
    parser.add_argument("--jobs", type=int, default=32, help="Number of parallel jobs for compilation")
    parser.add_argument("--march", type=str, default="nash-m", help="Target architecture for BPU")
    parser.add_argument("--debug", type=str, default="False", choices=["True", "False"], help="Enable debug mode")
    parser.add_argument("--device", type=str, default=DEFAULT_DEVICE, help="Device to use (e.g., 'cuda:0', 'cpu')")
    parser.add_argument("--num-calibration-samples", type=int, default=DEFAULT_NUM_CALIBRATION_SAMPLES, help="Number of calibration samples to use")
    # Bug fix: parse the real CLI (or the caller-provided argv) instead of [].
    return parser.parse_args(argv)
|
|
|
|
|
|
def main():
    """Script entry point: configure the environment, parse arguments, run export."""
    # Pin GPU selection and the HTTPS proxy used by this export environment.
    os.environ["CUDA_VISIBLE_DEVICES"] = "4"
    os.environ["HTTPS_PROXY"] = "http://192.168.16.68:18000"

    args = parse_arguments()

    # Build the exporter from the parsed options and run the full pipeline.
    SmolVLAExporter(
        model_path=args.smolvla_model_path,
        dataset_path=args.lerobot_dataset_path,
        export_path=args.export_path,
        device=args.device,
        num_calibration_samples=args.num_calibration_samples,
        march=args.march,
        jobs=args.jobs,
        debug=args.debug,
    ).export_all()
|
|
|
|
|
|
class BPUKVCache(nn.Module):
    """Export wrapper that runs the VLM prefix pass and emits the KV cache.

    The per-layer cache is flattened into a single list
    ``[k_0, v_0, k_1, v_1, ...]`` so each tensor can become a named ONNX
    graph output.
    """

    def __init__(self, policy: SmolVLAPolicy):
        super().__init__()
        self.policy = policy

    def forward(
        self,
        prefix_att_2d_masks: torch.Tensor,
        prefix_position_ids: torch.Tensor,
        prefix_embs: torch.Tensor,
    ) -> List[torch.Tensor]:
        """Generate KV cache from prefix embeddings."""
        _, past_key_values = self.policy.model.vlm_with_expert.forward(
            attention_mask=prefix_att_2d_masks,
            position_ids=prefix_position_ids,
            past_key_values=None,
            inputs_embeds=[prefix_embs, None],
            use_cache=self.policy.model.config.use_cache,
            fill_kv_cache=True,
        )

        # Index explicitly: past_key_values may be a dict keyed by layer index,
        # so iterating it directly would yield keys rather than cache entries.
        flattened: List[torch.Tensor] = []
        for layer in range(len(past_key_values)):
            cache = past_key_values[layer]
            flattened.append(cache["key_states"])
            flattened.append(cache["value_states"])
        return flattened
|
|
|
|
|
|
class BPUDenoise(nn.Module):
    """Export wrapper around a single flow-matching denoising step.

    The per-layer KV cache arrives as 32 separate tensor arguments
    (``k_i``/``v_i`` pairs) so the module can be traced to ONNX with flat,
    individually named inputs.
    """

    def __init__(self, policy: SmolVLAPolicy):
        super().__init__()
        self.policy = policy

    def forward(
        self,
        prefix_pad_masks: torch.Tensor,
        x_t: torch.Tensor,
        expanded_time: torch.Tensor,
        k_0: torch.Tensor,
        v_0: torch.Tensor,
        k_1: torch.Tensor,
        v_1: torch.Tensor,
        k_2: torch.Tensor,
        v_2: torch.Tensor,
        k_3: torch.Tensor,
        v_3: torch.Tensor,
        k_4: torch.Tensor,
        v_4: torch.Tensor,
        k_5: torch.Tensor,
        v_5: torch.Tensor,
        k_6: torch.Tensor,
        v_6: torch.Tensor,
        k_7: torch.Tensor,
        v_7: torch.Tensor,
        k_8: torch.Tensor,
        v_8: torch.Tensor,
        k_9: torch.Tensor,
        v_9: torch.Tensor,
        k_10: torch.Tensor,
        v_10: torch.Tensor,
        k_11: torch.Tensor,
        v_11: torch.Tensor,
        k_12: torch.Tensor,
        v_12: torch.Tensor,
        k_13: torch.Tensor,
        v_13: torch.Tensor,
        k_14: torch.Tensor,
        v_14: torch.Tensor,
        k_15: torch.Tensor,
        v_15: torch.Tensor,
    ) -> torch.Tensor:
        """Perform one denoising step and return the predicted velocity."""
        keys = (
            k_0, k_1, k_2, k_3, k_4, k_5, k_6, k_7,
            k_8, k_9, k_10, k_11, k_12, k_13, k_14, k_15,
        )
        values = (
            v_0, v_1, v_2, v_3, v_4, v_5, v_6, v_7,
            v_8, v_9, v_10, v_11, v_12, v_13, v_14, v_15,
        )

        # Rebuild the dict-of-dicts cache layout expected by denoise_step,
        # limited to the expert's actual layer count.
        num_layers = len(self.policy.model.vlm_with_expert.lm_expert.layers)
        past_key_values = {
            layer: {"key_states": keys[layer], "value_states": values[layer]}
            for layer in range(num_layers)
        }

        return self.policy.model.denoise_step(
            prefix_pad_masks,
            past_key_values,
            x_t,
            expanded_time,
        )
|
|
|
|
|
|
class DirectoryManager:
    """Creates and owns the on-disk export directory layout."""

    def __init__(self, export_path: str, num_layers: int):
        self.export_path = Path(export_path)
        self.num_layers = num_layers
        self._create_directories()

    def _create_directories(self):
        """Create every directory the export pipeline writes into (idempotent)."""
        # Top-level artifact and test-data directories.
        for top_level in (ALL_OUTPUT_NAME, TEST_DATA_NAME):
            (self.export_path / top_level).mkdir(parents=True, exist_ok=True)

        # One workspace per exported model.
        self._create_model_workspace(STATE_PROJ_NAME, ["state"], ["state_output"])

        # KV-cache outputs are named k_0, v_0, k_1, v_1, ... per expert layer.
        kv_output_names = []
        for i in range(self.num_layers):
            kv_output_names += [f"k_{i}", f"v_{i}"]
        self._create_model_workspace(
            VLM_EXPERT_MODEL_NAME,
            ["prefix_att_2d_masks", "prefix_position_ids", "prefix_embs"],
            kv_output_names,
        )

        # The action expert consumes the flattened KV cache as extra inputs.
        self._create_model_workspace(
            ACTION_EXPERT_MODEL_NAME,
            ["prefix_pad_masks", "x_t", "expanded_time"] + kv_output_names,
            ["x_t_output"],
        )

        self._create_model_workspace(
            VISION_ENCODER_NAME, ["pixel_values"], ["hidden_state"]
        )

    def _create_model_workspace(
        self, model_name: str, input_names: List[str], output_names: List[str]
    ):
        """Create ``<model>_ws/cal/<input>`` and ``<model>_ws/output/<output>`` trees."""
        ws_path = self.export_path / f"{model_name}_ws"
        ws_path.mkdir(parents=True, exist_ok=True)

        # Calibration inputs go under cal/, reference outputs under output/.
        for subdir, names in (("cal", input_names), ("output", output_names)):
            sub_path = ws_path / subdir
            sub_path.mkdir(exist_ok=True)
            for name in names:
                (sub_path / name).mkdir(exist_ok=True)
|
|
|
|
|
|
|
|
class ConfigGenerator:
    """Generates YAML configuration and bash scripts for the BPU compiler."""

    @staticmethod
    def generate_yaml_config(
        model_name: str,
        input_names: List[str],
        march: str,
        jobs: int,
        debug: str,
        extra_node_config: Dict[str, Dict] = None,
    ) -> str:
        """Generate YAML configuration for model compilation.

        Args:
            model_name: ONNX file stem; the config references ``{model_name}.onnx``.
            input_names: Ordered ONNX input names, used for layouts, types and
                the per-input calibration data directories.
            march: Target BPU architecture string.
            jobs: Parallel compiler jobs.
            debug: "True"/"False" string inserted verbatim into the YAML.
            extra_node_config: Node names whose quantization is forced to
                float32. NOTE(review): annotated as Dict, but the caller in
                this file passes a list of node-name strings; only iteration
                is used so both work — consider tightening to List[str].

        Returns:
            The full YAML document as a string.
        """
        # Every multi-input field uses ';'-separated values with a trailing ';'.
        input_name_str = ";".join(input_names) + ";"
        layout_str = "NCHW;" * len(input_names)
        type_str = "featuremap;" * len(input_names)
        norm_type_str = "no_preprocess;" * len(input_names)
        cal_data_dir_str = ";".join([f"./cal/{name}" for name in input_names]) + ";"

        node_config_str = ""
        if extra_node_config:
            # Force float32 quantization for the listed graph nodes.
            node_config_items = ",\n".join(
                f'      "{node}": {{"qtype": "float32"}}'
                for node in extra_node_config
            )
            node_config_str = f"""
    "node_config": {{
{node_config_items}
    }}"""

        # NOTE(review): "model_config" appears twice inside quant_config below;
        # most YAML parsers keep only the LAST occurrence, which would discard
        # the detailed activation/weight/modelwise_search settings — confirm
        # which block the compiler is actually meant to use.
        # NOTE(review): the trailing comma after the second
        # "model_output_type": "int16" is not valid in strict YAML flow
        # mappings — confirm the compiler's parser accepts it.
        yaml_content = f"""model_parameters:
  onnx_model: {model_name}.onnx
  march: {march}
  layer_out_dump: False
  working_dir: bpu_output
  output_model_file_prefix: {model_name}_featuremaps
  enable_vpu: True
input_parameters:
  input_name: {input_name_str}
  input_layout_rt: {layout_str}
  input_layout_train: {layout_str}
  input_type_rt: {type_str}
  input_type_train: {type_str}
  norm_type: {norm_type_str}
calibration_parameters:
  cal_data_dir: '{cal_data_dir_str}'
  quant_config: {{
    "model_config": {{
      "all_node_type": "int16",
      "model_output_type": "float32",
      "activation": {{
        "calibration_type": ["max"],
        "num_bin": [1024, 2048, 4096],
        "max_num_bin": 16384,
        "max_percentile": 1.0,
        "per_channel": true,
        "asymmetric": [true]
      }},
      "weight": {{
        "bias_correction": {{
          "metric": "mae"
        }}
      }},
      "modelwise_search": {{
        "metric": "mae"
      }}
    }},
    "model_config": {{
      "all_node_type": "int16",
      "model_output_type": "int16",
    }},
    "op_config": {{
      "ReduceMean": {{"qtype": "int16"}},
      "Sub": {{"qtype": "int16"}},
      "Softmax": {{"qtype": "int16"}}
    }},{node_config_str}
  }}
compiler_parameters:
  extra_params: {{'input_no_padding': True, 'output_no_padding': True}}
  jobs: {jobs}
  compile_mode: 'latency'
  debug: {debug}
  advice: 1
  optimize_level: 'O2'
"""
        return yaml_content

    @staticmethod
    def generate_bash_script(model_name: str) -> str:
        """Generate the bash build script for a model workspace.

        The script compiles with hb_compile, opens up permissions on the
        workspace, and copies the resulting .hbm into the shared artifact
        directory.
        """
        return f"""hb_compile --config config.yaml
chmod 777 ./*
chmod 777 ./*/*
chmod 777 ./*/*/*
cp bpu_output/{model_name}_featuremaps.hbm ../{ALL_OUTPUT_NAME}
"""
|
|
|
|
|
|
|
|
class SmolVLAExporter:
    """Main exporter class for SmolVLA models.

    Drives the full BPU export pipeline: loads the pretrained policy and the
    LeRobot dataset, exports auxiliary artifacts (language embedding matrix,
    normalization parameters), traces the denoising model to ONNX, dumps
    calibration tensors, and writes compiler configuration files.
    """

    def __init__(
        self,
        model_path: str,
        dataset_path: str,
        export_path: str,
        device: str = DEFAULT_DEVICE,
        num_calibration_samples: int = DEFAULT_NUM_CALIBRATION_SAMPLES,
        march: str = "nash-m",
        jobs: int = 32,
        debug: str = "False",
    ):
        """Store settings, load model/dataset, and prepare the directory tree.

        Args:
            model_path: Path to the pretrained SmolVLA checkpoint.
            dataset_path: Local root directory of the LeRobot dataset.
            export_path: Output directory for all exported artifacts.
            device: Torch device string (e.g. 'cuda:0', 'cpu').
            num_calibration_samples: Number of dataset samples to calibrate on.
            march: Target BPU architecture written into compiler configs.
            jobs: Parallel compile jobs written into compiler configs.
            debug: "True"/"False" string forwarded into compiler configs.
        """
        self.model_path = model_path
        self.dataset_path = dataset_path
        self.export_path = Path(export_path)
        self.device = torch.device(device)
        self.num_calibration_samples = num_calibration_samples
        self.march = march
        self.jobs = jobs
        self.debug = debug

        # Load policy and dataset
        self.policy = self._load_policy()
        self.dataset = self._load_dataset()
        self.data_indices = self._select_calibration_samples()

        # Setup directories (one workspace per exported model)
        num_layers = len(self.policy.model.vlm_with_expert.lm_expert.layers)
        self.dir_manager = DirectoryManager(export_path, num_layers)

        # Define input/output names shared by ONNX export and calibration dirs.
        self.input_names_kv = [
            "prefix_att_2d_masks",
            "prefix_position_ids",
            "prefix_embs",
        ]
        self.output_names_kv = []
        for i in range(num_layers):
            self.output_names_kv.append(f"k_{i}")
            self.output_names_kv.append(f"v_{i}")
        # The denoise model consumes the flattened KV cache as extra inputs.
        self.input_names_denoise = ["prefix_pad_masks", "x_t", "expanded_time"]
        self.input_names_denoise.extend(self.output_names_kv)
        self.output_names_denoise = ["x_t_output"]

    def _load_policy(self) -> SmolVLAPolicy:
        """Load the pretrained policy in float32 eval mode on the target device."""
        policy = SmolVLAPolicy.from_pretrained(self.model_path)
        return policy.to(self.device).float().eval()

    def _load_dataset(self) -> LeRobotDataset:
        """Load the LeRobot dataset from the local path.

        NOTE(review): repo_id "Foo/Bar" looks like a placeholder; presumably
        the dataset is resolved from ``root`` alone for local loading —
        confirm against the LeRobotDataset API.
        """
        dataset = LeRobotDataset(repo_id="Foo/Bar", root=self.dataset_path)
        print(f"Dataset loaded: {len(dataset)} samples")
        return dataset

    def _select_calibration_samples(self) -> List[int]:
        """Randomly select calibration sample indices (without replacement)."""
        return random.sample(range(len(self.dataset)), self.num_calibration_samples)

    def export_language_embedding(self):
        """Export the VLM text-model token embedding matrix as a .npy file."""
        embedding_matrix = (
            self.policy.model.vlm_with_expert.vlm.model.text_model.embed_tokens.weight.detach()
            .cpu()
            .float()
            .numpy()
        )
        print(
            f"Language embedding shape: {embedding_matrix.shape}, dtype: {embedding_matrix.dtype}"
        )

        output_path = self.export_path / ALL_OUTPUT_NAME / f"{VLM_EXPERT_EMBEDDING}.npy"
        np.save(output_path, embedding_matrix)

    def export_normalization_params(self):
        """Export state normalization/unnormalization parameters as a .pt file."""
        params = {
            "normalize_inputs.mean": self.policy.normalize_inputs.buffer_observation_state.mean.data.detach().cpu(),
            "normalize_inputs.std": self.policy.normalize_inputs.buffer_observation_state.std.data.detach().cpu(),
            "unnormalize_outputs.mean": self.policy.unnormalize_outputs.buffer_action.mean.data.detach().cpu(),
            "unnormalize_outputs.std": self.policy.unnormalize_outputs.buffer_action.std.data.detach().cpu(),
        }

        output_path = self.export_path / ALL_OUTPUT_NAME / f"{STATE_NORM_NAME}.pt"
        torch.save(params, output_path)

    def export_denoise_model(self):
        """Export the denoising model to ONNX.

        Runs one full diffusion rollout on a real sample so the tensors fed
        to ``torch.onnx.export`` carry realistic values and shapes; tracing
        uses the inputs from the final denoising step.
        """
        # Prepare sample data
        sample_data = self.dataset[0]
        obs = self._prepare_observation(sample_data)
        # deepcopy so normalization does not mutate the raw observation dict.
        batch = self.policy.normalize_inputs(copy.deepcopy(obs))

        # Prepare inputs
        images, img_masks = self.policy.prepare_images(batch)
        state = self.policy.prepare_state(batch)
        lang_tokens, lang_masks = self.policy.prepare_language(batch)

        # Generate prefix embeddings
        prefix_embs, prefix_pad_masks, prefix_att_masks = (
            self.policy.model.embed_prefix(
                images, img_masks, lang_tokens, lang_masks, state=state
            )
        )
        prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks)
        # Position ids count valid (non-pad) tokens; pads get position -1 + run.
        prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1

        # Generate KV cache
        m_kv = BPUKVCache(self.policy)
        with torch.no_grad():
            kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)

        # Prepare denoising inputs
        noise = self._generate_noise(batch_size=1)
        m_denoise = BPUDenoise(self.policy)
        m_denoise.eval()

        # Flow-matching integration from t=1 down to 0 in num_steps steps.
        dt = -1.0 / self.policy.model.config.num_steps
        dt = torch.tensor(dt, dtype=torch.float32, device=self.device)
        x_t = noise
        time = torch.tensor(1.0, dtype=torch.float32, device=self.device)
        while time >= -dt / 2:
            expanded_time = time.expand(1)
            input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
            with torch.no_grad():
                v_t = m_denoise(*input_tensors)
            # Euler step (in-place on x_t).
            x_t += dt * v_t
            time += dt

        onnx_path = (
            self.export_path
            / f"{ACTION_EXPERT_MODEL_NAME}_ws"
            / f"{ACTION_EXPERT_MODEL_NAME}.onnx"
        )
        torch.onnx.export(
            m_denoise,
            input_tensors,
            onnx_path,
            export_params=True,
            opset_version=19,
            do_constant_folding=True,
            input_names=self.input_names_denoise,
            output_names=self.output_names_denoise,
            dynamic_axes=None,
            dynamo=False,
        )
        print(f"Denoising model exported to {onnx_path}")

    def prepare_calibration_data(self):
        """Prepare calibration data for the KV-cache and denoising models.

        For each selected dataset sample, dumps the KV-cache model's
        inputs/outputs, then runs the diffusion loop and dumps denoising
        inputs/outputs — all steps for every 5th sample, a subset otherwise.
        """
        m_kv = BPUKVCache(self.policy)
        m_denoise = BPUDenoise(self.policy)

        kv_cnt = 0
        denoise_cnt = 0

        for idx in tqdm(self.data_indices, desc="Preparing calibration data"):
            sample_data = self.dataset[idx]
            obs = self._prepare_observation(sample_data)
            batch = self.policy.normalize_inputs(copy.deepcopy(obs))

            # Prepare inputs
            images, img_masks = self.policy.prepare_images(batch)
            state = self.policy.prepare_state(batch)
            lang_tokens, lang_masks = self.policy.prepare_language(batch)

            # Generate prefix embeddings
            prefix_embs, prefix_pad_masks, prefix_att_masks = (
                self.policy.model.embed_prefix(
                    images, img_masks, lang_tokens, lang_masks, state=state
                )
            )
            prefix_att_2d_masks = make_att_2d_masks(prefix_pad_masks, prefix_att_masks)
            prefix_position_ids = torch.cumsum(prefix_pad_masks, dim=1) - 1

            # Save KV cache inputs and outputs
            self._save_calibration_tensors(
                [prefix_att_2d_masks, prefix_position_ids, prefix_embs],
                self.input_names_kv,
                VLM_EXPERT_MODEL_NAME,
                "cal",
                kv_cnt,
            )

            with torch.no_grad():
                kv_caches = m_kv(prefix_att_2d_masks, prefix_position_ids, prefix_embs)

            self._save_calibration_tensors(
                kv_caches, self.output_names_kv, VLM_EXPERT_MODEL_NAME, "output", kv_cnt
            )
            kv_cnt += 1

            # Diffusion loop for denoising calibration
            noise = self._generate_noise(batch_size=1)
            dt = -1.0 / self.policy.model.config.num_steps
            dt = torch.tensor(dt, dtype=torch.float32, device=self.device)
            x_t = noise
            time = torch.tensor(1.0, dtype=torch.float32, device=self.device)

            if kv_cnt % 5 == 0:  # keep every denoising step of this trace as calibration data
                while time >= -dt / 2:
                    expanded_time = time.expand(1)
                    input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
                    with torch.no_grad():
                        v_t = m_denoise(*input_tensors)
                    self._save_calibration_tensors(
                        input_tensors, self.input_names_denoise,
                        ACTION_EXPERT_MODEL_NAME, "cal", denoise_cnt,
                    )
                    self._save_calibration_tensors(
                        [v_t], self.output_names_denoise,
                        ACTION_EXPERT_MODEL_NAME, "output", denoise_cnt,
                    )
                    x_t += dt * v_t
                    time += dt
                    denoise_cnt += 1
            else:  # keep a subset as calibration data
                # NOTE(review): the original comment said "keep half", but
                # cnt_ls % 3 == 0 keeps every THIRD step — confirm intent.
                cnt_ls = 0
                while time >= -dt / 2:
                    expanded_time = time.expand(1)
                    input_tensors = (prefix_pad_masks, x_t, expanded_time, *kv_caches)
                    with torch.no_grad():
                        v_t = m_denoise(*input_tensors)
                    cnt_ls += 1
                    if cnt_ls % 3 == 0:
                        self._save_calibration_tensors(
                            input_tensors, self.input_names_denoise,
                            ACTION_EXPERT_MODEL_NAME, "cal", denoise_cnt,
                        )
                        self._save_calibration_tensors(
                            [v_t], self.output_names_denoise,
                            ACTION_EXPERT_MODEL_NAME, "output", denoise_cnt,
                        )
                        denoise_cnt += 1
                    x_t += dt * v_t
                    time += dt

        print(
            f"Calibration data prepared: {kv_cnt} KV samples, {denoise_cnt} denoise samples"
        )

    def generate_config_files(self):
        """Generate YAML configs and bash scripts for compilation."""
        config_gen = ConfigGenerator()

        # VLM expert config
        yaml_content = config_gen.generate_yaml_config(
            VLM_EXPERT_MODEL_NAME,
            self.input_names_kv,
            self.march,
            self.jobs,
            self.debug,
        )
        self._write_config_files(VLM_EXPERT_MODEL_NAME, yaml_content)

        # Action expert config with extra node configuration: these ONNX node
        # names are forced to float32 quantization in the generated YAML.
        extra_node_config = [
            "/Unsqueeze",
            "/Mul",
            "/Cos",
            "/Sin",
            "/Concat",
            "/Cast",
            "/Unsqueeze_1",
        ]
        yaml_content = config_gen.generate_yaml_config(
            ACTION_EXPERT_MODEL_NAME,
            self.input_names_denoise,
            self.march,
            self.jobs,
            self.debug,
            extra_node_config,
        )
        self._write_config_files(ACTION_EXPERT_MODEL_NAME, yaml_content)

    def _prepare_observation(self, data: Dict) -> Dict[str, Any]:
        """Prepare an observation dictionary from a dataset sample.

        Adds a batch dimension to each camera image and moves tensors to the
        target device.

        NOTE(review): ``observation.state`` is filled from ``data["action"]``,
        not from a state field — confirm this matches the dataset schema.
        """
        obs = {
            "instruction": data["task"],
            "task": data["task"],
            "observation.images.cam_high": data["observation.images.cam_high"]
            .unsqueeze(0)
            .to(self.device),
            "observation.images.cam_left_wrist": data[
                "observation.images.cam_left_wrist"
            ]
            .unsqueeze(0)
            .to(self.device),
            "observation.images.cam_right_wrist": data[
                "observation.images.cam_right_wrist"
            ]
            .unsqueeze(0)
            .to(self.device),
            "observation.state": data["action"].unsqueeze(0).to(self.device),
        }
        return obs

    def _generate_noise(self, batch_size: int) -> torch.Tensor:
        """Generate a standard-normal noise tensor shaped like an action chunk."""
        actions_shape = (
            batch_size,
            self.policy.model.config.chunk_size,
            self.policy.model.config.max_action_dim,
        )
        return torch.normal(
            mean=0.0,
            std=1.0,
            size=actions_shape,
            dtype=torch.float32,
            device=self.device,
        )

    def _save_calibration_tensors(
        self,
        tensors: List[torch.Tensor],
        names: List[str],
        model_name: str,
        subdir: str,
        index: int,
    ):
        """Save tensors as ``<model>_ws/<subdir>/<name>/<index>.npy`` files.

        Args:
            tensors: Tensors to dump (paired positionally with ``names``).
            names: Per-tensor directory names.
            model_name: Model workspace stem.
            subdir: "cal" for inputs or "output" for reference outputs.
            index: Sample index used as the file name.
        """
        for tensor, name in zip(tensors, names):
            output_path = (
                self.export_path / f"{model_name}_ws" / subdir / name / f"{index}.npy"
            )
            np.save(output_path, tensor.detach().cpu().numpy())

    def _write_config_files(self, model_name: str, yaml_content: str):
        """Write the YAML config and build script into a model's workspace."""
        ws_path = self.export_path / f"{model_name}_ws"

        # Write YAML config
        yaml_path = ws_path / "config.yaml"
        with open(yaml_path, "w", encoding="utf-8") as f:
            f.write(yaml_content)

        # Write bash script
        bash_content = ConfigGenerator.generate_bash_script(model_name)
        bash_path = ws_path / "build.bash"
        with open(bash_path, "w", encoding="utf-8") as f:
            f.write(bash_content)

        print(f"Config files written for {model_name}")

    def export_all(self):
        """Run the complete export pipeline in order."""
        print("Starting SmolVLA export...")
        print(f"Export path: {self.export_path}")

        print("\n[1/5] Exporting language embedding...")
        self.export_language_embedding()

        print("\n[2/5] Exporting normalization parameters...")
        self.export_normalization_params()

        print("\n[3/5] Exporting denoising model...")
        self.export_denoise_model()

        print("\n[4/5] Preparing calibration data...")
        self.prepare_calibration_data()

        print("\n[5/5] Generating configuration files...")
        self.generate_config_files()

        print("\n✓ Export completed successfully!")
|
|
|
|
|
|
|
|
# Run the export pipeline when executed as a script.
if __name__ == "__main__":
    main()
|