2025-11-12 00:59:35 +08:00

466 lines
21 KiB
Python

import logging
import os
import sys
import shutil
import cv2
import numpy as np
import torch
import torch.nn as nn
import argparse
import onnx
import json
import yaml
from copy import deepcopy
from termcolor import colored
from onnxsim import simplify
from pprint import pformat
import time
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.datasets.factory import make_dataset
from lerobot.utils.utils import get_safe_torch_device, init_logging
from lerobot.configs import parser
from lerobot.configs.train import TrainPipelineConfig
# Export configuration dict, populated from input/config.json by the
# __main__ entry point; read by main() and generate_output_config().
_global_config = None
# Base names for the two exported sub-models; workspace directories, ONNX
# file names, and calibration-data folders are all derived from these.
BPU_VisionEncoder = "BPU_ACTPolicy_VisionEncoder"
BPU_TransformerLayers = "BPU_ACTPolicy_TransformerLayers"
def onnx_sim(onnx_path, onnx_sim):
    """Optionally run onnx-simplifier on the model at ``onnx_path``, in place.

    Args:
        onnx_path: Path of the ONNX file to load, simplify, and overwrite.
        onnx_sim: Flag; when falsy the file is left untouched.

    NOTE(review): the flag parameter shadows the function's own name, and a
    second, identical ``onnx_sim`` is defined later in this file.
    """
    if not onnx_sim:
        return
    model = onnx.load(onnx_path)
    # Validate before attempting simplification.
    onnx.checker.check_model(model)
    model, ok = simplify(
        model,
        dynamic_input_shape=False,
        input_shapes=None)
    assert ok, 'assert check failed'
    onnx.save(model, onnx_path)
def load_config(config_path):
    """Load an export config and rewrite ``sys.argv`` as LeRobot CLI args.

    The loader is chosen from the file extension (``.yaml``/``.yml`` via
    PyYAML, everything else as JSON) — the original comment promised this
    but only JSON was ever parsed, leaving the module-level ``yaml`` import
    unused.

    Args:
        config_path: Path to the JSON/YAML config file.

    Returns:
        The parsed config dict when it contains an ``export`` section,
        otherwise ``None`` (``sys.argv`` is left untouched in that case).
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        if config_path.endswith(('.yaml', '.yml')):
            config_dict = yaml.safe_load(f)
        else:
            config_dict = json.load(f)
    args = []
    if 'export' in config_dict:
        export_cfg = config_dict['export']
        if 'repo_id' in export_cfg:
            args.extend(['--dataset.repo_id', str(export_cfg['repo_id'])])
        if 'dataset_path' in export_cfg:
            args.extend(['--dataset.root', str(export_cfg['dataset_path'])])
        args.extend(['--policy.type', 'act'])
        # NOTE(review): only the presence of 'gpu_id' is honored; the id
        # itself is never forwarded (always plain "cuda") — confirm intent.
        args.extend(['--policy.device', 'cpu' if 'gpu_id' not in config_dict else 'cuda'])
        # Guarded like --dataset.repo_id above; the original accessed
        # export_cfg['repo_id'] unconditionally and raised KeyError when absent.
        if 'repo_id' in export_cfg:
            args.extend(['--policy.repo_id', str(export_cfg['repo_id'])])
        # Use pyav as the video backend to avoid torchcodec's FFmpeg requirement.
        args.extend(['--dataset.video_backend', 'pyav'])
        args.extend(['--wandb.enable', 'false'])
        # Keep the original script name as sys.argv[0], then append the args.
        sys.argv = [sys.argv[0]] + args
        logging.info(f"Loaded config from {config_path}")
        logging.info(f"Config: {sys.argv}")
        return config_dict
    return None
class BPU_ACTPolicy_VisionEncoder(nn.Module):
    """Standalone wrapper around the ACT policy's vision backbone.

    Deep-copies the backbone and the image-feature input projection out of a
    loaded ACT policy so this module can be exported to ONNX on its own.

    Args:
        act_policy: A loaded ACTPolicy whose ``model.backbone`` and
            ``model.encoder_img_feat_input_proj`` are copied.
    """

    def __init__(self, act_policy):
        super().__init__()
        self.backbone = deepcopy(act_policy.model.backbone)
        self.encoder_img_feat_input_proj = deepcopy(act_policy.model.encoder_img_feat_input_proj)

    def forward(self, images):
        """Return the projected camera feature map for a batch of images."""
        cam_features = self.backbone(images)["feature_map"]
        # Project backbone channels into the transformer's model dimension.
        # (A redundant `cam_features = cam_features` no-op was removed.)
        return self.encoder_img_feat_input_proj(cam_features)
class BPU_ACTPolicy_TransformerLayers(nn.Module):
    """ACT transformer stage that consumes precomputed per-camera features.

    Wraps a deep copy of the ACT model so the encoder/decoder stack can be
    exported to ONNX separately from the vision backbone; vision features
    are fed in positionally, one tensor per camera.
    """

    def __init__(self, act_policy, camera_names):
        super().__init__()
        self.model = deepcopy(act_policy.model)
        # Kept for reference; forward() relies on positional *vision_features
        # rather than looking features up by camera name.
        self.camera_names = camera_names

    def forward(self, states, *vision_features):
        """Predict an action chunk from a robot state and camera features.

        Args:
            states: Robot state tensor fed to the state input projection.
                # assumes shape (1, state_dim) — TODO confirm against caller
            *vision_features: One projected feature map per camera, as
                produced by BPU_ACTPolicy_VisionEncoder.

        Returns:
            Action tensor from the model's action head, batch-first.
        """
        # Latent is fixed to zeros at export/inference time (no sampling).
        latent_sample = torch.zeros([1, self.model.config.latent_dim], dtype=torch.float32)
        encoder_in_tokens = [self.model.encoder_latent_input_proj(latent_sample)]
        encoder_in_pos_embed = self.model.encoder_1d_feature_pos_embed.weight.unsqueeze(1).unbind(dim=0)
        encoder_in_tokens.append(self.model.encoder_robot_state_input_proj(states))
        all_cam_features = []
        all_cam_pos_embeds = []
        # Dynamically handle the vision features of every camera.
        for vision_feature in vision_features:
            cam_pos_embed = self.model.encoder_cam_feat_pos_embed(vision_feature)
            all_cam_features.append(vision_feature)
            all_cam_pos_embeds.append(cam_pos_embed)
        # Reshape the latent/state tokens to (1, 1, dim) each.
        tokens = []
        for token in encoder_in_tokens:
            tokens.append(token.view(1,1,self.model.config.dim_model))
        # Flatten the concatenated camera feature maps into a
        # (num_spatial_positions, 1, dim) token sequence.
        all_cam_features = torch.cat(all_cam_features, axis=-1).permute(2, 3, 0, 1).view(-1,1,self.model.config.dim_model)
        tokens.append(all_cam_features)
        encoder_in_tokens = torch.cat(tokens, axis=0)
        # Build the positional embeddings with the exact same layout as the tokens.
        pos_embeds = []
        for pos_embed in encoder_in_pos_embed:
            pos_embeds.append(pos_embed.view(1,1,self.model.config.dim_model))
        all_cam_pos_embeds = torch.cat(all_cam_pos_embeds, axis=-1).permute(2, 3, 0, 1).view(-1,1,self.model.config.dim_model)
        pos_embeds.append(all_cam_pos_embeds)
        encoder_in_pos_embed = torch.cat(pos_embeds, axis=0)
        encoder_out = self.model.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)
        # Decoder queries start from zeros; positions come from decoder_pos_embed.
        decoder_in = torch.zeros(
            (self.model.config.chunk_size, 1, self.model.config.dim_model),
            dtype=encoder_in_pos_embed.dtype,
            device=encoder_in_pos_embed.device,
        )
        decoder_out = self.model.decoder(
            decoder_in,
            encoder_out,
            encoder_pos_embed=encoder_in_pos_embed,
            decoder_pos_embed=self.model.decoder_pos_embed.weight.unsqueeze(1),
        )
        # (chunk, batch, dim) -> (batch, chunk, dim) before the action head.
        decoder_out = decoder_out.transpose(0, 1)
        actions = self.model.action_head(decoder_out)
        return actions
def lerobotTensor2cvmat(tensor):
    """Convert a (1, C, H, W) float image tensor to a uint8 BGR OpenCV image.

    Assumes RGB channel order and values in [0, 1] (scaled by 255 here) —
    TODO confirm against the dataset's image normalization.
    """
    hwc = tensor.mul(255).permute(0, 2, 3, 1)
    frame = hwc.cpu().numpy().astype(np.uint8)[0]
    return cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
def onnx_sim(onnx_path, onnx_sim):
    """Optionally run onnx-simplifier on the model at ``onnx_path``, in place.

    NOTE(review): this is a byte-for-byte duplicate of the ``onnx_sim``
    defined earlier in this file; being later, this definition is the one
    actually bound at call time. The flag parameter also shadows the
    function's own name. Consider removing one copy.
    """
    if onnx_sim:
        model_onnx = onnx.load(onnx_path)  # load onnx model
        onnx.checker.check_model(model_onnx)  # check onnx model
        model_onnx, check = simplify(
            model_onnx,
            dynamic_input_shape=False,
            input_shapes=None)
        assert check, 'assert check failed'
        onnx.save(model_onnx, onnx_path)
@parser.wrap()
def main(cfg: TrainPipelineConfig):
    """Export an ACT policy into two BPU-deployable ONNX models + calibration data.

    Splits the policy into a vision encoder and a transformer stage, exports
    both to ONNX, saves the normalization parameters as .npy files, and dumps
    calibration samples in the format the target BPU march expects
    ("nash" -> .npy files, "bayes" -> raw .nchw binaries).

    Args:
        cfg: LeRobot pipeline config parsed by @parser.wrap() from the
            sys.argv that load_config() rewrote; used for dataset/device setup.
    """
    # The LeRobot argument list is consumed by the @parser.wrap() decorator.
    # Skip validate(): this config is used for export, not training.
    # cfg.validate()
    logging.info(pformat(cfg.to_dict()))
    # BPU export parameters - read from the global config or the command line.
    global _global_config
    # NOTE(review): these class attributes are evaluated when the class body
    # runs, so a None _global_config raises TypeError here, before the
    # `if _global_config:` guard below can ever take the else-path; `march`
    # is also only assigned inside that guard.
    class BPUOptions:
        act_path = _global_config['export']['model_path']
        export_path = _global_config['export']['output_path'] + "/" + _global_config['task_id']
        cal_num = _global_config['export']['calibration_num']
        onnx_sim = True
        combine_jobs = 6
    opt = BPUOptions()
    if _global_config:
        opt.act_path = _global_config['export']['model_path']
        opt.export_path = _global_config['export']['output_path'] + "/" + _global_config['task_id']
        opt.cal_num = _global_config['export']['calibration_num']
        opt.onnx_sim = True
        opt.march = _global_config['export']['march']
        opt.combine_jobs = 6
        logging.info("BPU parameters loaded from config file")
    logging.info("="*80)
    logging.info(colored("BPU Export Configuration:", 'light_cyan'))
    logging.info(f" ACT Model Path: {opt.act_path}")
    logging.info(f" Export Path: {opt.export_path}")
    logging.info(f" Calibration Samples: {opt.cal_num}")
    logging.info(f" ONNX Simplify: {opt.onnx_sim}")
    logging.info(f" March: {opt.march}")
    logging.info(f" Compiler Jobs: {opt.combine_jobs}")
    logging.info(f" Dataset Root: {cfg.dataset.root}")
    logging.info("="*80)
    if not os.path.exists(opt.export_path):
        os.makedirs(opt.export_path)
    # Workspace directories and ONNX output paths for the two sub-models.
    visionEncoder_ws = os.path.join(opt.export_path, BPU_VisionEncoder)
    transformersLayers_ws = os.path.join(opt.export_path, BPU_TransformerLayers)
    onnx_name_BPU_ACTPolicy_VisionEncoder = BPU_VisionEncoder + ".onnx"
    onnx_path_BPU_ACTPolicy_VisionEncoder = os.path.join(visionEncoder_ws, onnx_name_BPU_ACTPolicy_VisionEncoder)
    onnx_name_BPU_ACTPolicy_TransformerLayers = BPU_TransformerLayers + ".onnx"
    onnx_path_BPU_ACTPolicy_TransformerLayers = os.path.join(transformersLayers_ws, onnx_name_BPU_ACTPolicy_TransformerLayers)
    ## Calibration data output paths.
    calbrate_data_name_BPU_ACTPolicy_VisionEncoder = "calibration_data_" + BPU_VisionEncoder
    calbrate_data_path_BPU_ACTPolicy_VisionEncoder = os.path.join(visionEncoder_ws, calbrate_data_name_BPU_ACTPolicy_VisionEncoder)
    calbrate_data_name_BPU_ACTPolicy_TransformerLayers = "calibration_data_" + BPU_TransformerLayers
    calbrate_data_path_BPU_ACTPolicy_TransformerLayers = os.path.join(transformersLayers_ws, calbrate_data_name_BPU_ACTPolicy_TransformerLayers)
    state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, "state")
    ## Paths for the release folder and its build script.
    bpu_output_name = "bpu_output"
    bpu_output_path = os.path.join(opt.export_path, bpu_output_name)
    bash_build_all_path = os.path.join(opt.export_path, "build_all.sh")
    ## Pre/post-processing parameter file paths.
    action_std_path = os.path.join(bpu_output_path, "action_std.npy")
    action_mean_path = os.path.join(bpu_output_path, "action_mean.npy")
    action_std_unnormalize_path = os.path.join(bpu_output_path, "action_std_unnormalize.npy")
    action_mean_unnormalize_path = os.path.join(bpu_output_path, "action_mean_unnormalize.npy")
    ## Create the working directories.
    os.makedirs(visionEncoder_ws, exist_ok=True)
    logging.info(colored(f"mkdir: {visionEncoder_ws} Success.", 'green'))
    os.makedirs(transformersLayers_ws, exist_ok=True)
    logging.info(colored(f"mkdir: {transformersLayers_ws} Success.", 'green'))
    os.makedirs(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, exist_ok=True)
    logging.info(colored(f"mkdir: {calbrate_data_path_BPU_ACTPolicy_VisionEncoder} Success.", 'green'))
    os.makedirs(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, exist_ok=True)
    logging.info(colored(f"mkdir: {calbrate_data_path_BPU_ACTPolicy_TransformerLayers} Success.", 'green'))
    os.makedirs(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, exist_ok=True)
    logging.info(colored(f"mkdir: {state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers} Success.", 'green'))
    os.makedirs(bpu_output_path, exist_ok=True)
    logging.info(colored(f"mkdir: {bpu_output_path} Success.", 'green'))
    # Load the trained policy on CPU for deterministic export.
    policy = ACTPolicy.from_pretrained(opt.act_path).cpu().eval()
    logging.info(colored(f"Load ACT Policy Model: {opt.act_path} Success.", 'light_red'))
    device = get_safe_torch_device(cfg.policy.device, log=True)
    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True
    # Load the dataset used to produce sample inputs and calibration data.
    dataset = make_dataset(cfg)
    dataloader = torch.utils.data.DataLoader(
        dataset,
        num_workers=0,
        batch_size=1,
        shuffle=True,
        sampler=None,
        pin_memory=device.type != "cpu",
        drop_last=False,
    )
    logging.info(colored(f"Load ACT Policy Dataset: \n{dataset} Success.", 'light_red'))
    # Discover camera names from the observation.image.* keys of one batch.
    batch = next(iter(dataloader))
    image_keys = [key for key in batch.keys() if key.startswith('observation.images.')]
    camera_names = [key.split('.')[-1] for key in image_keys]
    logging.info(colored(f"Camera Names: {camera_names} Success.", 'light_red'))
    logging.info(colored(f"Image Keys: {image_keys} Success.", 'light_red'))
    logging.info(colored(f"Batch: {batch} Success.", 'light_red'))
    # Sanity-run the full policy once on a copy of the batch.
    outputs = policy.select_action(deepcopy(batch))
    ## Dynamically extract the pre/post-processing parameters.
    # Save the normalization parameters for every camera.
    for camera_name in camera_names:
        buffer_name = f"buffer_observation_images_{camera_name}"
        if hasattr(policy.normalize_inputs, buffer_name):
            buffer = getattr(policy.normalize_inputs, buffer_name)
            camera_std = buffer.std.data.detach().cpu().numpy()
            camera_mean = buffer.mean.data.detach().cpu().numpy()
            camera_std_path = os.path.join(bpu_output_path, f"{camera_name}_std.npy")
            camera_mean_path = os.path.join(bpu_output_path, f"{camera_name}_mean.npy")
            np.save(camera_std_path, camera_std)
            np.save(camera_mean_path, camera_mean)
            logging.info(f"Saved {camera_name} normalization parameters")
    # Save the state and action normalization parameters.
    action_std = policy.normalize_inputs.buffer_observation_state.std.data.detach().cpu().numpy()
    action_mean = policy.normalize_inputs.buffer_observation_state.mean.data.detach().cpu().numpy()
    action_std_unnormalize = policy.unnormalize_outputs.buffer_action.std.data.detach().cpu().numpy()
    action_mean_unnormalize = policy.unnormalize_outputs.buffer_action.mean.data.detach().cpu().numpy()
    np.save(action_std_path, action_std)
    np.save(action_mean_path, action_mean)
    np.save(action_std_unnormalize_path, action_std_unnormalize)
    np.save(action_mean_unnormalize_path, action_mean_unnormalize)
    ## Vision Encoder
    batch = policy.normalize_inputs(batch)
    m_VisionEncoder = BPU_ACTPolicy_VisionEncoder(policy)
    m_VisionEncoder.eval()
    # Compute vision features for every camera (also used later as the
    # transformer's sample inputs).
    vision_features = []
    for camera_name in camera_names:
        input_tensor = batch[f'observation.images.{camera_name}']
        vision_feature = m_VisionEncoder(input_tensor)
        vision_features.append(vision_feature)
        logging.info(f"Generated vision features for {camera_name}: {vision_feature.shape}")
    # Pick the ONNX opset: the "bayes" march uses opset 11, otherwise 19.
    opset_version = 11 if "bayes" in opt.march else 19
    logging.info(f"Using ONNX opset version: {opset_version} for type: {opt.march}")
    onnx_path = onnx_path_BPU_ACTPolicy_VisionEncoder
    # NOTE(review): input_tensor here is the LAST camera's tensor from the
    # loop above; all cameras share the same encoder weights, so any one
    # camera's tensor works as the tracing sample.
    torch.onnx.export(
        m_VisionEncoder,                   # model to convert
        input_tensor,                      # sample model input
        onnx_path,                         # output file name
        export_params=True,                # store the trained parameters
        opset_version=opset_version,       # march-dependent ONNX opset
        do_constant_folding=True,          # fold constants during export
        input_names=['images'],            # input node name
        output_names=['Vision_Features'],  # output node name
        dynamic_axes=None
    )
    onnx_sim(onnx_path, opt.onnx_sim)
    logging.info(colored(f"Export {onnx_path} Success.", 'green'))
    # Transformer stage: trace with the state plus one feature map per camera.
    m_TransformerLayers = BPU_ACTPolicy_TransformerLayers(policy, camera_names)
    m_TransformerLayers.eval()
    state = batch["observation.state"]
    actions = m_TransformerLayers(state, *vision_features)
    # np.save(f"new_actions.npy", actions.detach().cpu().numpy())
    input_names = ['states'] + [f'{camera_name}_features' for camera_name in camera_names]
    logging.info(f"Transformer input names: {input_names}")
    onnx_path = onnx_path_BPU_ACTPolicy_TransformerLayers
    torch.onnx.export(
        m_TransformerLayers,               # model to convert
        (state, *vision_features),         # sample model inputs
        onnx_path,                         # output file name
        export_params=True,                # store the trained parameters
        opset_version=opset_version,       # march-dependent ONNX opset
        do_constant_folding=True,          # fold constants during export
        input_names=input_names,           # dynamic input node names
        output_names=['Actions'],          # output node name
        dynamic_axes=None
    )
    onnx_sim(onnx_path, opt.onnx_sim)
    logging.info(colored(f"Export {onnx_path} Success.", 'green'))
    if "nash" in opt.march:
        ## Calibration data - create one directory per transformer input.
        input_names_TransformerLayers = camera_names + ["state"]
        input_cal_path = []
        for input_name in input_names_TransformerLayers:
            p = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, input_name)
            input_cal_path.append(p)
            os.makedirs(p, exist_ok=True)
            logging.info(colored(f"mkdir: {p} Success.", 'green'))
        for i, batch in enumerate(dataloader):
            name = "%.10d.npy"%i
            batch = policy.normalize_inputs(batch)
            # Collect every camera's image tensor for this sample.
            camera_inputs = {}
            for camera_name in camera_names:
                camera_inputs[camera_name] = batch[f'observation.images.{camera_name}']
            state_input = batch["observation.state"]
            ## VisionEncoder - save every 4th sample's raw camera inputs.
            if i%4 == 0:
                for camera_name in camera_names:
                    p = os.path.join(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, f"{camera_name}_" + name)
                    np.save(p, camera_inputs[camera_name].detach().cpu().numpy())
                    logging.info(colored(f"save to: {p}", 'light_blue'))
            ## TransformerLayers - run the encoder and save each camera's features.
            for camera_name in camera_names:
                vision_feature = m_VisionEncoder(camera_inputs[camera_name])
                camera_cal_path = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, camera_name)
                p = os.path.join(camera_cal_path, name)
                np.save(p, vision_feature.detach().cpu().numpy())
                logging.info(colored(f"save to: {p}", 'light_magenta'))
            p = os.path.join(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, name)
            np.save(p, state_input.detach().cpu().numpy())
            logging.info(colored(f"save to: {p}", 'light_magenta'))
            if i >= opt.cal_num:
                break
    if "bayes" in opt.march:
        ## Calibration data - create one directory per transformer input.
        input_names_TransformerLayers = camera_names + ["state"]
        input_cal_path = []
        for input_name in input_names_TransformerLayers:
            p = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, input_name)
            input_cal_path.append(p)
            os.makedirs(p, exist_ok=True)
            logging.info(colored(f"mkdir: {p} Success.", 'green'))
        for i, batch in enumerate(dataloader):
            # Bayes toolchain consumes raw binary dumps, not .npy files.
            name = "%.10d.nchw"%i
            batch = policy.normalize_inputs(batch)
            # Collect every camera's image tensor for this sample.
            camera_inputs = {}
            for camera_name in camera_names:
                camera_inputs[camera_name] = batch[f'observation.images.{camera_name}']
            state_input = batch["observation.state"]
            ## VisionEncoder - save every 4th sample's raw camera inputs (Bayes format).
            if i%4 == 0:
                for camera_name in camera_names:
                    p = os.path.join(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, f"{camera_name}_" + name)
                    camera_inputs[camera_name].detach().cpu().numpy().tofile(p)
                    logging.info(colored(f"save to: {p}", 'light_blue'))
            ## TransformerLayers - run the encoder and save each camera's features (Bayes format).
            for camera_name in camera_names:
                vision_feature = m_VisionEncoder(camera_inputs[camera_name])
                camera_cal_path = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, camera_name)
                p = os.path.join(camera_cal_path, name)
                vision_feature.detach().cpu().numpy().tofile(p)
                logging.info(colored(f"save to: {p}", 'light_magenta'))
            p = os.path.join(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, name)
            state_input.detach().cpu().numpy().tofile(p)
            logging.info(colored(f"save to: {p}", 'light_magenta'))
            if i >= opt.cal_num:
                break
def generate_output_config(time_cost):
    """Write ``output.json`` describing the exported BPU artifacts.

    Args:
        time_cost: Wall-clock export duration in seconds, recorded as-is.

    Side effects:
        Creates ``<output_path>/<task_id>/`` if needed (the original assumed
        main() had already created it and crashed otherwise) and writes an
        ``output.json`` with the ONNX and calibration-data paths inside it.
    """
    global _global_config
    export_path = _global_config['export']['output_path'] + "/" + _global_config['task_id']
    TransformerLayers = export_path + "/" + BPU_TransformerLayers
    TransformerLayers_onnx = TransformerLayers + "/" + BPU_TransformerLayers + ".onnx"
    TransformerLayers_calibration_data = TransformerLayers + "/" + "calibration_data_" + BPU_TransformerLayers
    VisionEncoder = export_path + "/" + BPU_VisionEncoder
    VisionEncoder_onnx = VisionEncoder + "/" + BPU_VisionEncoder + ".onnx"
    VisionEncoder_calibration_data = VisionEncoder + "/" + "calibration_data_" + BPU_VisionEncoder
    output_config = {
        "task_name": _global_config['task_id'],
        "march": _global_config['export']['march'],
        "time_cost": time_cost,
        "export_path": export_path,
        "TransformerLayers": TransformerLayers_onnx,
        "TransformerLayers_calibration_data": TransformerLayers_calibration_data,
        "VisionEncoder": VisionEncoder_onnx,
        "VisionEncoder_calibration_data": VisionEncoder_calibration_data,
    }
    # Robustness fix: do not rely on main() having created the directory.
    os.makedirs(export_path, exist_ok=True)
    with open(os.path.join(export_path, "output.json"), "w") as f:
        json.dump(output_config, f)
if __name__ == "__main__":
init_logging()
config_path = "input/config.json"
_global_config = load_config(config_path)
time_start = time.time()
main()
time_end = time.time()
time_cost = time_end - time_start
logging.info(colored(f"Time Cost: {time_cost} seconds", 'light_red'))
generate_output_config(time_cost)