RoboTwin_image/envs/base_task.py
import mplib.planner
import sapien.core as sapien
from sapien.utils.viewer import Viewer
import mplib
import numpy as np
import gymnasium as gym
import pdb
from PIL import Image, ImageColor
import toppra as ta
import open3d as o3d
import json
import transforms3d as t3d
from .utils import fps
from collections import OrderedDict
from .utils import *
import collections
from collections import deque
import cv2
import torch
from copy import deepcopy
class Base_task(gym.Env):
def __init__(self):
pass
    def _init(self, **kwargs):
        '''
        Initialization
        - `self.PCD_INDEX`: The index of the file saved for the current scene.
        - `self.left_gripper_val`: Left gripper value (closed <= 0, open >= 0.4).
        - `self.ep_num`: Episode ID.
        - `self.task_name`: Task name.
        - `self.save_dir`: Save path.
        - `self.left_original_pose`: Left arm original pose.
        - `self.right_original_pose`: Right arm original pose.
        - `self.left_arm_joint_id`: [6, 14, 18, 22, 26, 30].
        - `self.right_arm_joint_id`: [7, 15, 19, 23, 27, 31].
        - `self.render_freq`: Render frequency.
        '''
super().__init__()
ta.setup_logging("CRITICAL") # hide logging
        np.random.seed(kwargs.get('seed', 0))
        global left_pub_data
        global right_pub_data
        self.PCD_INDEX = 0
        self.task_name = kwargs.get('task_name')
        self.save_dir = kwargs.get('save_path', 'data') + '/' + kwargs.get('task_name', 'Empty')
        self.ep_num = kwargs.get('now_ep_num', 0)
        self.render_freq = kwargs.get('render_freq', 10)
        self.data_type = kwargs.get('data_type', None)
        self.save_type = kwargs.get('save_type', None)
        self.pcd_crop = kwargs.get('pcd_crop', False)
        self.pcd_down_sample_num = kwargs.get('pcd_down_sample_num', 0)
        self.is_save = kwargs.get('is_save', False)
        self.pcd_crop_bbox = kwargs.get('bbox', [[-0.6, -0.35, 0.7401], [0.6, 0.35, 2]])
        self.dual_arm = kwargs.get('dual_arm', True)
        self.table_static = kwargs.get('table_static', True)
        self.head_camera_fovy = kwargs.get('head_camera_fovy', 45)
        self.save_freq = kwargs.get('save_freq', 15)
self.file_path = []
self.plan_success = True
self.left_plan_success = True
self.right_plan_success = True
self.step_lim = None
self.fix_gripper = False
self.setup_scene()
self.left_js = None
self.right_js = None
self.raw_head_pcl = None
self.real_head_pcl = None
self.real_head_pcl_color = None
left_pub_data = [0,0,0,0,0,0,0]
right_pub_data = [0,0,0,0,0,0,0]
self.grasp_direction_dic = {
'left': [0, 0, 0, -1],
'front_left': [-0.383, 0, 0, -0.924],
'front' : [-0.707, 0, 0, -0.707],
'front_right': [-0.924, 0, 0, -0.383],
'right': [-1, 0, 0, 0],
'top_down': [-0.5, 0.5, -0.5, -0.5],
}
self.world_direction_dic = {
'left': [0.5, 0.5, 0.5, 0.5],
'front_left': [0.65334811, 0.27043713, 0.65334811, 0.27043713],
'front' : [0.707, 0, 0.707, 0],
'front_right': [0.65334811, -0.27043713, 0.65334811, -0.27043713],
'right': [0.5, -0.5, 0.5, 0.5],
'top_down': [0, 0, 1, 0],
}
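        # A minimal sketch (assuming the quaternions above follow SAPIEN's
        # w-first [w, x, y, z] convention, as the sapien.Pose calls in this
        # file suggest) of turning one of these directions into a rotation
        # matrix with transforms3d:
        #   quat = self.world_direction_dic['top_down']  # [w, x, y, z]
        #   R = t3d.quaternions.quat2mat(quat)           # 3x3 rotation matrix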
self.target_left_pose_front = [-0.19,-0.12,0.92,1,0,0,0]
self.target_right_pose_front = [0.19,-0.12,0.92,-0.01,0.01,0.03,-1]
self.handover_block_pose = [-0.054, -0.09, 0.82]
self.actor_name_dic = {}
self.actor_data_dic = {}
self.used_contant = []
self.left_prepare_grasp_data = None
self.left_prepare_grasp_point_group = None
self.right_prepare_grasp_data = None
self.right_prepare_grasp_point_group = None
self.pre_left_pose = None
self.pre_right_pose = None
self.pose_of_close_left_gripper = None
self.pose_of_close_right_gripper = None
self.now_left_pose = None
self.now_right_pose = None
def setup_scene(self,**kwargs):
'''
Set the scene
- Set up the basic scene: light source, viewer.
'''
self.engine = sapien.Engine()
# declare sapien renderer
from sapien.render import set_global_config
set_global_config(max_num_materials = 50000, max_num_textures = 50000)
self.renderer = sapien.SapienRenderer()
# give renderer to sapien sim
self.engine.set_renderer(self.renderer)
sapien.render.set_camera_shader_dir("rt")
sapien.render.set_ray_tracing_samples_per_pixel(32)
sapien.render.set_ray_tracing_path_depth(8)
sapien.render.set_ray_tracing_denoiser("oidn")
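        # Note: the three calls above configure SAPIEN's ray-tracing shader.
        # For faster (non-photorealistic) data collection one could instead
        # select the rasterization shader, e.g. (assumption: the shader pack
        # is named "default" in this SAPIEN build):
        #   sapien.render.set_camera_shader_dir("default")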
# declare sapien scene
scene_config = sapien.SceneConfig()
self.scene = self.engine.create_scene(scene_config)
# set simulation timestep
self.scene.set_timestep(kwargs.get("timestep", 1 / 250))
# add ground to scene
self.scene.add_ground(kwargs.get("ground_height", 0))
# set default physical material
self.scene.default_physical_material = self.scene.create_physical_material(
kwargs.get("static_friction", 0.5),
kwargs.get("dynamic_friction", 0.5),
kwargs.get("restitution", 0),
)
# give some white ambient light of moderate intensity
self.scene.set_ambient_light(kwargs.get("ambient_light", [0.5, 0.5, 0.5]))
# default enable shadow unless specified otherwise
shadow = kwargs.get("shadow", True)
# default spotlight angle and intensity
direction_lights = kwargs.get(
"direction_lights", [[[0, 0.5, -1], [0.5, 0.5, 0.5]]]
)
for direction_light in direction_lights:
self.scene.add_directional_light(
direction_light[0], direction_light[1], shadow=shadow
)
# default point lights position and intensity
point_lights = kwargs.get(
"point_lights",
[[[1, 0, 1.8], [1, 1, 1]], [[-1, 0, 1.8], [1, 1, 1]]]
)
for point_light in point_lights:
self.scene.add_point_light(point_light[0], point_light[1], shadow=shadow)
# initialize viewer with camera position and orientation
if self.render_freq:
self.viewer = Viewer(self.renderer)
self.viewer.set_scene(self.scene)
self.viewer.set_camera_xyz(
x=kwargs.get("camera_xyz_x", 0.4),
y=kwargs.get("camera_xyz_y", 0.22),
z=kwargs.get("camera_xyz_z", 1.5),
)
self.viewer.set_camera_rpy(
r=kwargs.get("camera_rpy_r", 0),
p=kwargs.get("camera_rpy_p", -0.8),
y=kwargs.get("camera_rpy_y", 2.45),
)
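    # Usage sketch for the kwargs accepted above (values hypothetical): a
    # subclass could call, e.g.,
    #   self.setup_scene(timestep=1 / 500, shadow=False,
    #                    ambient_light=[0.3, 0.3, 0.3])
    # to trade physics accuracy for speed and soften the lighting.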
def create_table_and_wall(self):
        # create wall
self.wall = create_box(
self.scene,
sapien.Pose(p=[0, 1, 1.5]),
half_size=[3, 0.6, 1.5],
color=(1, 0.9, 0.9),
name='wall',
)
        # create table
self.table = create_table(
self.scene,
sapien.Pose(p=[0, 0, 0.74]),
length=1.2,
width=0.7,
height=0.74,
thickness=0.05,
is_static=self.table_static
)
def load_robot(self, **kwargs):
"""
load aloha robot urdf file, set root pose and set joints
"""
# load urdf file
loader: sapien.URDFLoader = self.scene.create_urdf_loader()
loader.fix_root_link = True
self.robot = loader.load(
kwargs.get("urdf_path", "./aloha_maniskill_sim/urdf/arx5_description_isaac.urdf")
)
# set root pose
self.robot.set_root_pose(
sapien.Pose(
kwargs.get("robot_origin_xyz", [0, -0.65, 0]),
kwargs.get("robot_origin_quat", [1, 0, 0, 1]),
)
)
# set joints
self.active_joints = self.robot.get_active_joints()
for joint in self.active_joints:
joint.set_drive_property(
stiffness=kwargs.get("joint_stiffness", 1000),
damping=kwargs.get("joint_damping", 200),
)
self.all_joints = self.robot.get_joints()
self.all_links = self.robot.get_links()
self.left_endpose = self.robot.find_joint_by_name('fl_joint6')
self.right_endpose = self.robot.find_joint_by_name('fr_joint6')
self.left_gripper_val = 0.
self.right_gripper_val = 0.
self.left_original_pose = [-0.3,-0.32,0.935,1,0,0,1]
self.right_original_pose = [0.3,-0.32,0.935,1,0,0,1]
self.left_arm_joint_id = [6,14,18,22,26,30]
self.right_arm_joint_id = [7,15,19,23,27,31]
self.left_init_pose = sapien.Pose([-0.29881, -0.311307, 0.9375], [-5.33147e-06, -0.707107, -0.707107, -5.04532e-06])
self.right_init_pose = sapien.Pose([0.30119, -0.311307, 0.9375], [-5.33147e-06, -0.707107, -0.707107, -5.04532e-06])
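    # Note: the 7-element pose lists used throughout this file appear to be
    # [x, y, z, qw, qx, qy, qz], matching SAPIEN's w-first sapien.Pose(p, q)
    # convention. A sketch of rebuilding one of them:
    #   p = self.left_original_pose
    #   pose = sapien.Pose(p[:3], p[3:])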
def load_camera(self,camera_w,camera_h):
'''
Add cameras and set camera parameters
- Including four cameras: left, right, front, head.
'''
near, far = 0.1, 100
width, height = camera_w, camera_h
# front camera
front_cam_pos = np.array([0, -0.45, 0.85])
front_cam_forward = np.array([0,1,-0.1]) / np.linalg.norm(np.array([0,1,-0.1]))
front_cam_left = np.cross([0, 0, 1], front_cam_forward)
front_cam_left = front_cam_left / np.linalg.norm(front_cam_left)
front_up = np.cross(front_cam_forward, front_cam_left)
front_mat44 = np.eye(4)
front_mat44[:3, :3] = np.stack([front_cam_forward, front_cam_left, front_up], axis=1)
front_mat44[:3, 3] = front_cam_pos
# head camera
head_cam_pos = np.array([-0.032, -0.45, 1.35])
        head_cam_forward = np.array([0, 0.1, -0.55]) / np.linalg.norm(np.array([0, 0.1, -0.55]))
head_cam_left = np.cross([0, 0, 1], head_cam_forward)
head_cam_left = head_cam_left / np.linalg.norm(head_cam_left)
head_up = np.cross(head_cam_forward, head_cam_left)
head_mat44 = np.eye(4)
head_mat44[:3, :3] = np.stack([head_cam_forward, head_cam_left, head_up], axis=1)
head_mat44[:3, 3] = head_cam_pos
# observer camera
observer_cam_pos = np.array([0.4, 0.22, 1.42])
        observer_cam_forward = np.array([-1, -1, -1]) / np.sqrt(3)
        observer_cam_left = np.array([1, -1, 0]) / np.sqrt(2)
        observer_up = np.cross(observer_cam_forward, observer_cam_left)
observer_mat44 = np.eye(4)
observer_mat44[:3, :3] = np.stack([observer_cam_forward, observer_cam_left, observer_up], axis=1)
observer_mat44[:3, 3] = observer_cam_pos
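        # The three blocks above repeat the same look-at construction. A
        # hypothetical helper (a sketch only, not used below) would read:
        #   def _look_at(pos, forward):
        #       forward = forward / np.linalg.norm(forward)
        #       left = np.cross([0, 0, 1], forward)
        #       left = left / np.linalg.norm(left)
        #       up = np.cross(forward, left)
        #       mat44 = np.eye(4)
        #       mat44[:3, :3] = np.stack([forward, left, up], axis=1)
        #       mat44[:3, 3] = pos
        #       return mat44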
self.left_camera = self.scene.add_camera(
name="left_camera",
width=width,
height=height,
fovy=np.deg2rad(37),
near=near,
far=far,
)
self.right_camera = self.scene.add_camera(
name="right_camera",
width=width,
height=height,
fovy=np.deg2rad(37),
near=near,
far=far,
)
self.front_camera = self.scene.add_camera(
name="front_camera",
width=width,
height=height,
fovy=np.deg2rad(37),
near=near,
far=far,
)
self.head_camera = self.scene.add_camera(
name="head_camera",
width=width,
height=height,
fovy=np.deg2rad(self.head_camera_fovy),
near=near,
far=far,
)
self.observer_camera = self.scene.add_camera(
name = "observer_camera",
width=width,
height=height,
fovy=np.deg2rad(93),
near=near,
far=far,
)
self.front_camera.entity.set_pose(sapien.Pose(front_mat44))
self.head_camera.entity.set_pose(sapien.Pose(head_mat44))
self.observer_camera.entity.set_pose(sapien.Pose(observer_mat44))
self.left_camera.entity.set_pose(self.all_links[46].get_pose())
self.right_camera.entity.set_pose(self.all_links[49].get_pose())
self.scene.step() # run a physical step
self.scene.update_render() # sync pose from SAPIEN to renderer
def setup_planner(self, **kwargs):
"""
Create an mplib planner using the default robot.
See planner.py for more details on the arguments.
"""
self.left_planner = mplib.Planner(
urdf=kwargs.get("urdf_path", "./aloha_maniskill_sim/urdf/arx5_description_isaac.urdf"),
srdf=kwargs.get("srdf_path", "./aloha_maniskill_sim/srdf/arx5_description_isaac.srdf"),
move_group=kwargs.get("move_group", "fl_link6"),
)
self.right_planner = mplib.Planner(
urdf=kwargs.get("urdf_path", "./aloha_maniskill_sim/urdf/arx5_description_isaac.urdf"),
srdf=kwargs.get("srdf_path", "./aloha_maniskill_sim/srdf/arx5_description_isaac.srdf"),
move_group=kwargs.get("move_group", "fr_link6"),
)
robot_pose_in_world = [0,-0.65,0,1,0,0,1]
self.left_planner.set_base_pose(robot_pose_in_world)
self.right_planner.set_base_pose(robot_pose_in_world)
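    # Sketch of a single call against one of these planners, matching how
    # they are used later in this file (pose list assumed [x, y, z] + wxyz
    # quaternion, qpos a list of the 6 arm joint values):
    #   result = self.left_planner.plan_screw(
    #       target_pose=pose, qpos=qpos, time_step=1 / 250)
    #   if result["status"] == "Success":
    #       self.left_follow_path(result)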
def _update_render(self):
"""
Update rendering to refresh the camera's RGBD information
(rendering must be updated even when disabled, otherwise data cannot be collected).
"""
self.left_camera.entity.set_pose(self.all_links[46].get_pose())
self.right_camera.entity.set_pose(self.all_links[49].get_pose())
self.scene.update_render()
self.scene.update_render()
def left_follow_path(self, result, save_freq=-1): # For left arm
save_freq = self.save_freq if save_freq == -1 else save_freq
n_step = result["position"].shape[0]
if n_step > 2000:
self.plan_success = False
return
        if save_freq is not None:
            self._take_picture()
for i in range(n_step):
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
for j in range(len(self.left_arm_joint_id)):
n_j = self.left_arm_joint_id[j]
self.active_joints[n_j].set_drive_target(result["position"][i][j])
self.active_joints[n_j].set_drive_velocity_target(
result["velocity"][i][j]
)
self.scene.step()
if i%5 == 0:
self._update_render()
if self.render_freq and i % self.render_freq == 0:
self.viewer.render()
            if save_freq is not None and i % save_freq == 0:
                self._take_picture()
        if save_freq is not None:
            self._take_picture()
def right_follow_path(self, result, save_freq=-1): # For right arm
save_freq = self.save_freq if save_freq == -1 else save_freq
n_step = result["position"].shape[0]
if n_step > 2000:
self.plan_success = False
return
        if save_freq is not None:
            self._take_picture()
for i in range(n_step):
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
for j in range(len(self.right_arm_joint_id)):
n_j = self.right_arm_joint_id[j]
self.active_joints[n_j].set_drive_target(result["position"][i][j])
self.active_joints[n_j].set_drive_velocity_target(
result["velocity"][i][j]
)
self.scene.step()
if i % 5 == 0:
self._update_render()
if self.render_freq and i % self.render_freq == 0:
self.viewer.render()
            if save_freq is not None and i % save_freq == 0:
                self._take_picture()
        if save_freq is not None:
            self._take_picture()
def together_follow_path(self, left_result,right_result, save_freq=-1):
save_freq = self.save_freq if save_freq == -1 else save_freq
left_n_step = left_result["position"].shape[0]
right_n_step = right_result["position"].shape[0]
n_step = max(left_n_step, right_n_step)
if n_step > 2000:
self.plan_success = False
return
        if save_freq is not None:
            self._take_picture()
now_left_id = 0
now_right_id = 0
i = 0
while now_left_id < left_n_step or now_right_id < right_n_step:
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
# set the joint positions and velocities for move group joints only.
# The others are not the responsibility of the planner
if now_left_id < left_n_step and now_left_id / left_n_step <= now_right_id / right_n_step:
for j in range(len(self.left_arm_joint_id)):
left_j = self.left_arm_joint_id[j]
self.active_joints[left_j].set_drive_target(left_result["position"][now_left_id][j])
self.active_joints[left_j].set_drive_velocity_target(left_result["velocity"][now_left_id][j])
now_left_id +=1
if now_right_id < right_n_step and now_right_id / right_n_step <= now_left_id / left_n_step:
for j in range(len(self.right_arm_joint_id)):
right_j = self.right_arm_joint_id[j]
self.active_joints[right_j].set_drive_target(right_result["position"][now_right_id][j])
self.active_joints[right_j].set_drive_velocity_target(right_result["velocity"][now_right_id][j])
now_right_id +=1
self.scene.step()
if i % 5==0:
self._update_render()
if self.render_freq and i % self.render_freq == 0:
self.viewer.render()
            if save_freq is not None and i % save_freq == 0:
                self._take_picture()
            i += 1
        if save_freq is not None:
            self._take_picture()
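    # The loop above interleaves the two trajectories by fractional progress
    # (now_id / n_step): at each physics step it advances whichever arm is
    # behind, so both arms finish at the same time. For example, with
    # left_n_step = 100 and right_n_step = 200 the left arm advances roughly
    # once per two right-arm steps.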
def set_gripper(self, left_pos = 0.045, right_pos = 0.045, set_tag = 'together', save_freq=-1):
        '''
        Set gripper opening
        - `left_pos`: Left gripper target value
        - `right_pos`: Right gripper target value
        - `set_tag`: "left" to set the left gripper, "right" to set the right gripper, "together" to set both grippers simultaneously.
        '''
save_freq = self.save_freq if save_freq == -1 else save_freq
        if save_freq is not None:
            self._take_picture()
left_gripper_step = 0
right_gripper_step = 0
real_left_gripper_step = 0
real_right_gripper_step = 0
if set_tag == 'left' or set_tag == 'together':
left_gripper_step = (left_pos - self.left_gripper_val) / 400
real_left_gripper_step = (left_pos - self.active_joints[34].get_drive_target()[0]) / 200
if set_tag == 'right' or set_tag == 'together':
right_gripper_step = (right_pos - self.right_gripper_val) / 400
real_right_gripper_step = (right_pos - self.active_joints[36].get_drive_target()[0]) / 200
for i in range(400):
self.left_gripper_val += left_gripper_step
self.right_gripper_val += right_gripper_step
if i < 200:
real_left_gripper_val = self.active_joints[34].get_drive_target()[0] + real_left_gripper_step
real_right_gripper_val = self.active_joints[36].get_drive_target()[0] + real_right_gripper_step
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
if set_tag == 'left' or set_tag == 'together':
for joint in self.active_joints[34:36]:
joint.set_drive_target(real_left_gripper_val)
joint.set_drive_velocity_target(0.05)
if set_tag == 'right' or set_tag == 'together':
for joint in self.active_joints[36:38]:
joint.set_drive_target(real_right_gripper_val)
joint.set_drive_velocity_target(0.05)
self.scene.step()
if i % 5==0:
self._update_render()
if self.render_freq and i % self.render_freq == 0:
self.viewer.render()
            if save_freq is not None and i % save_freq == 0:
                self._take_picture()
        if save_freq is not None:
            self._take_picture()
if set_tag == 'left' or set_tag == 'together':
self.left_gripper_val = left_pos
if set_tag == 'right' or set_tag == 'together':
self.right_gripper_val = right_pos
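    # Usage sketch: self.set_gripper(left_pos=0.0, set_tag='left') closes the
    # left gripper, ramping self.left_gripper_val over 400 physics steps while
    # the actual drive targets of joints 34-35 ramp over the first 200.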
def gripper_move_back(self, endpose_tag: str,save_freq=-1):
save_freq = self.save_freq if save_freq == -1 else save_freq
if endpose_tag == 'left':
pose_of_close_gripper = self.pose_of_close_left_gripper
now_pose = self.now_left_pose
pre_pose = self.pre_left_pose
move_function = self.left_move_to_pose_with_screw
elif endpose_tag == 'right':
pose_of_close_gripper = self.pose_of_close_right_gripper
now_pose = self.now_right_pose
pre_pose = self.pre_right_pose
move_function = self.right_move_to_pose_with_screw
else:
return
if pose_of_close_gripper is not None and pre_pose is not None and now_pose is not None:
las_pose_matrix = np.eye(4)
las_pose_matrix[:3,3] = pose_of_close_gripper[:3]
las_pose_matrix[:3,:3] = t3d.quaternions.quat2mat(pose_of_close_gripper[3:])
now_pose_matrix = np.eye(4)
now_pose_matrix[:3,3] = now_pose[:3]
now_pose_matrix[:3,:3] = t3d.quaternions.quat2mat(now_pose[3:])
trans_matrix = np.dot(now_pose_matrix, np.linalg.inv(las_pose_matrix))
las_pre_pose_matrix = np.eye(4)
las_pre_pose_matrix[:3,3] = pre_pose[:3]
las_pre_pose_matrix[:3,:3] = t3d.quaternions.quat2mat(pre_pose[3:])
nxt_pose_matrix = np.dot(trans_matrix, las_pre_pose_matrix)
xyz_dis = np.ones(4)
xyz_dis[:3] = np.array(pre_pose[:3]) - np.array(pose_of_close_gripper[:3])
xyz_dis[:3] = trans_matrix[:3,:3] @ xyz_dis[:3]
nxt_pose_matrix[:3,3] = now_pose[:3] + xyz_dis[:3]
nxt_pose = list(nxt_pose_matrix[:3,3]) + list(t3d.quaternions.mat2quat(nxt_pose_matrix[:3,:3]))
            if not self.is_plan_success(endpose_tag, nxt_pose):
return
move_function(nxt_pose, save_freq=save_freq)
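    # The math above retracts the gripper along its original approach
    # direction: with T_close / T_now the end poses at closing / opening time
    # and T_pre the pre-grasp pose (all as 4x4 homogeneous matrices), the
    # target orientation comes from (T_now @ inv(T_close)) @ T_pre, and the
    # target position re-applies the approach offset (pre - close) rotated
    # into the current frame.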
def open_left_gripper(self, save_freq=-1, pos = 0.045):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(left_pos = pos, set_tag='left', save_freq=save_freq)
if self.left_prepare_grasp_point_group is not None:
self.left_prepare_grasp_data['contact_points_mask'][self.left_prepare_grasp_point_group] = True
self.left_prepare_grasp_data = None
self.left_prepare_grasp_point_group = None
self.gripper_move_back('left',save_freq=save_freq)
def close_left_gripper(self, save_freq=-1, pos = 0):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(left_pos = pos, set_tag='left',save_freq=save_freq)
if self.left_prepare_grasp_point_group is not None:
self.left_prepare_grasp_data['contact_points_mask'][self.left_prepare_grasp_point_group] = False
self.pose_of_close_left_gripper = self.now_left_pose
def open_right_gripper(self, save_freq=-1,pos = 0.045):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(right_pos=pos, set_tag='right', save_freq=save_freq)
if self.right_prepare_grasp_point_group is not None:
self.right_prepare_grasp_data['contact_points_mask'][self.right_prepare_grasp_point_group] = True
self.right_prepare_grasp_data = None
self.right_prepare_grasp_point_group = None
self.gripper_move_back('right',save_freq=save_freq)
def close_right_gripper(self, save_freq=-1,pos = 0):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(right_pos=pos, set_tag='right', save_freq=save_freq)
if self.right_prepare_grasp_point_group is not None:
self.right_prepare_grasp_data['contact_points_mask'][self.right_prepare_grasp_point_group] = False
self.pose_of_close_right_gripper = self.now_right_pose
def together_open_gripper(self, save_freq=-1, left_pos = 0.045, right_pos = 0.045):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(left_pos=left_pos, right_pos=right_pos, set_tag='together', save_freq=save_freq)
self.gripper_move_back('left',save_freq=save_freq)
self.gripper_move_back('right',save_freq=save_freq)
def together_close_gripper(self, save_freq=-1,left_pos = 0, right_pos = 0):
save_freq = self.save_freq if save_freq == -1 else save_freq
self.set_gripper(left_pos=left_pos, right_pos=right_pos, set_tag='together', save_freq=save_freq)
self.pose_of_close_left_gripper = self.now_left_pose
self.pose_of_close_right_gripper = self.now_right_pose
def move_to_pose_with_RRTConnect(
self, pose, use_point_cloud=False, use_attach=False,freq =10
):
"""
Plan and follow a path to a pose using RRTConnect
Args:
pose: [x, y, z, qx, qy, qz, qw]
use_point_cloud (optional): if to take the point cloud into consideration
for collision checking.
use_attach (optional): if to take the attach into consideration
for collision checking.
"""
# result is a dictionary with keys 'status', 'time', 'position', 'velocity',
# 'acceleration', 'duration'
result = self.left_planner.plan_qpos_to_pose(
pose,
current_qpos = self.robot.get_qpos(),
time_step=1 / 250,
use_point_cloud=use_point_cloud,
use_attach=use_attach,
# mask = [0],
planner_name="RRTConnect",
)
# plan_qpos_to_pose ankor end
if result["status"] != "Success":
# print(result["status"])
return -1
# do nothing if the planning fails; follow the path if the planning succeeds
self.left_follow_path(result,freq)
return 0
def left_move_to_pose_with_screw(self, pose, use_point_cloud=False, use_attach=False,save_freq=-1):
"""
Interpolative planning with screw motion.
Will not avoid collision and will fail if the path contains collision.
"""
save_freq = self.save_freq if save_freq == -1 else save_freq
        if not isinstance(pose, list) or len(pose) != 7:
            print("left arm pose error!")
            return
        if self.is_left_gripper_open():
            self.pre_left_pose = self.now_left_pose
        self.now_left_pose = pose
joint_pose = self.robot.get_qpos()
qpos=[]
for i in range(6):
qpos.append(joint_pose[self.left_arm_joint_id[i]])
result = self.left_planner.plan_screw(
target_pose=pose,
qpos=qpos,
time_step=1 / 250,
use_point_cloud=use_point_cloud,
use_attach=use_attach,
)
if result["status"] == "Success":
self.left_follow_path(result,save_freq=save_freq)
return 0
        else:
            print("\n left arm planning failed!")
            self.left_plan_success = False
            self.plan_success = False
def right_move_to_pose_with_screw(self, pose, use_point_cloud=False, use_attach=False,save_freq=-1):
"""
Interpolative planning with screw motion.
Will not avoid collision and will fail if the path contains collision.
"""
save_freq = self.save_freq if save_freq == -1 else save_freq
        if not isinstance(pose, list) or len(pose) != 7:
            print("right arm pose error!")
            return
        if self.is_right_gripper_open():
            self.pre_right_pose = self.now_right_pose
        self.now_right_pose = pose
joint_pose = self.robot.get_qpos()
qpos=[]
for i in range(6):
qpos.append(joint_pose[self.right_arm_joint_id[i]])
result = self.right_planner.plan_screw(
target_pose=pose,
qpos=qpos,
time_step=1 / 250,
use_point_cloud=use_point_cloud,
use_attach=use_attach,
)
if result["status"] == "Success":
self.right_follow_path(result,save_freq=save_freq)
return 0
        else:
            print("\n right arm planning failed!")
            self.right_plan_success = False
            self.plan_success = False
def together_move_to_pose_with_screw(self, left_target_pose,right_target_pose, use_point_cloud=False, use_attach=False,save_freq=-1):
"""
Interpolative planning with screw motion.
Will not avoid collision and will fail if the path contains collision.
"""
save_freq = self.save_freq if save_freq == -1 else save_freq
        if not isinstance(left_target_pose, list) or len(left_target_pose) != 7:
            print("left arm pose error!")
            return
        if not isinstance(right_target_pose, list) or len(right_target_pose) != 7:
            print("right arm pose error!")
            return
        if self.is_left_gripper_open():
            self.pre_left_pose = self.now_left_pose
        self.now_left_pose = left_target_pose
        if self.is_right_gripper_open():
            self.pre_right_pose = self.now_right_pose
        self.now_right_pose = right_target_pose
joint_pose = self.robot.get_qpos()
left_qpos=[]
right_qpos=[]
for i in range(6):
left_qpos.append(joint_pose[self.left_arm_joint_id[i]])
right_qpos.append(joint_pose[self.right_arm_joint_id[i]])
left_result = self.left_planner.plan_screw(
target_pose=left_target_pose,
qpos=left_qpos,
time_step=1 / 250,
use_point_cloud=use_point_cloud,
use_attach=use_attach,
)
right_result = self.right_planner.plan_screw(
target_pose=right_target_pose,
qpos=right_qpos,
time_step=1 / 250,
use_point_cloud=use_point_cloud,
use_attach=use_attach,
)
if left_result["status"] == "Success" and right_result["status"] == "Success":
self.together_follow_path(left_result,right_result,save_freq=save_freq)
return 0
        else:
            if left_result["status"] != "Success":
                print("\n left arm planning failed!")
                self.left_plan_success = False
            if right_result["status"] != "Success":
                print("\n right arm planning failed!")
                self.right_plan_success = False
            self.plan_success = False
# Get Camera RGBA
def _get_camera_rgba(self, camera):
rgba = camera.get_picture("Color")
rgba_img = (rgba * 255).clip(0, 255).astype("uint8")
return rgba_img
# Get Camera Segmentation
def _get_camera_segmentation(self, camera,level = "mesh"):
# visual_id is the unique id of each visual shape
seg_labels = camera.get_picture("Segmentation") # [H, W, 4]
colormap = sorted(set(ImageColor.colormap.values()))
color_palette = np.array(
[ImageColor.getrgb(color) for color in colormap], dtype=np.uint8
)
if level == "mesh":
label0_image = seg_labels[..., 0].astype(np.uint8) # mesh-level
elif level == "actor":
label0_image = seg_labels[..., 1].astype(np.uint8) # actor-level
return color_palette[label0_image]
# Get Camera Depth
def _get_camera_depth(self, camera):
position = camera.get_picture("Position")
depth = -position[..., 2]
depth_image = (depth * 1000.0).astype(np.float64)
return depth_image
# Get Camera PointCloud
def _get_camera_pcd(self, camera, point_num = 0):
rgba = camera.get_picture_cuda("Color").torch() # [H, W, 4]
position = camera.get_picture_cuda("Position").torch()
model_matrix = camera.get_model_matrix()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_matrix = torch.tensor(model_matrix, dtype=torch.float32).to(device)
# Extract valid three-dimensional points and corresponding color data.
valid_mask = position[..., 3] < 1
points_opengl = position[..., :3][valid_mask]
points_color = rgba[valid_mask][:,:3]
# Transform into the world coordinate system.
points_world = torch.bmm(points_opengl.view(1, -1, 3), model_matrix[:3, :3].transpose(0,1).view(-1, 3, 3)).squeeze(1) + model_matrix[:3, 3]
# Format color data.
points_color = torch.clamp(points_color, 0, 1)
points_world = points_world.squeeze(0)
# If crop is needed
if self.pcd_crop:
min_bound = torch.tensor(self.pcd_crop_bbox[0], dtype=torch.float32).to(device)
max_bound = torch.tensor(self.pcd_crop_bbox[1], dtype=torch.float32).to(device)
inside_bounds_mask = (points_world.squeeze(0) >= min_bound).all(dim=1) & (points_world.squeeze(0) <= max_bound).all(dim=1)
points_world = points_world[inside_bounds_mask]
points_color = points_color[inside_bounds_mask]
# Convert the tensor back to a NumPy array for use with Open3D.
points_world_np = points_world.cpu().numpy()
points_color_np = points_color.cpu().numpy()
if point_num > 0:
points_world_np,index = fps(points_world_np,point_num)
index = index.detach().cpu().numpy()[0]
points_color_np = points_color_np[index,:]
return np.hstack((points_world_np, points_color_np))
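    # Coordinate note for the method above: "Position" is in OpenGL camera
    # space and model_matrix is the cam2world transform, so each point maps
    # as p_world = R @ p_cam + t. A NumPy sketch equivalent to the torch.bmm
    # line:
    #   p_world = points_opengl @ model_matrix[:3, :3].T + model_matrix[:3, 3]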
    def arr2pcd(self, point_arr, colors_arr=None):
        point_cloud = o3d.geometry.PointCloud()
        point_cloud.points = o3d.utility.Vector3dVector(point_arr)
        if colors_arr is not None:
            point_cloud.colors = o3d.utility.Vector3dVector(colors_arr)
        return point_cloud
def get_left_arm_jointState(self) -> list:
jointState_list = []
for id in self.left_arm_joint_id:
jointState_list.append(self.active_joints[id].get_drive_target()[0].astype(float))
jointState_list.append(self.left_gripper_val)
return jointState_list
def get_right_arm_jointState(self) -> list:
jointState_list = []
for id in self.right_arm_joint_id:
jointState_list.append(self.active_joints[id].get_drive_target()[0].astype(float))
jointState_list.append(self.right_gripper_val)
return jointState_list
def endpose_transform(self, joint, gripper_val):
rpy = joint.global_pose.get_rpy()
roll, pitch, yaw = rpy
x,y,z = joint.global_pose.p
endpose = {
"gripper": float(gripper_val),
"pitch" : float(pitch),
"roll" : float(roll),
"x": float(x),
"y": float(y),
"yaw" : float(yaw),
"z": float(z),
}
return endpose
def get_camera_config(self,camera: sapien.render.RenderCameraComponent):
config = {
"D" : [ 0, 0, 0, 0, 0 ],
"K" : camera.get_intrinsic_matrix().ravel().tolist(),
"P" : np.vstack((camera.get_intrinsic_matrix(), [0,0,0])).ravel().tolist(),
"R" : [ 1, 0, 0, 0, 1, 0, 0, 0, 1 ],
"binning_x" : 0,
"binning_y" : 0,
"distortion_model" : "plumb_bob",
"height" : camera.get_height(),
"parent_frame" :
{
"pitch" : 0,
"roll" : 1.57,
"x" : 0,
"y" : 0,
"yaw" : 1.57,
"z" : 0
},
"roi" :
{
"do_rectify" : 0,
"height" : 0,
"width" : 0,
"x_offset" : 0,
"y_offset" : 0
},
"width" : camera.get_width()
}
return config
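    # The dict above mirrors a ROS CameraInfo message: D is the (zero)
    # distortion vector, K the flattened 3x3 intrinsic matrix, R an identity
    # rectification matrix and P the flattened 3x4 projection [K | 0].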
def is_left_gripper_open(self):
return self.active_joints[34].get_drive_target()[0] > 0.04
def is_right_gripper_open(self):
return self.active_joints[36].get_drive_target()[0] > 0.04
def is_left_gripper_open_half(self):
return self.active_joints[34].get_drive_target()[0] > 0.02
def is_right_gripper_open_half(self):
return self.active_joints[36].get_drive_target()[0] > 0.02
def is_left_gripper_close(self):
return self.active_joints[34].get_drive_target()[0] < 0.01
    def is_right_gripper_close(self):
        return self.active_joints[36].get_drive_target()[0] < 0.01
def get_left_endpose_pose(self):
return self.left_endpose.global_pose
def get_right_endpose_pose(self):
return self.right_endpose.global_pose
def _take_picture(self): # Save data
if not self.is_save:
return
print('saving: episode = ', self.ep_num, ' index = ',self.PCD_INDEX, end='\r')
self._update_render()
self.left_camera.take_picture()
self.right_camera.take_picture()
self.head_camera.take_picture()
self.observer_camera.take_picture()
self.front_camera.take_picture()
if self.PCD_INDEX==0:
self.file_path ={
"observer_color" : f"{self.save_dir}/episode{self.ep_num}/camera/color/observer/",
"l_color" : f"{self.save_dir}/episode{self.ep_num}/camera/color/left/",
"l_depth" : f"{self.save_dir}/episode{self.ep_num}/camera/depth/left/",
"l_pcd" : f"{self.save_dir}/episode{self.ep_num}/camera/pointCloud/left/",
"f_color" : f"{self.save_dir}/episode{self.ep_num}/camera/color/front/",
"f_depth" : f"{self.save_dir}/episode{self.ep_num}/camera/depth/front/",
"f_pcd" : f"{self.save_dir}/episode{self.ep_num}/camera/pointCloud/front/",
"r_color" : f"{self.save_dir}/episode{self.ep_num}/camera/color/right/",
"r_depth" : f"{self.save_dir}/episode{self.ep_num}/camera/depth/right/",
"r_pcd" : f"{self.save_dir}/episode{self.ep_num}/camera/pointCloud/right/",
"t_color" : f"{self.save_dir}/episode{self.ep_num}/camera/color/head/",
"t_depth" : f"{self.save_dir}/episode{self.ep_num}/camera/depth/head/",
"t_pcd" : f"{self.save_dir}/episode{self.ep_num}/camera/pointCloud/head/",
"f_seg_mesh" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/front/mesh/",
"l_seg_mesh" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/left/mesh/",
"r_seg_mesh" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/right/mesh/",
"t_seg_mesh" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/top/mesh/",
"f_seg_actor" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/front/actor/",
"l_seg_actor" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/left/actor/",
"r_seg_actor" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/right/actor/",
"t_seg_actor" : f"{self.save_dir}/episode{self.ep_num}/camera/segmentation/head/actor/",
"f_camera" : f"{self.save_dir}/episode{self.ep_num}/camera/model_camera/front/",
"t_camera" : f"{self.save_dir}/episode{self.ep_num}/camera/model_camera/head/",
"l_camera" : f"{self.save_dir}/episode{self.ep_num}/camera/model_camera/left/",
"r_camera" : f"{self.save_dir}/episode{self.ep_num}/camera/model_camera/right/",
"ml_ep" : f"{self.save_dir}/episode{self.ep_num}/arm/endPose/masterLeft/",
"mr_ep" : f"{self.save_dir}/episode{self.ep_num}/arm/endPose/masterRight/",
"pl_ep" : f"{self.save_dir}/episode{self.ep_num}/arm/endPose/puppetLeft/",
"pr_ep" : f"{self.save_dir}/episode{self.ep_num}/arm/endPose/puppetRight/",
"pl_joint" : f"{self.save_dir}/episode{self.ep_num}/arm/jointState/puppetLeft/",
"pr_joint" : f"{self.save_dir}/episode{self.ep_num}/arm/jointState/puppetRight/",
"ml_joint" : f"{self.save_dir}/episode{self.ep_num}/arm/jointState/masterLeft/",
"mr_joint" : f"{self.save_dir}/episode{self.ep_num}/arm/jointState/masterRight/",
"pkl" : f"{self.save_dir}_pkl/episode{self.ep_num}/",
"conbine_pcd" : f"{self.save_dir}/episode{self.ep_num}/camera/pointCloud/conbine/",
}
for directory in self.file_path.values():
if os.path.exists(directory):
file_list = os.listdir(directory)
for file in file_list:
os.remove(directory + file)
pkl_dic = {
"observation":{
"head_camera":{}, # rbg , mesh_seg , actior_seg , depth , intrinsic_cv , extrinsic_cv , cam2world_gl(model_matrix)
"left_camera":{},
"right_camera":{},
"front_camera":{}
},
"pointcloud":[], # conbinet pcd
"joint_action":[],
"endpose":[]
}
head_camera_intrinsic_cv = self.head_camera.get_intrinsic_matrix()
head_camera_extrinsic_cv = self.head_camera.get_extrinsic_matrix()
head_camera_model_matrix = self.head_camera.get_model_matrix()
pkl_dic["observation"]["head_camera"] = {
"intrinsic_cv" : head_camera_intrinsic_cv,
"extrinsic_cv" : head_camera_extrinsic_cv,
"cam2world_gl" : head_camera_model_matrix
}
front_camera_intrinsic_cv = self.front_camera.get_intrinsic_matrix()
front_camera_extrinsic_cv = self.front_camera.get_extrinsic_matrix()
front_camera_model_matrix = self.front_camera.get_model_matrix()
pkl_dic["observation"]["front_camera"] = {
"intrinsic_cv" : front_camera_intrinsic_cv,
"extrinsic_cv" : front_camera_extrinsic_cv,
"cam2world_gl" : front_camera_model_matrix
}
left_camera_intrinsic_cv = self.left_camera.get_intrinsic_matrix()
left_camera_extrinsic_cv = self.left_camera.get_extrinsic_matrix()
left_camera_model_matrix = self.left_camera.get_model_matrix()
pkl_dic["observation"]["left_camera"] = {
"intrinsic_cv" : left_camera_intrinsic_cv,
"extrinsic_cv" : left_camera_extrinsic_cv,
"cam2world_gl" : left_camera_model_matrix
}
right_camera_intrinsic_cv = self.right_camera.get_intrinsic_matrix()
right_camera_extrinsic_cv = self.right_camera.get_extrinsic_matrix()
right_camera_model_matrix = self.right_camera.get_model_matrix()
pkl_dic["observation"]["right_camera"] = {
"intrinsic_cv" : right_camera_intrinsic_cv,
"extrinsic_cv" : right_camera_extrinsic_cv,
"cam2world_gl" : right_camera_model_matrix
}
# # ---------------------------------------------------------------------------- #
# # RGBA
# # ---------------------------------------------------------------------------- #
if self.data_type.get('rgb', False):
front_rgba = self._get_camera_rgba(self.front_camera)
head_rgba = self._get_camera_rgba(self.head_camera)
left_rgba = self._get_camera_rgba(self.left_camera)
right_rgba = self._get_camera_rgba(self.right_camera)
if self.save_type.get('raw_data', True):
if self.data_type.get('observer', False):
observer_rgba = self._get_camera_rgba(self.observer_camera)
save_img(self.file_path["observer_color"]+f"{self.PCD_INDEX}.png",observer_rgba)
save_img(self.file_path["t_color"]+f"{self.PCD_INDEX}.png",head_rgba)
save_img(self.file_path["f_color"]+f"{self.PCD_INDEX}.png",front_rgba)
save_img(self.file_path["l_color"]+f"{self.PCD_INDEX}.png",left_rgba)
save_img(self.file_path["r_color"]+f"{self.PCD_INDEX}.png",right_rgba)
if self.save_type.get('pkl' , True):
pkl_dic["observation"]["head_camera"]["rgb"] = head_rgba[:,:,:3]
pkl_dic["observation"]["front_camera"]["rgb"] = front_rgba[:,:,:3]
pkl_dic["observation"]["left_camera"]["rgb"] = left_rgba[:,:,:3]
pkl_dic["observation"]["right_camera"]["rgb"] = right_rgba[:,:,:3]
# # ---------------------------------------------------------------------------- #
# # mesh_segmentation
# # ---------------------------------------------------------------------------- #
if self.data_type.get('mesh_segmentation', False):
head_seg = self._get_camera_segmentation(self.head_camera,level="mesh")
left_seg = self._get_camera_segmentation(self.left_camera,level="mesh")
right_seg = self._get_camera_segmentation(self.right_camera,level="mesh")
front_seg = self._get_camera_segmentation(self.front_camera,level="mesh")
if self.save_type.get('raw_data', True):
save_img(self.file_path["t_seg_mesh"]+f"{self.PCD_INDEX}.png", head_seg)
save_img(self.file_path["l_seg_mesh"]+f"{self.PCD_INDEX}.png", left_seg)
save_img(self.file_path["r_seg_mesh"]+f"{self.PCD_INDEX}.png", right_seg)
save_img(self.file_path["f_seg_mesh"]+f"{self.PCD_INDEX}.png", front_seg)
if self.save_type.get('pkl' , True):
pkl_dic["observation"]["head_camera"]["mesh_segmentation"] = head_seg
pkl_dic["observation"]["front_camera"]["mesh_segmentation"] = front_seg
pkl_dic["observation"]["left_camera"]["mesh_segmentation"] = left_seg
pkl_dic["observation"]["right_camera"]["mesh_segmentation"] = right_seg
# # ---------------------------------------------------------------------------- #
# # actor_segmentation
# # --------------------------------------------------------------------------- #
if self.data_type.get('actor_segmentation', False):
head_seg = self._get_camera_segmentation(self.head_camera,level="actor")
left_seg = self._get_camera_segmentation(self.left_camera,level="actor")
right_seg = self._get_camera_segmentation(self.right_camera,level="actor")
front_seg = self._get_camera_segmentation(self.front_camera,level="actor")
if self.save_type.get('raw_data', True):
save_img(self.file_path["t_seg_actor"]+f"{self.PCD_INDEX}.png", head_seg)
save_img(self.file_path["l_seg_actor"]+f"{self.PCD_INDEX}.png", left_seg)
save_img(self.file_path["r_seg_actor"]+f"{self.PCD_INDEX}.png", right_seg)
save_img(self.file_path["f_seg_actor"]+f"{self.PCD_INDEX}.png", front_seg)
if self.save_type.get('pkl' , True):
pkl_dic["observation"]["head_camera"]["actor_segmentation"] = head_seg
pkl_dic["observation"]["left_camera"]["actor_segmentation"] = left_seg
pkl_dic["observation"]["right_camera"]["actor_segmentation"] = right_seg
pkl_dic["observation"]["front_camera"]["actor_segmentation"] = front_seg
# # ---------------------------------------------------------------------------- #
# # DEPTH
# # ---------------------------------------------------------------------------- #
if self.data_type.get('depth', False):
front_depth = self._get_camera_depth(self.front_camera)
head_depth = self._get_camera_depth(self.head_camera)
left_depth = self._get_camera_depth(self.left_camera)
right_depth = self._get_camera_depth(self.right_camera)
if self.save_type.get('raw_data', True):
save_img(self.file_path["t_depth"]+f"{self.PCD_INDEX}.png", head_depth.astype(np.uint16))
save_img(self.file_path["f_depth"]+f"{self.PCD_INDEX}.png", front_depth.astype(np.uint16))
save_img(self.file_path["l_depth"]+f"{self.PCD_INDEX}.png", left_depth.astype(np.uint16))
save_img(self.file_path["r_depth"]+f"{self.PCD_INDEX}.png", right_depth.astype(np.uint16))
if self.save_type.get('pkl' , True):
pkl_dic["observation"]["head_camera"]["depth"] = head_depth
pkl_dic["observation"]["front_camera"]["depth"] = front_depth
pkl_dic["observation"]["left_camera"]["depth"] = left_depth
pkl_dic["observation"]["right_camera"]["depth"] = right_depth
# # ---------------------------------------------------------------------------- #
# # endpose JSON
# # ---------------------------------------------------------------------------- #
if self.data_type.get('endpose', False):
left_endpose = self.endpose_transform(self.all_joints[42], self.left_gripper_val)
right_endpose = self.endpose_transform(self.all_joints[43], self.right_gripper_val)
if self.save_type.get('raw_data', True):
save_json(self.file_path["ml_ep"]+f"{self.PCD_INDEX}.json", left_endpose)
save_json(self.file_path["pl_ep"]+f"{self.PCD_INDEX}.json", left_endpose)
save_json(self.file_path["mr_ep"]+f"{self.PCD_INDEX}.json", right_endpose)
save_json(self.file_path["pr_ep"]+f"{self.PCD_INDEX}.json", right_endpose)
if self.save_type.get('pkl' , True):
if self.dual_arm:
pkl_dic["endpose"] = np.array([left_endpose["x"],left_endpose["y"],left_endpose["z"],left_endpose["roll"],
left_endpose["pitch"],left_endpose["yaw"],left_endpose["gripper"],
right_endpose["x"],right_endpose["y"],right_endpose["z"],right_endpose["roll"],
right_endpose["pitch"],right_endpose["yaw"],right_endpose["gripper"],])
else:
pkl_dic["endpose"] = np.array([right_endpose["x"],right_endpose["y"],right_endpose["z"],right_endpose["roll"],
right_endpose["pitch"],right_endpose["yaw"],right_endpose["gripper"],])
# # ---------------------------------------------------------------------------- #
# # JointState JSON
# # ---------------------------------------------------------------------------- #
if self.data_type.get('qpos', False):
left_jointstate = {
"effort" : [ 0, 0, 0, 0, 0, 0, 0 ],
"position" : self.get_left_arm_jointState(),
"velocity" : [ 0, 0, 0, 0, 0, 0, 0 ]
}
right_jointstate = {
"effort" : [ 0, 0, 0, 0, 0, 0, 0 ],
"position" : self.get_right_arm_jointState(),
"velocity" : [ 0, 0, 0, 0, 0, 0, 0 ]
}
if self.save_type.get('raw_data', True):
save_json(self.file_path["ml_joint"]+f"{self.PCD_INDEX}.json", left_jointstate)
save_json(self.file_path["pl_joint"]+f"{self.PCD_INDEX}.json", left_jointstate)
save_json(self.file_path["mr_joint"]+f"{self.PCD_INDEX}.json", right_jointstate)
save_json(self.file_path["pr_joint"]+f"{self.PCD_INDEX}.json", right_jointstate)
if self.save_type.get('pkl' , True):
if self.dual_arm:
pkl_dic["joint_action"] = np.array(left_jointstate["position"]+right_jointstate["position"])
else:
pkl_dic["joint_action"] = np.array(right_jointstate["position"])
# # ---------------------------------------------------------------------------- #
# # PointCloud
# # ---------------------------------------------------------------------------- #
if self.data_type.get('pointcloud', False):
head_pcd = self._get_camera_pcd(self.head_camera, point_num=0)
front_pcd = self._get_camera_pcd(self.front_camera, point_num=0)
left_pcd = self._get_camera_pcd(self.left_camera, point_num=0)
right_pcd = self._get_camera_pcd(self.right_camera, point_num=0)
# Merge pointcloud
if self.data_type.get("conbine", False):
conbine_pcd = np.vstack((head_pcd , left_pcd , right_pcd, front_pcd))
else:
conbine_pcd = head_pcd
pcd_array,index = conbine_pcd[:,:3], np.array(range(len(conbine_pcd)))
if self.pcd_down_sample_num > 0:
pcd_array,index = fps(conbine_pcd[:,:3],self.pcd_down_sample_num)
index = index.detach().cpu().numpy()[0]
if self.save_type.get('raw_data', True):
ensure_dir(self.file_path["t_pcd"] + f"{self.PCD_INDEX}.pcd")
o3d.io.write_point_cloud(self.file_path["t_pcd"] + f"{self.PCD_INDEX}.pcd", self.arr2pcd(head_pcd[:,:3], head_pcd[:,3:]))
ensure_dir(self.file_path["l_pcd"] + f"{self.PCD_INDEX}.pcd")
o3d.io.write_point_cloud(self.file_path["l_pcd"] + f"{self.PCD_INDEX}.pcd", self.arr2pcd(left_pcd[:,:3], left_pcd[:,3:]))
ensure_dir(self.file_path["r_pcd"] + f"{self.PCD_INDEX}.pcd")
o3d.io.write_point_cloud(self.file_path["r_pcd"] + f"{self.PCD_INDEX}.pcd", self.arr2pcd(right_pcd[:,:3], right_pcd[:,3:]))
ensure_dir(self.file_path["f_pcd"] + f"{self.PCD_INDEX}.pcd")
o3d.io.write_point_cloud(self.file_path["f_pcd"] + f"{self.PCD_INDEX}.pcd", self.arr2pcd(front_pcd[:,:3], front_pcd[:,3:]))
if self.data_type.get("conbine", False):
ensure_dir(self.file_path["conbine_pcd"] + f"{self.PCD_INDEX}.pcd")
o3d.io.write_point_cloud(self.file_path["conbine_pcd"] + f"{self.PCD_INDEX}.pcd", self.arr2pcd(pcd_array, conbine_pcd[index,3:]))
if self.save_type.get('pkl' , True):
pkl_dic["pointcloud"] = conbine_pcd[index]
#===========================================================#
if self.save_type.get('pkl' , True):
save_pkl(self.file_path["pkl"]+f"{self.PCD_INDEX}.pkl", pkl_dic)
self.PCD_INDEX +=1
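    # Sketch of reading one saved frame back (episode/index values
    # hypothetical, layout as written above):
    #   import pickle
    #   with open(f"{save_dir}_pkl/episode0/0.pkl", "rb") as f:
    #       frame = pickle.load(f)
    #   frame["observation"]["head_camera"]["rgb"]  # HxWx3 uint8 image
    #   frame["joint_action"]                       # 14 values when dual_arm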
def get_obs(self):
self.scene.step()
self._update_render()
self._update_render()
obs = collections.OrderedDict()
left_endpose = self.endpose_transform(self.all_joints[42], self.left_gripper_val)
right_endpose = self.endpose_transform(self.all_joints[43], self.right_gripper_val)
right_jointState = self.get_right_arm_jointState()
right_jointState_array = np.array(right_jointState)
left_jointState = self.get_left_arm_jointState()
left_jointState_array = np.array(left_jointState)
self.left_camera.take_picture()
self.right_camera.take_picture()
self.head_camera.take_picture()
self.front_camera.take_picture()
head_pcd = self._get_camera_pcd(self.head_camera, point_num=0)
left_pcd = self._get_camera_pcd(self.left_camera, point_num=0)
right_pcd = self._get_camera_pcd(self.right_camera, point_num=0)
front_pcd = self._get_camera_pcd(self.front_camera, point_num=0)
head_rgba = self._get_camera_rgba(self.head_camera)
left_rgba = self._get_camera_rgba(self.left_camera)
right_rgba = self._get_camera_rgba(self.right_camera)
front_rgba = self._get_camera_rgba(self.front_camera)
head_depth = self._get_camera_depth(self.head_camera)
left_depth = self._get_camera_depth(self.left_camera)
right_depth = self._get_camera_depth(self.right_camera)
front_depth = self._get_camera_depth(self.front_camera)
# Merge PointCloud
if self.data_type.get("conbine", False):
conbine_pcd = np.vstack((head_pcd , left_pcd , right_pcd, front_pcd))
else:
conbine_pcd = head_pcd
pcd_array, index = fps(conbine_pcd[:,:3],self.pcd_down_sample_num)
obs = {
"observation":{
"head_camera":{}, # rbg , mesh_seg , actior_seg , depth , intrinsic_cv , extrinsic_cv , cam2world_gl(model_matrix)
"left_camera":{},
"right_camera":{},
"front_camera":{}
},
"pointcloud":[], # conbinet pcd
"joint_action":[],
"endpose":[]
}
head_camera_intrinsic_cv = self.head_camera.get_intrinsic_matrix()
head_camera_extrinsic_cv = self.head_camera.get_extrinsic_matrix()
head_camera_model_matrix = self.head_camera.get_model_matrix()
obs["observation"]["head_camera"] = {
"intrinsic_cv" : head_camera_intrinsic_cv,
"extrinsic_cv" : head_camera_extrinsic_cv,
"cam2world_gl" : head_camera_model_matrix
}
front_camera_intrinsic_cv = self.front_camera.get_intrinsic_matrix()
front_camera_extrinsic_cv = self.front_camera.get_extrinsic_matrix()
front_camera_model_matrix = self.front_camera.get_model_matrix()
obs["observation"]["front_camera"] = {
"intrinsic_cv" : front_camera_intrinsic_cv,
"extrinsic_cv" : front_camera_extrinsic_cv,
"cam2world_gl" : front_camera_model_matrix
}
left_camera_intrinsic_cv = self.left_camera.get_intrinsic_matrix()
left_camera_extrinsic_cv = self.left_camera.get_extrinsic_matrix()
left_camera_model_matrix = self.left_camera.get_model_matrix()
obs["observation"]["left_camera"] = {
"intrinsic_cv" : left_camera_intrinsic_cv,
"extrinsic_cv" : left_camera_extrinsic_cv,
"cam2world_gl" : left_camera_model_matrix
}
right_camera_intrinsic_cv = self.right_camera.get_intrinsic_matrix()
right_camera_extrinsic_cv = self.right_camera.get_extrinsic_matrix()
right_camera_model_matrix = self.right_camera.get_model_matrix()
obs["observation"]["right_camera"] = {
"intrinsic_cv" : right_camera_intrinsic_cv,
"extrinsic_cv" : right_camera_extrinsic_cv,
"cam2world_gl" : right_camera_model_matrix
}
obs["observation"]["head_camera"]["rgb"] = head_rgba[:,:,:3]
obs["observation"]["front_camera"]["rgb"] = front_rgba[:,:,:3]
obs["observation"]["left_camera"]["rgb"] = left_rgba[:,:,:3]
obs["observation"]["right_camera"]["rgb"] = right_rgba[:,:,:3]
obs["observation"]["head_camera"]["depth"] = head_depth
obs["observation"]["front_camera"]["depth"] = front_depth
obs["observation"]["left_camera"]["depth"] = left_depth
obs["observation"]["right_camera"]["depth"] = right_depth
obs["pointcloud"] = conbine_pcd[index.detach().cpu().numpy()[0]]
obs["endpose"] = np.array([left_endpose["x"],left_endpose["y"],left_endpose["z"],left_endpose["roll"],
left_endpose["pitch"],left_endpose["yaw"],left_endpose["gripper"],
right_endpose["x"],right_endpose["y"],right_endpose["z"],right_endpose["roll"],
right_endpose["pitch"],right_endpose["yaw"],right_endpose["gripper"],])
obs["joint_action"] = np.hstack((left_jointState_array, right_jointState_array))
return obs
def get_cam_obs(self, observation: dict) -> dict:
head_cam = np.moveaxis(observation['observation']['head_camera']['rgb'], -1, 0) / 255
front_cam = np.moveaxis(observation['observation']['front_camera']['rgb'], -1, 0) / 255
left_cam = np.moveaxis(observation['observation']['left_camera']['rgb'], -1, 0) / 255
right_cam = np.moveaxis(observation['observation']['right_camera']['rgb'], -1, 0) / 255
return dict(
head_cam = head_cam,
front_cam = front_cam,
left_cam = left_cam,
right_cam = right_cam
)
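    # Each entry returned above is a float array in CHW layout scaled to
    # [0, 1], i.e. shape (3, H, W) for an HxW RGB frame.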
def apply_dp(self, model, video_log=False, save_dir='default'):
cnt = 0
self.test_num += 1
if video_log:
import subprocess
from pathlib import Path
save_dir = Path('video') / save_dir
save_dir.mkdir(parents=True, exist_ok=True)
ffmpeg = subprocess.Popen([
'ffmpeg', '-y',
'-f', 'rawvideo',
'-pixel_format', 'rgb24',
'-video_size', '320x240',
'-framerate', '10',
'-i', '-',
'-pix_fmt', 'yuv420p',
'-vcodec', 'libx264',
'-crf', '23',
f'{save_dir}/{self.test_num}.mp4'
], stdin=subprocess.PIPE)
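            # Note: the rawvideo pipe above expects exactly 320x240 RGB
            # frames, so the head-camera resolution must match for the
            # resulting mp4 to be valid.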
success_flag = False
self._update_render()
if self.render_freq:
self.viewer.render()
self.actor_pose = True
observation = self.get_obs()
obs = self.get_cam_obs(observation)
obs['agent_pos'] = observation['joint_action']
model.update_obs(obs)
while cnt < self.step_lim:
            if video_log:
                ffmpeg.stdin.write(observation['observation']['head_camera']['rgb'].tobytes())
actions = model.get_action()
obs = model.get_last_obs()
left_arm_actions , left_gripper , left_current_qpos, left_path = [], [], [], []
right_arm_actions , right_gripper , right_current_qpos, right_path = [], [], [], []
if self.dual_arm:
left_arm_actions,left_gripper = actions[:, :6],actions[:, 6]
right_arm_actions,right_gripper = actions[:, 7:13],actions[:, 13]
left_current_qpos, right_current_qpos = obs['agent_pos'][:6], obs['agent_pos'][7:13]
else:
right_arm_actions,right_gripper = actions[:, :6],actions[:, 6]
right_current_qpos = obs['agent_pos'][:6]
if self.dual_arm:
left_path = np.vstack((left_current_qpos, left_arm_actions))
right_path = np.vstack((right_current_qpos, right_arm_actions))
topp_left_flag, topp_right_flag = True, True
try:
times, left_pos, left_vel, acc, duration = self.left_planner.TOPP(left_path, 1/250, verbose=True)
left_result = dict()
left_result['position'], left_result['velocity'] = left_pos, left_vel
left_n_step = left_result["position"].shape[0]
left_gripper = np.linspace(left_gripper[0], left_gripper[-1], left_n_step)
            except Exception:
                topp_left_flag = False
                left_n_step = 1
if left_n_step == 0 or (not self.dual_arm):
topp_left_flag = False
left_n_step = 1
try:
times, right_pos, right_vel, acc, duration = self.right_planner.TOPP(right_path, 1/250, verbose=True)
right_result = dict()
right_result['position'], right_result['velocity'] = right_pos, right_vel
right_n_step = right_result["position"].shape[0]
right_gripper = np.linspace(right_gripper[0], right_gripper[-1], right_n_step)
            except Exception:
                topp_right_flag = False
                right_n_step = 1
if right_n_step == 0:
topp_right_flag = False
right_n_step = 1
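            # The two TOPP calls above retime the raw policy waypoints into a
            # dense joint-space trajectory sampled every 1/250 s; the gripper
            # commands are then linearly resampled (np.linspace) to the same
            # length so arm and gripper stay synchronized.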
cnt += actions.shape[0]
n_step = max(left_n_step, right_n_step)
            obs_update_freq = max(1, n_step // actions.shape[0])
now_left_id = 0 if topp_left_flag else 1e9
now_right_id = 0 if topp_right_flag else 1e9
i = 0
while now_left_id < left_n_step or now_right_id < right_n_step:
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
if topp_left_flag and now_left_id < left_n_step and now_left_id / left_n_step <= now_right_id / right_n_step:
for j in range(len(self.left_arm_joint_id)):
left_j = self.left_arm_joint_id[j]
self.active_joints[left_j].set_drive_target(left_result["position"][now_left_id][j])
self.active_joints[left_j].set_drive_velocity_target(left_result["velocity"][now_left_id][j])
if not self.fix_gripper:
for joint in self.active_joints[34:36]:
# joint.set_drive_target(left_result["position"][i][6])
joint.set_drive_target(left_gripper[now_left_id])
joint.set_drive_velocity_target(0.05)
self.left_gripper_val = left_gripper[now_left_id]
now_left_id +=1
if topp_right_flag and now_right_id < right_n_step and now_right_id / right_n_step <= now_left_id / left_n_step:
for j in range(len(self.right_arm_joint_id)):
right_j = self.right_arm_joint_id[j]
self.active_joints[right_j].set_drive_target(right_result["position"][now_right_id][j])
self.active_joints[right_j].set_drive_velocity_target(right_result["velocity"][now_right_id][j])
if not self.fix_gripper:
for joint in self.active_joints[36:38]:
# joint.set_drive_target(right_result["position"][i][6])
joint.set_drive_target(right_gripper[now_right_id])
joint.set_drive_velocity_target(0.05)
self.right_gripper_val = right_gripper[now_right_id]
now_right_id +=1
self.scene.step()
self._update_render()
if i != 0 and i % obs_update_freq == 0:
observation = self.get_obs()
obs = self.get_cam_obs(observation)
obs['agent_pos'] = observation['joint_action']
model.update_obs(obs)
self._take_picture()
if i % 5==0:
self._update_render()
if self.render_freq and i % self.render_freq == 0:
self.viewer.render()
i+=1
if self.check_success():
success_flag = True
break
                if not self.actor_pose:
                    break
            self._update_render()
if self.render_freq:
self.viewer.render()
self._take_picture()
print(f'step: {cnt} / {self.step_lim}', end='\r')
if success_flag:
print("\nsuccess!")
self.suc +=1
if video_log:
ffmpeg.stdin.close()
ffmpeg.wait()
del ffmpeg
return
            if not self.actor_pose:
                break
print("\nfail!")
if video_log:
ffmpeg.stdin.close()
ffmpeg.wait()
del ffmpeg
def apply_dp3(self, model):
cnt = 0
self.test_num += 1
success_flag = False
self._update_render()
if self.render_freq:
self.viewer.render()
self.actor_pose = True
while cnt < self.step_lim:
observation = self.get_obs()
obs = dict()
obs['point_cloud'] = observation['pointcloud']
            if self.dual_arm:
                obs['agent_pos'] = observation['joint_action']
                assert obs['agent_pos'].shape[0] == 14, 'agent_pos shape error'
            else:
                obs['agent_pos'] = observation['joint_action']
                assert obs['agent_pos'].shape[0] == 7, 'agent_pos shape error'
actions = model.get_action(obs)
left_arm_actions , left_gripper , left_current_qpos, left_path = [], [], [], []
right_arm_actions , right_gripper , right_current_qpos, right_path = [], [], [], []
if self.dual_arm:
left_arm_actions,left_gripper = actions[:, :6],actions[:, 6]
right_arm_actions,right_gripper = actions[:, 7:13],actions[:, 13]
left_current_qpos, right_current_qpos = obs['agent_pos'][:6], obs['agent_pos'][7:13]
else:
right_arm_actions,right_gripper = actions[:, :6],actions[:, 6]
right_current_qpos = obs['agent_pos'][:6]
if self.dual_arm:
left_path = np.vstack((left_current_qpos, left_arm_actions))
right_path = np.vstack((right_current_qpos, right_arm_actions))
topp_left_flag, topp_right_flag = True, True
try:
times, left_pos, left_vel, acc, duration = self.left_planner.TOPP(left_path, 1/250, verbose=True)
left_result = dict()
left_result['position'], left_result['velocity'] = left_pos, left_vel
left_n_step = left_result["position"].shape[0]
left_gripper = np.linspace(left_gripper[0], left_gripper[-1], left_n_step)
            except Exception:
                topp_left_flag = False
                left_n_step = 1
if left_n_step == 0 or (not self.dual_arm):
topp_left_flag = False
left_n_step = 1
try:
times, right_pos, right_vel, acc, duration = self.right_planner.TOPP(right_path, 1/250, verbose=True)
right_result = dict()
right_result['position'], right_result['velocity'] = right_pos, right_vel
right_n_step = right_result["position"].shape[0]
right_gripper = np.linspace(right_gripper[0], right_gripper[-1], right_n_step)
            except Exception:
                topp_right_flag = False
                right_n_step = 1
if right_n_step == 0:
topp_right_flag = False
right_n_step = 1
cnt += actions.shape[0]
n_step = max(left_n_step, right_n_step)
            obs_update_freq = max(1, n_step // actions.shape[0])
now_left_id = 0 if topp_left_flag else 1e9
now_right_id = 0 if topp_right_flag else 1e9
i = 0
while now_left_id < left_n_step or now_right_id < right_n_step:
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
if topp_left_flag and now_left_id < left_n_step and now_left_id / left_n_step <= now_right_id / right_n_step:
for j in range(len(self.left_arm_joint_id)):
left_j = self.left_arm_joint_id[j]
self.active_joints[left_j].set_drive_target(left_result["position"][now_left_id][j])
self.active_joints[left_j].set_drive_velocity_target(left_result["velocity"][now_left_id][j])
if not self.fix_gripper:
for joint in self.active_joints[34:36]:
joint.set_drive_target(left_gripper[now_left_id])
joint.set_drive_velocity_target(0.05)
self.left_gripper_val = left_gripper[now_left_id]
                    now_left_id += 1
if topp_right_flag and now_right_id < right_n_step and now_right_id / right_n_step <= now_left_id / left_n_step:
for j in range(len(self.right_arm_joint_id)):
right_j = self.right_arm_joint_id[j]
self.active_joints[right_j].set_drive_target(right_result["position"][now_right_id][j])
self.active_joints[right_j].set_drive_velocity_target(right_result["velocity"][now_right_id][j])
if not self.fix_gripper:
for joint in self.active_joints[36:38]:
joint.set_drive_target(right_gripper[now_right_id])
joint.set_drive_velocity_target(0.05)
self.right_gripper_val = right_gripper[now_right_id]
                    now_right_id += 1
self.scene.step()
self._update_render()
if i != 0 and i % obs_update_freq == 0:
observation = self.get_obs()
                    obs = dict()
                    obs['point_cloud'] = observation['pointcloud']
                    obs['agent_pos'] = observation['joint_action']
                    expected_dim = 14 if self.dual_arm else 7
                    assert obs['agent_pos'].shape[0] == expected_dim, 'agent_pos shape error'
model.update_obs(obs)
self._take_picture()
                if i % 5 == 0:
                    self._update_render()
                if self.render_freq and i % self.render_freq == 0:
                    self.viewer.render()
                i += 1
if self.check_success():
success_flag = True
break
                if not self.actor_pose:
                    break
            self._update_render()
if self.render_freq:
self.viewer.render()
self._take_picture()
print(f'step: {cnt} / {self.step_lim}', end='\r')
if success_flag:
print("\nsuccess!")
            self.suc += 1
return
            if not self.actor_pose:
                break
print("\nfail!")
################################################# Generate Data API #################################################
def load_actors(self, scene_info_file):
"""
Scene generation function
- scene_info_file: Path to the JSON file containing scene information
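
        A minimal scene_info sketch (illustrative placeholders; field names follow
        the keys read below, and "get_box_data" stands in for whatever annotation
        method the task defines):
            {
              "actor_dis_lim_mat": [[0, 0.1], [0.1, 0]],
              "actor_info_list": [{
                  "actor_name": "box", "actor_type": "box",
                  "is_static_or_fix_root_link": false, "mass": 0.1,
                  "half_size": [0.02, 0.02, 0.02], "color": [1, 0, 0],
                  "actor_data_from": "get_box_data", "rand_model_id": false,
                  "z_val_protect": false, "rotate_tag": false, "ylim_prop": false,
                  "x_limit": [[-0.1, 0.1]], "y_limit": [[-0.15, -0.05]],
                  "z_limit": [[0.76]], "qpose": [[1, 0, 0, 0]],
                  "rotate_limit": [[0, 0, 0]]
              }]
            }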
"""
        try:
            with open(scene_info_file, 'r') as f:
                scene_info = json.load(f)
        except Exception:
            print("Load Scene Info Error!")
            return
actor_dis_mat = scene_info['actor_dis_lim_mat']
actor_info_list = scene_info['actor_info_list']
        actor_pos_link = scene_info.get('actor_position_link', False)
        model_id_link = scene_info.get('model_id_link', False)
position_id = -1
if actor_pos_link:
position_id = np.random.randint(0, scene_info['actor_position_num'])
model_id = -1
if model_id_link:
model_id = np.random.choice(self.id_list)
actor_num = len(actor_info_list)
actor_position = []
def create_rand_pose(actor_info, now_actor_id):
rotate_tag = actor_info['rotate_tag']
ylim_prop = actor_info['ylim_prop']
now_position_id = position_id if position_id != -1 else np.random.randint(0, len(actor_info["x_limit"]))
x_lim = actor_info["x_limit"][now_position_id]
y_lim = actor_info["y_limit"][min(now_position_id, len(actor_info["y_limit"])-1)]
z_lim = actor_info["z_limit"][min(now_position_id, len(actor_info["z_limit"])-1)]
qpose = actor_info['qpose'][min(now_position_id, len(actor_info["qpose"])-1)]
rotate_limit = actor_info['rotate_limit'][min(now_position_id, len(actor_info["rotate_limit"])-1)] if rotate_tag else [0,0,0]
pose = rand_pose(
xlim=x_lim,
ylim=y_lim,
zlim=z_lim,
ylim_prop=ylim_prop,
rotate_rand=rotate_tag,
rotate_lim=rotate_limit,
qpos=qpose
)
recreate = True
while recreate:
recreate = False
for i in range(now_actor_id):
las_actor_position = actor_position[i]
                    try:
                        dis = max(actor_dis_mat[i][now_actor_id], actor_dis_mat[now_actor_id][i])
                        if np.sum((pose.p[:2] - las_actor_position) ** 2) < dis ** 2:
                            recreate = True
                    except (IndexError, TypeError):
                        continue
if recreate:
pose = rand_pose(
xlim=x_lim,
ylim=y_lim,
zlim=z_lim,
ylim_prop=ylim_prop,
rotate_rand=rotate_tag,
                        rotate_lim=rotate_limit,
qpos=qpose
)
actor_position.append(pose.p[:2])
return pose
for i in range(actor_num):
actor_info = actor_info_list[i]
actor_name = actor_info['actor_name']
actor_type = actor_info['actor_type']
is_static = actor_info['is_static_or_fix_root_link']
            convex = actor_info.get('convex', False)
            actor_file = actor_info.get('actor_file', '')
z_val_protect = actor_info['z_val_protect']
actor_data_from = actor_info['actor_data_from']
rand_model_id = actor_info['rand_model_id']
setattr(self, actor_name, None)
setattr(self, actor_name + "_data", actor_data_from)
now_actor = getattr(self, actor_name)
now_actor_data = getattr(self, actor_name + "_data")
if actor_data_from != 'file':
get_actor_data_func = getattr(self, actor_data_from)
actor_pose = create_rand_pose(actor_info, i)
if actor_type == 'box':
now_actor = create_box(
self.scene,
pose=actor_pose,
half_size=actor_info["half_size"],
color=actor_info["color"],
is_static=is_static,
name=actor_name
)
now_actor_data = get_actor_data_func(actor_info["half_size"])
elif actor_type == 'urdf':
now_actor, now_actor_data = create_urdf_obj(
self.scene,
pose=actor_pose,
modelname=actor_file,
fix_root_link=is_static,
)
active_joints = now_actor.get_active_joints()
for joint in active_joints:
joint.set_drive_property(stiffness=20, damping=5, force_limit=1000, mode="force")
elif actor_type == 'obj' or actor_type == 'glb':
now_model_id = None
if rand_model_id:
now_model_id = model_id if model_id != -1 else np.random.choice(self.id_list)
now_actor, now_actor_data = create_actor(
self.scene,
pose=actor_pose,
modelname=actor_file,
convex=convex,
is_static=is_static,
model_id=now_model_id,
z_val_protect=z_val_protect
)
else:
print("Actor info type Error!")
continue
            if not is_static and actor_type != 'urdf':
now_actor.find_component_by_type(sapien.physx.PhysxRigidDynamicComponent).mass = actor_info["mass"]
self.actor_name_dic[actor_name] = now_actor
self.actor_data_dic[actor_name+'_data'] = now_actor_data
# rand create actor, and no need to record specific information
try:
if scene_info["rand_create_actor"] == True:
rand_actor_num = scene_info["rand_actor_num"]
rand_actor_tol = len(scene_info["rand_choice_actor_list"])
                rand_actor_id_list = np.random.choice(rand_actor_tol, size=rand_actor_num, replace=True)
rand_choice_actor_list = scene_info["rand_choice_actor_list"]
                for i, rand_id in enumerate(rand_actor_id_list):
                    actor_info = rand_choice_actor_list[rand_id]
actor_name = actor_info['actor_name']
actor_type = actor_info['actor_type']
is_static = actor_info['is_static_or_fix_root_link']
                    convex = actor_info.get('convex', False)
                    actor_file = actor_info.get('actor_file', '')
z_val_protect = actor_info['z_val_protect']
actor_data_from = actor_info['actor_data_from']
rand_model_id = actor_info['rand_model_id']
if actor_data_from != 'file':
get_actor_data_func = getattr(self, actor_data_from)
actor_pose = create_rand_pose(actor_info, i + actor_num)
if actor_type == 'box':
now_actor = create_box(
self.scene,
pose=actor_pose,
half_size=actor_info["half_size"],
color=actor_info["color"],
is_static=is_static,
name=actor_name
)
now_actor_data = get_actor_data_func(actor_info["half_size"])
elif actor_type == 'urdf':
now_actor, now_actor_data = create_urdf_obj(
self.scene,
pose=actor_pose,
modelname=actor_file,
fix_root_link=is_static,
)
active_joints = now_actor.get_active_joints()
for joint in active_joints:
joint.set_drive_property(stiffness=20, damping=5, force_limit=1000, mode="force")
elif actor_type == 'obj' or actor_type == 'glb':
now_actor, now_actor_data = create_actor(
self.scene,
pose=actor_pose,
modelname=actor_file,
convex=convex,
is_static=is_static,
model_id=None if not rand_model_id else np.random.choice(self.id_list),
z_val_protect=z_val_protect
)
                    if not is_static and actor_type != 'urdf':
now_actor.find_component_by_type(sapien.physx.PhysxRigidDynamicComponent).mass = actor_info["mass"]
        except Exception:
            pass
        # Add other data to 'actor_name_dic' and 'actor_data_dic'.
        try:
            for other_actor_data in scene_info.get('other_actor_data', []):
                self.actor_name_dic[other_actor_data] = getattr(self, other_actor_data)
                self.actor_data_dic[other_actor_data] = getattr(self, other_actor_data)
        except AttributeError:
            pass
        # Step briefly so the newly created actors settle and stop shaking.
render_freq = self.render_freq
self.render_freq = 0
for _ in range(4):
self.together_open_gripper(save_freq=None)
self.render_freq = render_freq
def get_target_pose_from_goal_point_and_direction(self, actor, actor_data = None, endpose = None, target_pose = None, target_grasp_qpose = None):
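        """
        Compute the gripper target pose that places the actor's annotated target
        point at `target_pose` while the gripper holds orientation `target_grasp_qpose`.
        - actor / actor_data: The grasped object instance and its annotation data.
        - endpose: The end-effector link whose current global pose anchors the offset.
        - target_pose: [x, y, z] goal for the actor's target point in the world frame.
        - target_grasp_qpose: Target gripper quaternion in the world frame.
        Returns [x, y, z, quaternion].
        """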
actor_matrix = actor.get_pose().to_transformation_matrix()
local_target_matrix = np.asarray(actor_data['target_pose'][0])
local_target_matrix[:3,3] *= actor_data['scale']
res_matrix = np.eye(4)
res_matrix[:3,3] = (actor_matrix @ local_target_matrix)[:3,3] - endpose.global_pose.p
res_matrix[:3,3] = np.linalg.inv(t3d.quaternions.quat2mat(endpose.global_pose.q) @ np.array([[1,0,0],[0,-1,0],[0,0,-1]])) @ res_matrix[:3,3]
res_pose = list(target_pose - t3d.quaternions.quat2mat(target_grasp_qpose) @ res_matrix[:3,3]) + target_grasp_qpose
return res_pose
def get_grasp_pose_w_labeled_direction(self, actor, actor_data, pre_dis = 0., id = 0):
"""
Obtain the grasp pose through the marked grasp point.
- actor: The instance of the object to be grasped.
- actor_data: The annotation data corresponding to the instance of the object to be grasped.
- pre_dis: The distance in front of the grasp point.
- id: The index of the grasp point.
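
        Example (illustrative; assumes an actor registered by `load_actors`):
            grasp_pose = self.get_grasp_pose_w_labeled_direction(
                self.actor_name_dic['box'], self.actor_data_dic['box_data'],
                pre_dis=0.05, id=0)
            # -> [x, y, z, qw, qx, qy, qz], offset 5 cm back along the approach axis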
"""
actor_matrix = actor.get_pose().to_transformation_matrix()
local_contact_matrix = np.asarray(actor_data['contact_points_pose'][id])
local_contact_matrix[:3,3] *= actor_data['scale']
        global_contact_pose_matrix = actor_matrix @ local_contact_matrix @ np.array([[0, 0, 1, 0],
                                                                                     [-1, 0, 0, 0],
                                                                                     [0, -1, 0, 0],
                                                                                     [0, 0, 0, 1]])
global_contact_pose_matrix_q = global_contact_pose_matrix[:3,:3]
global_grasp_pose_p = global_contact_pose_matrix[:3,3] + global_contact_pose_matrix_q @ np.array([-0.12-pre_dis,0,0]).T
global_grasp_pose_q = t3d.quaternions.mat2quat(global_contact_pose_matrix_q)
res_pose = list(global_grasp_pose_p)+list(global_grasp_pose_q)
return res_pose
def get_grasp_pose_from_goal_point_and_direction(self, actor, actor_data, endpose_tag: str, actor_functional_point_id = 0, target_point = None,
target_approach_direction = [0,0,1,0], actor_target_orientation = None, pre_dis = 0.):
"""
Obtain the grasp pose through the given target point and contact direction.
- actor: The instance of the object to be grasped.
- actor_data: The annotation data corresponding to the instance of the object to be grasped.
- endpose_tag: Left and right gripper marks, with values "left" or "right".
- actor_functional_point_id: The index of the functional point to which the object to be grasped needs to be aligned.
- target_point: The target point coordinates for aligning the functional points of the object to be grasped.
- target_approach_direction: The direction of the grasped object's contact target point,
represented as a quaternion in the world coordinate system.
- actor_target_orientation: The final target orientation of the object,
represented as a direction vector in the world coordinate system.
- pre_dis: The distance in front of the grasp point.
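
        Example (illustrative; `bottle`/`cup_top_xyz` are hypothetical): align
        functional point 0 of a grasped bottle with a point above a cup, using a
        top-down approach:
            pose = self.get_grasp_pose_from_goal_point_and_direction(
                bottle, bottle_data, endpose_tag='left',
                actor_functional_point_id=0, target_point=cup_top_xyz,
                target_approach_direction=self.world_direction_dic['top_down'],
                pre_dis=0.05)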
"""
target_approach_direction_mat = t3d.quaternions.quat2mat(target_approach_direction)
actor_matrix = actor.get_pose().to_transformation_matrix()
target_point_copy = deepcopy(target_point[:3])
target_point_copy -= target_approach_direction_mat @ np.array([0,0,pre_dis])
try:
actor_orientation_point = np.array(actor_data['orientation_point'])[:3,3]
except:
actor_orientation_point = [0,0,0]
if actor_target_orientation is not None:
actor_target_orientation = actor_target_orientation / np.linalg.norm(actor_target_orientation)
adjunction_matrix_list = [
# 90 degree
t3d.euler.euler2mat(0,0,0),
t3d.euler.euler2mat(0,0,np.pi/2),
t3d.euler.euler2mat(0,0,-np.pi/2),
t3d.euler.euler2mat(0,0,np.pi),
# 45 degree
t3d.euler.euler2mat(0,0,np.pi/4),
t3d.euler.euler2mat(0,0,np.pi*3/4),
t3d.euler.euler2mat(0,0,-np.pi*3/4),
t3d.euler.euler2mat(0,0,-np.pi/4),
]
end_effector_pose = self.left_endpose if endpose_tag == 'left' else self.right_endpose
res_pose = None
        res_eval = -1e10
for adjunction_matrix in adjunction_matrix_list:
local_target_matrix = np.asarray(actor_data['functional_matrix'][actor_functional_point_id])
local_target_matrix[:3,3] *= actor_data['scale']
            functional_matrix = actor_matrix[:3,:3] @ np.asarray(actor_data['functional_matrix'][actor_functional_point_id])[:3,:3]
            functional_matrix = functional_matrix @ adjunction_matrix
            trans_matrix = target_approach_direction_mat @ np.linalg.inv(functional_matrix)
end_effector_pose_matrix = t3d.quaternions.quat2mat(end_effector_pose.global_pose.q) @ np.array([[1,0,0],[0,-1,0],[0,0,-1]])
target_grasp_matrix = trans_matrix @ end_effector_pose_matrix
# Use actor target orientation to filter
if actor_target_orientation is not None:
now_actor_orientation_point = trans_matrix @ actor_matrix[:3,:3] @ np.array(actor_orientation_point)
now_actor_orientation_point = now_actor_orientation_point / np.linalg.norm(now_actor_orientation_point)
                product = np.dot(now_actor_orientation_point, actor_target_orientation)
                # Skip candidates that deviate too much from the target orientation.
                if product < 0.8:
continue
res_matrix = np.eye(4)
res_matrix[:3,3] = (actor_matrix @ local_target_matrix)[:3,3] - end_effector_pose.global_pose.p
res_matrix[:3,3] = np.linalg.inv(end_effector_pose_matrix) @ res_matrix[:3,3]
target_grasp_qpose = t3d.quaternions.mat2quat(target_grasp_matrix)
            # print(target_grasp_matrix @ res_matrix[:3,3])
now_pose = (target_point_copy - target_grasp_matrix @ res_matrix[:3,3]).tolist() + target_grasp_qpose.tolist()
now_pose_eval = self.evaluate_grasp_pose(endpose_tag, now_pose, actor, is_grasp_actor=False, target_point=target_point[:3])
            if (actor_target_orientation is not None and product > res_eval) or now_pose_eval > res_eval:
                res_pose = now_pose
                res_eval = now_pose_eval if actor_target_orientation is None else product
return res_pose
# Get the pose coordinates of the actor's target point in the world coordinate system.
# Return value: [x, y, z]
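    # Example (illustrative): goal_xyz = self.get_actor_goal_pose(box, box_data, id=0)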
def get_actor_goal_pose(self,actor,actor_data, id = 0):
        if isinstance(actor, list):
return actor
actor_matrix = actor.get_pose().to_transformation_matrix()
local_target_matrix = np.asarray(actor_data['target_pose'][id])
local_target_matrix[:3,3] *= actor_data['scale']
return (actor_matrix @ local_target_matrix)[:3,3]
# Get the actor's functional point and axis corresponding to the index in the world coordinate system.
# Return value: [x, y, z, quaternion].
def get_actor_functional_pose(self, actor, actor_data, actor_functional_point_id = 0):
        if isinstance(actor, list):
return actor
actor_matrix = actor.get_pose().to_transformation_matrix()
# if "model_type" in actor_data.keys() and actor_data["model_type"] == "urdf": actor_matrix[:3,:3] = self.URDF_MATRIX
local_functional_matrix = np.asarray(actor_data['functional_matrix'][actor_functional_point_id])
local_functional_matrix[:3,3] *= actor_data['scale']
res_matrix = actor_matrix @ local_functional_matrix
return res_matrix[:3,3].tolist() + t3d.quaternions.mat2quat(res_matrix[:3,:3]).tolist()
# Get the actor's grasp point and axis corresponding to the index in the world coordinate system.
# Return value: [x, y, z, quaternion]
def get_actor_contact_point_position(self, actor, actor_data, actor_contact_id = 0):
        if isinstance(actor, list):
return actor
actor_matrix = actor.get_pose().to_transformation_matrix()
# if "model_type" in actor_data.keys() and actor_data["model_type"] == "urdf": actor_matrix[:3,:3] = self.URDF_MATRIX
local_contact_matrix = np.asarray(actor_data['contact_points_pose'][actor_contact_id])
local_contact_matrix[:3,3] *= actor_data['scale']
res_matrix = actor_matrix @ local_contact_matrix
return res_matrix[:3,3].tolist() + t3d.quaternions.mat2quat(res_matrix[:3,:3]).tolist()
def get_grasp_pose_to_grasp_object(self, endpose_tag: str, actor, actor_data, pre_dis = 0):
"""
Grasp the target object and obtain the appropriate grasp pose for the left and right arms when grasping the target actor.
- endpose_tag: Left and right gripper marks, with values "left" or "right".
- actor: The instance of the object to be grasped.
- actor_data: The annotation data corresponding to the instance of the object to be grasped.
- pre_dis: The distance in front of the grasp point.
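
        Example (illustrative; assumes an actor registered by `load_actors`):
            pose = self.get_grasp_pose_to_grasp_object(
                'left', self.actor_name_dic['box'],
                self.actor_data_dic['box_data'], pre_dis=0.05)
            # best-scoring reachable grasp, or None if no candidate passes planning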
"""
endpose = self.left_endpose if endpose_tag == 'left' else self.right_endpose
contact_points = actor_data['contact_points_pose']
        grasp_pose_eval = -1e9
        res_grasp_pose = None
        best_id = None
        # Mask out every contact point whose group is disabled.
        mask = [True] * len(actor_data["contact_points_pose"])
        for i in range(len(actor_data["contact_points_group"])):
            if not actor_data["contact_points_mask"][i]:
                for point_id in actor_data["contact_points_group"][i]:
                    mask[point_id] = False
        for i in range(len(contact_points)):
            if not mask[i]:
                continue
grasp_pose = self.get_grasp_pose_w_labeled_direction(actor, actor_data, pre_dis, i)
now_grasp_pose_eval = self.evaluate_grasp_pose(endpose_tag, grasp_pose, actor, is_grasp_actor = True)
if now_grasp_pose_eval > grasp_pose_eval:
grasp_pose_eval = now_grasp_pose_eval
res_grasp_pose = grasp_pose
                best_id = i
        # Record which contact-point group the chosen grasp belongs to.
        for i, group in enumerate(actor_data["contact_points_group"]):
            if best_id in group:
if endpose_tag == 'left':
self.left_prepare_grasp_data = actor_data
self.left_prepare_grasp_point_group = i
else:
self.right_prepare_grasp_data = actor_data
self.right_prepare_grasp_point_group = i
break
return res_grasp_pose
# Use the planner to test whether the target pose of the left and right robotic arms is reachable.
def is_plan_success(self, endpose_tag: str, grasp_pose: list):
planner = self.left_planner if endpose_tag == 'left' else self.right_planner
arm_joint_id = self.left_arm_joint_id if endpose_tag == 'left' else self.right_arm_joint_id
joint_pose = self.robot.get_qpos()
        qpos = [joint_pose[arm_joint_id[i]] for i in range(6)]
        last_robot_qpos = planner.robot.get_qpos()
result = planner.plan_screw(
target_pose=grasp_pose,
qpos=qpos,
time_step=1 / 250,
use_point_cloud=False,
use_attach=False,
)
        # Restore the planner's internal robot state before returning.
        planner.robot.set_qpos(last_robot_qpos, full=True)
return result["status"] == "Success" and result["position"].shape[0] <= 2000
def evaluate_grasp_pose(self, endpose_tag: str, grasp_pose: list, actor, is_grasp_actor = True, target_point = None):
"""
Evaluate whether the grasp poses of the left and right arms are appropriate.
- endpose_tag: Left and right gripper marks, with values "left" or "right".
- grasp_pose: The target pose of the gripper that needs to be evaluated.
- actor: The instance of the object to be grasped.
- is_grasp_actor: Whether it is the pose being evaluated during the grasping of the actor,
this parameter affects the evaluation criteria.
        - target_point: The target position the object should reach after being grasped.
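        Returns a negative cost where higher is better; -1e10 marks a pose the
        planner cannot reach at all.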
"""
# Use the planner to test the grasp pose
is_plan_suc = self.is_plan_success(endpose_tag=endpose_tag, grasp_pose=grasp_pose)
if not is_plan_suc:
return -1e10
endpose = self.left_endpose if endpose_tag == 'left' else self.right_endpose
base_xy = np.array([-0.3,-0.417]) if endpose_tag == 'left' else np.array([0.3,-0.417])
actor_xy = np.array([actor.get_pose().p[0], actor.get_pose().p[1]]) if is_grasp_actor else np.array(target_point[:2])
angle = np.arctan2(actor_xy[1]-base_xy[1], actor_xy[0]-base_xy[0])
res = 0
# The angles of the three rotation axes relative to the reference position should not exceed [-pi/2, pi/2]
trans_matrix = t3d.quaternions.quat2mat(grasp_pose[3:]) @ np.linalg.inv(np.array([[0,-1,0],[1,0,0],[0,0,1]]))
delta_euler = np.array(t3d.euler.mat2euler(trans_matrix))
# print(delta_euler)
if np.any(delta_euler > np.pi/2 + 0.1):
res += 1e9
base_pose = self.left_original_pose if endpose_tag == 'left' else self.right_original_pose
distance = np.sqrt(np.sum((np.array(base_pose[:3]) - np.array(grasp_pose)[:3]) ** 2))
if np.fabs(delta_euler[0]) > 1.2 and distance > 0.4:
res += 1e9
# Restrict the range of x, y, and z coordinates
if endpose_tag == 'left':
grasp_limit = [[-0.4,0.1],[-0.3,0.3],[0.85, 1.2]]
elif endpose_tag == 'right':
grasp_limit = [[-0.1, 0.4],[-0.3,0.3],[0.85, 1.2]]
if np.any([grasp_pose[i] < grasp_limit[i][0] or grasp_pose[i] > grasp_limit[i][1] for i in range(3)]):
res += 1e8
# Calculate the evaluation value of the grasp pose
res += np.sqrt(np.sum((endpose.global_pose.p - np.array(grasp_pose)[:3]) ** 2)) / 0.7
trans_now_pose_matrix = t3d.quaternions.quat2mat(grasp_pose[3:]) @ np.linalg.inv(endpose.global_pose.to_transformation_matrix()[:3,:3])
theta_xy = np.mod(np.abs(t3d.euler.mat2euler(trans_now_pose_matrix)[:2]), np.pi)
theta_z = delta_euler[-1] + np.pi/2 - np.mod(angle + np.pi, np.pi)
res += 2 * np.sum(theta_xy/np.pi) + 2 * np.abs(theta_z)/np.pi
return -res
def get_avoid_collision_pose(self, avoid_collision_arm_tag: str):
"""
Obtain the poses of the left and right arms that can avoid arm collisions.
- avoid_collision_arm_tag: The gripper mark that needs to avoid arm collisions, with values "left" or "right".
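
        Example (illustrative): park the right arm before the left arm crosses
        the workspace midline:
            pose = self.get_avoid_collision_pose('right')
            # -> [x, y, z, quaternion] target for the right end effector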
"""
if avoid_collision_arm_tag == 'right':
endpose = self.right_endpose.global_pose
target_pose = [0.28, -0.07, endpose.p[2]] + t3d.quaternions.qmult(endpose.q, [0,1,0,0]).tolist()
else:
endpose = self.left_endpose.global_pose
target_pose = [-0.28, -0.07, endpose.p[2]] + t3d.quaternions.qmult(endpose.q, [0,1,0,0]).tolist()
return target_pose
# Get the contact points, target points, and functional points of the object.
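    # Example return value (illustrative):
    #   {'cup_data': {'contact_points': '...', 'target_point': '...',
    #                 'functional_point': '...', 'actor_orientation': ''}}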
def get_actor_points_discription(self):
        res_dic = {}
        description_keys = {
            'contact_points': 'contact_points_discription',
            'target_point': 'target_point_discription',
            'functional_point': 'functional_point_discription',
            'actor_orientation': 'orientation_point_discription',
        }
        for actor_data_str, actor_data in self.actor_data_dic.items():
            res_dic[actor_data_str] = {}
            for res_key, data_key in description_keys.items():
                try:
                    res_dic[actor_data_str][res_key] = actor_data[data_key]
                except (KeyError, TypeError):
                    res_dic[actor_data_str][res_key] = ""
        return res_dic
################################################# Generate Data API #################################################
    def play_once(self):
        # Overridden by each task subclass to script one expert demonstration.
        pass
    def check_success(self):
        # Overridden by each task subclass to define the success criterion.
        pass
    def pre_move(self):
        # Optional hook for task subclasses that need a preparatory motion.
        pass
# ================= For Your Policy Deployment =================
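    # A minimal policy stub (illustrative sketch, not a required base class): any
    # object whose get_action(obs) returns an (N, 14) array of absolute joint
    # targets (6 left-arm joints + left gripper + 6 right-arm joints + right
    # gripper) can be passed to apply_policy_demo, e.g.:
    #
    #     class RandomJitterPolicy:
    #         def get_action(self, obs):
    #             qpos = np.asarray(obs['joint_action'])        # current 14-dim state
    #             chunk = np.repeat(qpos[None, :], 8, axis=0)   # 8 action steps
    #             chunk[:, :6] += np.random.normal(0, 0.01, (8, 6))
    #             chunk[:, 7:13] += np.random.normal(0, 0.01, (8, 6))
    #             return chunk
    #
    #     task.apply_policy_demo(RandomJitterPolicy())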
def apply_policy_demo(self, model):
step_cnt = 0
self.test_num += 1
success_flag = False
self._update_render()
if self.render_freq:
self.viewer.render()
self.actor_pose = True
while step_cnt < self.step_lim: # If it is not successful within the specified number of steps, it is judged as a failure.
obs = self.get_obs() # get observation
actions = model.get_action(obs) # TODO, get actions according to your policy and current obs
            left_arm_actions, left_gripper, left_current_qpos, left_path = [], [], [], []
            right_arm_actions, right_gripper, right_current_qpos, right_path = [], [], [], []
left_arm_actions, left_gripper = actions[:, :6], actions[:, 6] # 0-5 left joint action, 6 left gripper action
right_arm_actions, right_gripper = actions[:, 7:13], actions[:, 13] # 7-12 right joint action, 13 right gripper action
left_current_qpos, right_current_qpos = obs['joint_action'][:6], obs['joint_action'][7:13] # current joint and gripper action
left_path = np.vstack((left_current_qpos, left_arm_actions))
right_path = np.vstack((right_current_qpos, right_arm_actions))
topp_left_flag, topp_right_flag = True, True
try:
times, left_pos, left_vel, acc, duration = self.left_planner.TOPP(left_path, 1/250, verbose=True)
left_result = dict()
left_result['position'], left_result['velocity'] = left_pos, left_vel
left_n_step = left_result["position"].shape[0]
left_gripper = np.linspace(left_gripper[0], left_gripper[-1], left_n_step)
            except Exception:
                topp_left_flag = False
                left_n_step = 1
            if left_n_step == 0:
                topp_left_flag = False
                left_n_step = 1
try:
times, right_pos, right_vel, acc, duration = self.right_planner.TOPP(right_path, 1/250, verbose=True)
right_result = dict()
right_result['position'], right_result['velocity'] = right_pos, right_vel
right_n_step = right_result["position"].shape[0]
right_gripper = np.linspace(right_gripper[0], right_gripper[-1], right_n_step)
            except Exception:
                topp_right_flag = False
                right_n_step = 1
if right_n_step == 0:
topp_right_flag = False
right_n_step = 1
step_cnt += actions.shape[0]
n_step = max(left_n_step, right_n_step)
            obs_update_freq = max(1, n_step // actions.shape[0])
now_left_id = 0 if topp_left_flag else 1e9
now_right_id = 0 if topp_right_flag else 1e9
i = 0
while now_left_id < left_n_step or now_right_id < right_n_step:
qf = self.robot.compute_passive_force(
gravity=True, coriolis_and_centrifugal=True
)
self.robot.set_qf(qf)
if topp_left_flag and now_left_id < left_n_step and now_left_id / left_n_step <= now_right_id / right_n_step:
for j in range(len(self.left_arm_joint_id)):
left_j = self.left_arm_joint_id[j]
self.active_joints[left_j].set_drive_target(left_result["position"][now_left_id][j])
self.active_joints[left_j].set_drive_velocity_target(left_result["velocity"][now_left_id][j])
if not self.fix_gripper:
for joint in self.active_joints[34:36]:
joint.set_drive_target(left_gripper[now_left_id])
joint.set_drive_velocity_target(0.05)
self.left_gripper_val = left_gripper[now_left_id]
                    now_left_id += 1
if topp_right_flag and now_right_id < right_n_step and now_right_id / right_n_step <= now_left_id / left_n_step:
for j in range(len(self.right_arm_joint_id)):
right_j = self.right_arm_joint_id[j]
self.active_joints[right_j].set_drive_target(right_result["position"][now_right_id][j])
self.active_joints[right_j].set_drive_velocity_target(right_result["velocity"][now_right_id][j])
if not self.fix_gripper:
for joint in self.active_joints[36:38]:
joint.set_drive_target(right_gripper[now_right_id])
joint.set_drive_velocity_target(0.05)
self.right_gripper_val = right_gripper[now_right_id]
                    now_right_id += 1
self.scene.step()
self._update_render()
                if i % 5 == 0:
                    self._update_render()
                if self.render_freq and i % self.render_freq == 0:
                    self.viewer.render()
                i += 1
if self.check_success():
success_flag = True
break
                if not self.actor_pose:
                    break
            self._update_render()
if self.render_freq:
self.viewer.render()
print(f'step: {step_cnt} / {self.step_lim}', end='\r')
if success_flag:
print("\nsuccess!")
            self.suc += 1
return
            if not self.actor_pose:
                break
print("\nfail!")