import logging
|
|
import os
|
|
import sys
|
|
import shutil
|
|
import cv2
|
|
import numpy as np
|
|
import torch
|
|
import torch.nn as nn
|
|
import argparse
|
|
import onnx
|
|
import json
|
|
import yaml
|
|
from copy import deepcopy
|
|
from termcolor import colored
|
|
from onnxsim import simplify
|
|
from pprint import pformat
|
|
import time
|
|
from lerobot.policies.act.modeling_act import ACTPolicy
|
|
from lerobot.datasets.factory import make_dataset
|
|
from lerobot.utils.utils import get_safe_torch_device, init_logging
|
|
from lerobot.configs import parser
|
|
from lerobot.configs.train import TrainPipelineConfig
|
|
|
|
# Parsed contents of the exporter JSON config (input/config.json).
# Populated in the __main__ block before main() runs; None until then.
_global_config = None

# Name stems shared by workspace directories, ONNX file names and
# calibration-data directories for the two exported sub-models.
BPU_VisionEncoder = "BPU_ACTPolicy_VisionEncoder"
BPU_TransformerLayers = "BPU_ACTPolicy_TransformerLayers"
|
|
|
|
def onnx_sim(onnx_path, onnx_sim):
    """Optionally simplify an ONNX model file in place.

    Args:
        onnx_path: path of the .onnx file to load, simplify and overwrite.
        onnx_sim: when falsy, do nothing (the name shadows the function name;
            kept for backward compatibility with existing callers).

    Raises:
        AssertionError: if onnxsim reports the simplified model as invalid.
    """
    # Guard clause instead of wrapping the whole body in the conditional.
    if not onnx_sim:
        return
    model_onnx = onnx.load(onnx_path)  # load onnx model
    onnx.checker.check_model(model_onnx)  # check onnx model
    model_onnx, check = simplify(
        model_onnx,
        dynamic_input_shape=False,
        input_shapes=None)
    # Explicit raise (same exception type/message as the original assert)
    # so the validation survives `python -O`, which strips assert statements.
    if not check:
        raise AssertionError('assert check failed')
    onnx.save(model_onnx, onnx_path)
|
|
|
|
def load_config(config_path):
    """Load the exporter JSON config and translate it into LeRobot CLI args.

    Reads ``config_path`` as JSON (only JSON is supported). When the config
    contains an ``export`` section, the relevant ``--dataset.*`` and
    ``--policy.*`` flags are written into ``sys.argv`` so that the
    ``@parser.wrap()``-decorated ``main()`` picks them up.

    Args:
        config_path: path to the JSON configuration file.

    Returns:
        The parsed config dict when an ``export`` section is present,
        otherwise ``None``.
    """
    with open(config_path, 'r', encoding='utf-8') as f:
        config_dict = json.load(f)
    args = []

    if 'export' in config_dict:
        export_cfg = config_dict['export']
        if 'repo_id' in export_cfg:
            args.extend(['--dataset.repo_id', str(export_cfg['repo_id'])])
        if 'dataset_path' in export_cfg:
            args.extend(['--dataset.root', str(export_cfg['dataset_path'])])

        args.extend(['--policy.type', 'act'])
        # gpu_id lives at the top level of the config, not under 'export'.
        args.extend(['--policy.device', 'cpu' if 'gpu_id' not in config_dict else 'cuda'])
        # Guarded like --dataset.repo_id above; the original read
        # export_cfg['repo_id'] unconditionally and raised KeyError if absent.
        if 'repo_id' in export_cfg:
            args.extend(['--policy.repo_id', str(export_cfg['repo_id'])])

        # Use the pyav video backend to avoid torchcodec's FFmpeg requirement.
        args.extend(['--dataset.video_backend', 'pyav'])

        args.extend(['--wandb.enable', 'false'])

        # Keep the original script name as sys.argv[0], then append the args.
        sys.argv = [sys.argv[0]] + args

        logging.info(f"Loaded config from {config_path}")
        logging.info(f"Config: {sys.argv}")

        return config_dict

    return None
|
|
|
|
class BPU_ACTPolicy_VisionEncoder(nn.Module):
    """Standalone wrapper around the ACT policy's image branch.

    Holds deep copies of the policy's backbone and the image-feature
    projection so the sub-graph can be exported to ONNX independently of
    the full policy.
    """

    def __init__(self, act_policy):
        super().__init__()
        # Deep copies so export-time tweaks never mutate the live policy.
        self.backbone = deepcopy(act_policy.model.backbone)
        self.encoder_img_feat_input_proj = deepcopy(act_policy.model.encoder_img_feat_input_proj)

    def forward(self, images):
        """Return the projected feature map for a batch of images.

        The backbone returns a dict; only its "feature_map" entry is used.
        (The original had a redundant self-assignment here, now removed.)
        """
        cam_features = self.backbone(images)["feature_map"]
        return self.encoder_img_feat_input_proj(cam_features)
|
|
|
|
class BPU_ACTPolicy_TransformerLayers(nn.Module):
    """Exportable wrapper around the ACT model's encoder/decoder stack.

    Takes the robot state plus one pre-computed vision feature map per
    camera (produced by BPU_ACTPolicy_VisionEncoder) and emits the action
    chunk, so the transformer part can be exported to ONNX on its own.
    """

    def __init__(self, act_policy, camera_names):
        super().__init__()
        # Deep-copied so export never mutates the live policy's weights.
        self.model = deepcopy(act_policy.model)
        # NOTE(review): stored but never read by forward(); kept for
        # interface compatibility with the caller.
        self.camera_names = camera_names

    def forward(self, states, *vision_features):
        # Fixed all-zeros latent vector (batch of 1), projected into the
        # encoder token space below.
        latent_sample = torch.zeros([1, self.model.config.latent_dim], dtype=torch.float32)

        # 1D tokens: [latent, robot state]; their positional embeddings come
        # from the learned 1D-feature embedding table.
        encoder_in_tokens = [self.model.encoder_latent_input_proj(latent_sample)]
        encoder_in_pos_embed = self.model.encoder_1d_feature_pos_embed.weight.unsqueeze(1).unbind(dim=0)
        encoder_in_tokens.append(self.model.encoder_robot_state_input_proj(states))

        all_cam_features = []
        all_cam_pos_embeds = []

        # Dynamically handle the vision features of every camera.
        for vision_feature in vision_features:
            cam_pos_embed = self.model.encoder_cam_feat_pos_embed(vision_feature)
            all_cam_features.append(vision_feature)
            all_cam_pos_embeds.append(cam_pos_embed)

        # Flatten every 1D token to (1, 1, dim_model), then flatten the
        # concatenated camera feature maps to (H*W*cams, 1, dim_model) and
        # stack everything along the sequence axis.
        tokens = []
        for token in encoder_in_tokens:
            tokens.append(token.view(1,1,self.model.config.dim_model))
        all_cam_features = torch.cat(all_cam_features, axis=-1).permute(2, 3, 0, 1).view(-1,1,self.model.config.dim_model)
        tokens.append(all_cam_features)
        encoder_in_tokens = torch.cat(tokens, axis=0)

        # Same flattening/stacking for the positional embeddings so they
        # line up one-to-one with the tokens above.
        pos_embeds = []
        for pos_embed in encoder_in_pos_embed:
            pos_embeds.append(pos_embed.view(1,1,self.model.config.dim_model))
        all_cam_pos_embeds = torch.cat(all_cam_pos_embeds, axis=-1).permute(2, 3, 0, 1).view(-1,1,self.model.config.dim_model)
        pos_embeds.append(all_cam_pos_embeds)
        encoder_in_pos_embed = torch.cat(pos_embeds, axis=0)

        encoder_out = self.model.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)

        # Decoder queries start as zeros; chunk_size queries, batch of 1.
        decoder_in = torch.zeros(
            (self.model.config.chunk_size, 1, self.model.config.dim_model),
            dtype=encoder_in_pos_embed.dtype,
            device=encoder_in_pos_embed.device,
        )
        decoder_out = self.model.decoder(
            decoder_in,
            encoder_out,
            encoder_pos_embed=encoder_in_pos_embed,
            decoder_pos_embed=self.model.decoder_pos_embed.weight.unsqueeze(1),
        )

        # (chunk, batch, dim) -> (batch, chunk, dim) before the action head.
        decoder_out = decoder_out.transpose(0, 1)

        actions = self.model.action_head(decoder_out)

        return actions
|
|
|
|
def lerobotTensor2cvmat(tensor):
    """Convert a LeRobot image tensor to an OpenCV-style BGR uint8 image.

    Args:
        tensor: float image batch with values in [0, 1] — assumed NCHW RGB
            with batch size >= 1 (only the first sample is used); TODO confirm
            against the dataset loader.

    Returns:
        An HxWx3 uint8 numpy array in BGR channel order.
    """
    # Scale to [0, 255], NCHW -> NHWC, take the first sample of the batch.
    img = (tensor*255).permute(0, 2, 3, 1).cpu().numpy().astype(np.uint8)[0,:,:,:]
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return img
|
|
|
|
def onnx_sim(onnx_path, onnx_sim):
    # NOTE(review): byte-for-byte duplicate of the onnx_sim() defined earlier
    # in this file; this later definition is the one bound at import time.
    # One of the two should be deleted.
    if onnx_sim:
        model_onnx = onnx.load(onnx_path)  # load onnx model
        onnx.checker.check_model(model_onnx)  # check onnx model
        model_onnx, check = simplify(
            model_onnx,
            dynamic_input_shape=False,
            input_shapes=None)
        assert check, 'assert check failed'
        onnx.save(model_onnx, onnx_path)
|
|
|
|
@parser.wrap()
def main(cfg: TrainPipelineConfig):
    """Export a LeRobot ACT policy as two BPU-ready ONNX models.

    Splits the policy into a vision encoder and a transformer stage, exports
    both to ONNX (optionally simplified), dumps the normalization parameters
    as .npy files, and writes calibration data whose format depends on the
    target march: "nash" -> .npy files, "bayes" -> raw .nchw binary dumps.
    LeRobot CLI arguments are injected into sys.argv by load_config().
    """
    # cfg.validate() is deliberately skipped: this config drives an export,
    # not a training run.
    logging.info(pformat(cfg.to_dict()))

    # BPU export parameters are read from the JSON config loaded at startup.
    global _global_config
    if not _global_config:
        # Fail fast with a clear message. The original dereferenced
        # _global_config['export'] inside the class body below before its
        # own `if _global_config:` guard, crashing with an opaque TypeError
        # when the config was missing (and leaving opt.march unset).
        raise RuntimeError("No export config loaded; call load_config() before main().")

    class BPUOptions:
        # Flat container for the BPU export options, read once from config.
        act_path = _global_config['export']['model_path']
        export_path = _global_config['export']['output_path'] + "/" + _global_config['task_id']
        cal_num = _global_config['export']['calibration_num']
        march = _global_config['export']['march']
        onnx_sim = True
        combine_jobs = 6

    opt = BPUOptions()
    logging.info("BPU parameters loaded from config file")

    logging.info("="*80)
    logging.info(colored("BPU Export Configuration:", 'light_cyan'))
    logging.info(f" ACT Model Path: {opt.act_path}")
    logging.info(f" Export Path: {opt.export_path}")
    logging.info(f" Calibration Samples: {opt.cal_num}")
    logging.info(f" ONNX Simplify: {opt.onnx_sim}")
    logging.info(f" March: {opt.march}")
    logging.info(f" Compiler Jobs: {opt.combine_jobs}")
    logging.info(f" Dataset Root: {cfg.dataset.root}")
    logging.info("="*80)

    os.makedirs(opt.export_path, exist_ok=True)

    # Workspace and ONNX file paths for the two sub-models.
    visionEncoder_ws = os.path.join(opt.export_path, BPU_VisionEncoder)
    transformersLayers_ws = os.path.join(opt.export_path, BPU_TransformerLayers)
    onnx_name_BPU_ACTPolicy_VisionEncoder = BPU_VisionEncoder + ".onnx"
    onnx_path_BPU_ACTPolicy_VisionEncoder = os.path.join(visionEncoder_ws, onnx_name_BPU_ACTPolicy_VisionEncoder)
    onnx_name_BPU_ACTPolicy_TransformerLayers = BPU_TransformerLayers + ".onnx"
    onnx_path_BPU_ACTPolicy_TransformerLayers = os.path.join(transformersLayers_ws, onnx_name_BPU_ACTPolicy_TransformerLayers)
    ## Calibration data paths.
    calbrate_data_name_BPU_ACTPolicy_VisionEncoder = "calibration_data_" + BPU_VisionEncoder
    calbrate_data_path_BPU_ACTPolicy_VisionEncoder = os.path.join(visionEncoder_ws, calbrate_data_name_BPU_ACTPolicy_VisionEncoder)
    calbrate_data_name_BPU_ACTPolicy_TransformerLayers = "calibration_data_" + BPU_TransformerLayers
    calbrate_data_path_BPU_ACTPolicy_TransformerLayers = os.path.join(transformersLayers_ws, calbrate_data_name_BPU_ACTPolicy_TransformerLayers)
    state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, "state")
    ## Release-folder paths.
    bpu_output_name = "bpu_output"
    bpu_output_path = os.path.join(opt.export_path, bpu_output_name)
    # NOTE(review): currently unused in this function — presumably consumed
    # by a later packaging step; verify before removing.
    bash_build_all_path = os.path.join(opt.export_path, "build_all.sh")
    ## Pre/post-processing parameter file paths.
    action_std_path = os.path.join(bpu_output_path, "action_std.npy")
    action_mean_path = os.path.join(bpu_output_path, "action_mean.npy")
    action_std_unnormalize_path = os.path.join(bpu_output_path, "action_std_unnormalize.npy")
    action_mean_unnormalize_path = os.path.join(bpu_output_path, "action_mean_unnormalize.npy")
    ## Create working directories.
    os.makedirs(visionEncoder_ws, exist_ok=True)
    logging.info(colored(f"mkdir: {visionEncoder_ws} Success.", 'green'))
    os.makedirs(transformersLayers_ws, exist_ok=True)
    logging.info(colored(f"mkdir: {transformersLayers_ws} Success.", 'green'))
    os.makedirs(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, exist_ok=True)
    logging.info(colored(f"mkdir: {calbrate_data_path_BPU_ACTPolicy_VisionEncoder} Success.", 'green'))
    os.makedirs(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, exist_ok=True)
    logging.info(colored(f"mkdir: {calbrate_data_path_BPU_ACTPolicy_TransformerLayers} Success.", 'green'))
    os.makedirs(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, exist_ok=True)
    logging.info(colored(f"mkdir: {state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers} Success.", 'green'))
    os.makedirs(bpu_output_path, exist_ok=True)
    logging.info(colored(f"mkdir: {bpu_output_path} Success.", 'green'))

    policy = ACTPolicy.from_pretrained(opt.act_path).cpu().eval()
    logging.info(colored(f"Load ACT Policy Model: {opt.act_path} Success.", 'light_red'))
    device = get_safe_torch_device(cfg.policy.device, log=True)
    torch.backends.cudnn.benchmark = True
    torch.backends.cuda.matmul.allow_tf32 = True

    # Dataset drives both the example inputs and the calibration samples.
    dataset = make_dataset(cfg)
    dataloader = torch.utils.data.DataLoader(
        dataset,
        num_workers=0,
        batch_size=1,
        shuffle=True,
        sampler=None,
        pin_memory=device.type != "cpu",
        drop_last=False,
    )
    logging.info(colored(f"Load ACT Policy Dataset: \n{dataset} Success.", 'light_red'))
    batch = next(iter(dataloader))
    # Discover the cameras dynamically from the batch keys.
    image_keys = [key for key in batch.keys() if key.startswith('observation.images.')]
    camera_names = [key.split('.')[-1] for key in image_keys]
    logging.info(colored(f"Camera Names: {camera_names} Success.", 'light_red'))
    logging.info(colored(f"Image Keys: {image_keys} Success.", 'light_red'))
    logging.info(colored(f"Batch: {batch} Success.", 'light_red'))

    # Run one inference pass on a copy of the batch (the batch itself is
    # reused below, so it must not be consumed/mutated here).
    outputs = policy.select_action(deepcopy(batch))

    ## Extract pre/post-processing parameters.
    # Save per-camera normalization parameters when the policy has them.
    for camera_name in camera_names:
        buffer_name = f"buffer_observation_images_{camera_name}"
        if hasattr(policy.normalize_inputs, buffer_name):
            buffer = getattr(policy.normalize_inputs, buffer_name)
            camera_std = buffer.std.data.detach().cpu().numpy()
            camera_mean = buffer.mean.data.detach().cpu().numpy()

            camera_std_path = os.path.join(bpu_output_path, f"{camera_name}_std.npy")
            camera_mean_path = os.path.join(bpu_output_path, f"{camera_name}_mean.npy")

            np.save(camera_std_path, camera_std)
            np.save(camera_mean_path, camera_mean)
            logging.info(f"Saved {camera_name} normalization parameters")

    # Save state (input) and action (output) normalization parameters.
    action_std = policy.normalize_inputs.buffer_observation_state.std.data.detach().cpu().numpy()
    action_mean = policy.normalize_inputs.buffer_observation_state.mean.data.detach().cpu().numpy()
    action_std_unnormalize = policy.unnormalize_outputs.buffer_action.std.data.detach().cpu().numpy()
    action_mean_unnormalize = policy.unnormalize_outputs.buffer_action.mean.data.detach().cpu().numpy()

    np.save(action_std_path, action_std)
    np.save(action_mean_path, action_mean)
    np.save(action_std_unnormalize_path, action_std_unnormalize)
    np.save(action_mean_unnormalize_path, action_mean_unnormalize)

    ## Vision Encoder
    batch = policy.normalize_inputs(batch)
    m_VisionEncoder = BPU_ACTPolicy_VisionEncoder(policy)
    m_VisionEncoder.eval()

    # Compute one feature map per camera (also used as transformer example
    # inputs for the second export below).
    vision_features = []
    for camera_name in camera_names:
        input_tensor = batch[f'observation.images.{camera_name}']
        vision_feature = m_VisionEncoder(input_tensor)
        vision_features.append(vision_feature)
        logging.info(f"Generated vision features for {camera_name}: {vision_feature.shape}")

    # Pick the ONNX opset: "bayes" toolchains require opset 11, others 19.
    opset_version = 11 if "bayes" in opt.march else 19
    logging.info(f"Using ONNX opset version: {opset_version} for type: {opt.march}")

    # NOTE: input_tensor is the last camera's tensor from the loop above;
    # any camera works as an example input since the encoder is shared.
    onnx_path = onnx_path_BPU_ACTPolicy_VisionEncoder
    torch.onnx.export(
        m_VisionEncoder,              # model to convert
        input_tensor,                 # example model input
        onnx_path,                    # output file name
        export_params=True,           # store the trained parameters
        opset_version=opset_version,  # march-dependent opset (see above)
        do_constant_folding=True,     # fold constants during export
        input_names=['images'],       # input node name
        output_names=['Vision_Features'],  # output node name
        dynamic_axes=None
    )
    onnx_sim(onnx_path, opt.onnx_sim)
    logging.info(colored(f"Export {onnx_path} Success.", 'green'))

    m_TransformerLayers = BPU_ACTPolicy_TransformerLayers(policy, camera_names)
    m_TransformerLayers.eval()
    state = batch["observation.state"]
    # Trace-through sanity run before export.
    actions = m_TransformerLayers(state, *vision_features)

    input_names = ['states'] + [f'{camera_name}_features' for camera_name in camera_names]
    logging.info(f"Transformer input names: {input_names}")

    onnx_path = onnx_path_BPU_ACTPolicy_TransformerLayers
    torch.onnx.export(
        m_TransformerLayers,          # model to convert
        (state, *vision_features),    # example model inputs
        onnx_path,                    # output file name
        export_params=True,           # store the trained parameters
        opset_version=opset_version,  # march-dependent opset (see above)
        do_constant_folding=True,     # fold constants during export
        input_names=input_names,      # dynamic per-camera input node names
        output_names=['Actions'],     # output node name
        dynamic_axes=None
    )
    onnx_sim(onnx_path, opt.onnx_sim)
    logging.info(colored(f"Export {onnx_path} Success.", 'green'))

    if "nash" in opt.march:
        ## Calibration data: one directory per transformer input.
        input_names_TransformerLayers = camera_names + ["state"]
        input_cal_path = []
        for input_name in input_names_TransformerLayers:
            p = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, input_name)
            input_cal_path.append(p)
            os.makedirs(p, exist_ok=True)
            logging.info(colored(f"mkdir: {p} Success.", 'green'))

        for i, batch in enumerate(dataloader):
            name = "%.10d.npy"%i
            batch = policy.normalize_inputs(batch)

            # Gather the inputs of every camera for this sample.
            camera_inputs = {}
            for camera_name in camera_names:
                camera_inputs[camera_name] = batch[f'observation.images.{camera_name}']
            state_input = batch["observation.state"]

            ## VisionEncoder: save every 4th raw camera input as .npy.
            if i%4 == 0:
                for camera_name in camera_names:
                    p = os.path.join(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, f"{camera_name}_" + name)
                    np.save(p, camera_inputs[camera_name].detach().cpu().numpy())
                    logging.info(colored(f"save to: {p}", 'light_blue'))

            ## TransformerLayers: save per-camera vision features as .npy.
            for camera_name in camera_names:
                vision_feature = m_VisionEncoder(camera_inputs[camera_name])
                camera_cal_path = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, camera_name)
                p = os.path.join(camera_cal_path, name)
                np.save(p, vision_feature.detach().cpu().numpy())
                logging.info(colored(f"save to: {p}", 'light_magenta'))

            p = os.path.join(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, name)
            np.save(p, state_input.detach().cpu().numpy())
            logging.info(colored(f"save to: {p}", 'light_magenta'))

            # Kept as `>=` to match the original sample count exactly
            # (saves cal_num + 1 samples, indices 0..cal_num).
            if i >= opt.cal_num:
                break

    if "bayes" in opt.march:

        ## Calibration data: one directory per transformer input.
        input_names_TransformerLayers = camera_names + ["state"]
        input_cal_path = []
        for input_name in input_names_TransformerLayers:
            p = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, input_name)
            input_cal_path.append(p)
            os.makedirs(p, exist_ok=True)
            logging.info(colored(f"mkdir: {p} Success.", 'green'))

        for i, batch in enumerate(dataloader):
            name = "%.10d.nchw"%i
            batch = policy.normalize_inputs(batch)

            # Gather the inputs of every camera for this sample.
            camera_inputs = {}
            for camera_name in camera_names:
                camera_inputs[camera_name] = batch[f'observation.images.{camera_name}']

            state_input = batch["observation.state"]

            ## VisionEncoder: save every 4th raw camera input (Bayes raw format).
            if i%4 == 0:
                for camera_name in camera_names:
                    p = os.path.join(calbrate_data_path_BPU_ACTPolicy_VisionEncoder, f"{camera_name}_" + name)
                    camera_inputs[camera_name].detach().cpu().numpy().tofile(p)
                    logging.info(colored(f"save to: {p}", 'light_blue'))

            ## TransformerLayers: save per-camera vision features (Bayes raw format).
            for camera_name in camera_names:
                vision_feature = m_VisionEncoder(camera_inputs[camera_name])
                camera_cal_path = os.path.join(calbrate_data_path_BPU_ACTPolicy_TransformerLayers, camera_name)
                p = os.path.join(camera_cal_path, name)
                vision_feature.detach().cpu().numpy().tofile(p)
                logging.info(colored(f"save to: {p}", 'light_magenta'))

            p = os.path.join(state_calbrate_data_path_BPU_ACTPolicy_TransformerLayers, name)
            state_input.detach().cpu().numpy().tofile(p)
            logging.info(colored(f"save to: {p}", 'light_magenta'))

            # Kept as `>=` to match the original sample count exactly.
            if i >= opt.cal_num:
                break
|
|
|
|
def generate_output_config(time_cost):
    """Write <export_path>/output.json describing the export artifacts.

    Records task name, target march, wall-clock cost, and the paths of the
    two exported ONNX files plus their calibration-data directories.
    """
    global _global_config
    # Same "output_path/task_id" layout that main() exports into.
    export_path = f"{_global_config['export']['output_path']}/{_global_config['task_id']}"

    transformer_dir = f"{export_path}/{BPU_TransformerLayers}"
    transformer_onnx = f"{transformer_dir}/{BPU_TransformerLayers}.onnx"
    transformer_cal = f"{transformer_dir}/calibration_data_{BPU_TransformerLayers}"

    vision_dir = f"{export_path}/{BPU_VisionEncoder}"
    vision_onnx = f"{vision_dir}/{BPU_VisionEncoder}.onnx"
    vision_cal = f"{vision_dir}/calibration_data_{BPU_VisionEncoder}"

    output_config = {
        "task_name": _global_config['task_id'],
        "march": _global_config['export']['march'],
        "time_cost": time_cost,
        "export_path": export_path,
        "TransformerLayers": transformer_onnx,
        "TransformerLayers_calibration_data": transformer_cal,
        "VisionEncoder": vision_onnx,
        "VisionEncoder_calibration_data": vision_cal,
    }
    with open(os.path.join(export_path, "output.json"), "w") as f:
        json.dump(output_config, f)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: load the exporter config, run the export, then
    # record the artifact manifest with the measured wall-clock time.
    init_logging()
    cfg_file = "input/config.json"
    _global_config = load_config(cfg_file)

    started = time.time()
    main()
    elapsed = time.time() - started

    logging.info(colored(f"Time Cost: {elapsed} seconds", 'light_red'))
    generate_output_config(elapsed)
|
|
|
|
|