#!/usr/bin/env python # Copyright 2024 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import time from functools import cached_property from itertools import chain from typing import Any import numpy as np from lerobot.common.cameras.utils import make_cameras_from_configs from lerobot.common.constants import OBS_IMAGES, OBS_STATE from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode from lerobot.common.motors.feetech import ( FeetechMotorsBus, OperatingMode, ) from ..robot import Robot from ..utils import ensure_safe_goal_position from .config_lekiwi import LeKiwiConfig logger = logging.getLogger(__name__) class LeKiwi(Robot): """ The robot includes a three omniwheel mobile base and a remote follower arm. The leader arm is connected locally (on the laptop) and its joint positions are recorded and then forwarded to the remote follower arm (after applying a safety clamp). In parallel, keyboard teleoperation is used to generate raw velocity commands for the wheels. """ config_class = LeKiwiConfig name = "lekiwi" def __init__(self, config: LeKiwiConfig): super().__init__(config) self.config = config norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 self.bus = FeetechMotorsBus( port=self.config.port, motors={ # arm "arm_shoulder_pan": Motor(1, "sts3215", norm_mode_body), "arm_shoulder_lift": Motor(2, "sts3215", norm_mode_body), "arm_elbow_flex": Motor(3, "sts3215", norm_mode_body), "arm_wrist_flex": Motor(4, "sts3215", norm_mode_body), "arm_wrist_roll": Motor(5, "sts3215", norm_mode_body), "arm_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), # base "base_left_wheel": Motor(7, "sts3215", MotorNormMode.RANGE_M100_100), "base_right_wheel": Motor(8, "sts3215", MotorNormMode.RANGE_M100_100), "base_back_wheel": Motor(9, "sts3215", MotorNormMode.RANGE_M100_100), }, calibration=self.calibration, ) self.arm_motors = [motor for motor in self.bus.motors if motor.startswith("arm")] self.base_motors = [motor for motor in self.bus.motors if motor.startswith("base")] self.cameras = make_cameras_from_configs(config.cameras) @property def _state_ft(self) -> dict[str, type]: return dict.fromkeys( ( "arm_shoulder_pan.pos", "arm_shoulder_lift.pos", "arm_elbow_flex.pos", "arm_wrist_flex.pos", "arm_wrist_roll.pos", "arm_gripper.pos", "x.vel", "y.vel", "theta.vel", ), float, ) @property def _cameras_ft(self) -> dict[str, tuple]: return { cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras } @cached_property def observation_features(self) -> dict[str, type | tuple]: return {**self._state_ft, **self._cameras_ft} @cached_property def action_features(self) -> dict[str, type]: return self._state_ft @property def is_connected(self) -> bool: return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) def connect(self, calibrate: bool = True) -> None: if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() for cam in self.cameras.values(): cam.connect() self.configure() logger.info(f"{self} connected.") @property def is_calibrated(self) -> bool: return self.bus.is_calibrated def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") motors = self.arm_motors + self.base_motors self.bus.disable_torque(self.arm_motors) for name in self.arm_motors: self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) input("Move robot to the middle of its range of motion and press ENTER....") homing_offsets = self.bus.set_half_turn_homings(self.arm_motors) homing_offsets.update(dict.fromkeys(self.base_motors, 0)) full_turn_motor = [ motor for motor in motors if any(keyword in motor for keyword in ["wheel", "wrist"]) ] unknown_range_motors = [motor for motor in motors if motor not in full_turn_motor] print( f"Move all arm joints except '{full_turn_motor}' sequentially through their " "entire ranges of motion.\nRecording positions. Press ENTER to stop..." ) range_mins, range_maxes = self.bus.record_ranges_of_motion(unknown_range_motors) for name in full_turn_motor: range_mins[name] = 0 range_maxes[name] = 4095 self.calibration = {} for name, motor in self.bus.motors.items(): self.calibration[name] = MotorCalibration( id=motor.id, drive_mode=0, homing_offset=homing_offsets[name], range_min=range_mins[name], range_max=range_maxes[name], ) self.bus.write_calibration(self.calibration) self._save_calibration() print("Calibration saved to", self.calibration_fpath) def configure(self): # Set-up arm actuators (position mode) # We assume that at connection time, arm is in a rest position, # and torque can be safely disabled to run calibration. self.bus.disable_torque() self.bus.configure_motors() for name in self.arm_motors: self.bus.write("Operating_Mode", name, OperatingMode.POSITION.value) # Set P_Coefficient to lower value to avoid shakiness (Default is 32) self.bus.write("P_Coefficient", name, 16) # Set I_Coefficient and D_Coefficient to default value 0 and 32 self.bus.write("I_Coefficient", name, 0) self.bus.write("D_Coefficient", name, 32) for name in self.base_motors: self.bus.write("Operating_Mode", name, OperatingMode.VELOCITY.value) self.bus.enable_torque() def setup_motors(self) -> None: for motor in chain(reversed(self.arm_motors), reversed(self.base_motors)): input(f"Connect the controller board to the '{motor}' motor only and press enter.") self.bus.setup_motor(motor) print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") @staticmethod def _degps_to_raw(degps: float) -> int: steps_per_deg = 4096.0 / 360.0 speed_in_steps = degps * steps_per_deg speed_int = int(round(speed_in_steps)) # Cap the value to fit within signed 16-bit range (-32768 to 32767) if speed_int > 0x7FFF: speed_int = 0x7FFF # 32767 -> maximum positive value elif speed_int < -0x8000: speed_int = -0x8000 # -32768 -> minimum negative value return speed_int @staticmethod def _raw_to_degps(raw_speed: int) -> float: steps_per_deg = 4096.0 / 360.0 magnitude = raw_speed degps = magnitude / steps_per_deg return degps def _body_to_wheel_raw( self, x: float, y: float, theta: float, wheel_radius: float = 0.05, base_radius: float = 0.125, max_raw: int = 3000, ) -> dict: """ Convert desired body-frame velocities into wheel raw commands. Parameters: x_cmd : Linear velocity in x (m/s). y_cmd : Linear velocity in y (m/s). theta_cmd : Rotational velocity (deg/s). wheel_radius: Radius of each wheel (meters). base_radius : Distance from the center of rotation to each wheel (meters). max_raw : Maximum allowed raw command (ticks) per wheel. Returns: A dictionary with wheel raw commands: {"base_left_wheel": value, "base_back_wheel": value, "base_right_wheel": value}. Notes: - Internally, the method converts theta_cmd to rad/s for the kinematics. - The raw command is computed from the wheels angular speed in deg/s using _degps_to_raw(). If any command exceeds max_raw, all commands are scaled down proportionally. """ # Convert rotational velocity from deg/s to rad/s. theta_rad = theta * (np.pi / 180.0) # Create the body velocity vector [x, y, theta_rad]. velocity_vector = np.array([x, y, theta_rad]) # Define the wheel mounting angles with a -90° offset. angles = np.radians(np.array([240, 120, 0]) - 90) # Build the kinematic matrix: each row maps body velocities to a wheel’s linear speed. # The third column (base_radius) accounts for the effect of rotation. m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) # Compute each wheel’s linear speed (m/s) and then its angular speed (rad/s). wheel_linear_speeds = m.dot(velocity_vector) wheel_angular_speeds = wheel_linear_speeds / wheel_radius # Convert wheel angular speeds from rad/s to deg/s. wheel_degps = wheel_angular_speeds * (180.0 / np.pi) # Scaling steps_per_deg = 4096.0 / 360.0 raw_floats = [abs(degps) * steps_per_deg for degps in wheel_degps] max_raw_computed = max(raw_floats) if max_raw_computed > max_raw: scale = max_raw / max_raw_computed wheel_degps = wheel_degps * scale # Convert each wheel’s angular speed (deg/s) to a raw integer. wheel_raw = [self._degps_to_raw(deg) for deg in wheel_degps] return { "base_left_wheel": wheel_raw[0], "base_back_wheel": wheel_raw[1], "base_right_wheel": wheel_raw[2], } def _wheel_raw_to_body( self, left_wheel_speed, back_wheel_speed, right_wheel_speed, wheel_radius: float = 0.05, base_radius: float = 0.125, ) -> dict[str, Any]: """ Convert wheel raw command feedback back into body-frame velocities. Parameters: wheel_raw : Vector with raw wheel commands ("base_left_wheel", "base_back_wheel", "base_right_wheel"). wheel_radius: Radius of each wheel (meters). base_radius : Distance from the robot center to each wheel (meters). Returns: A dict (x_cmd, y_cmd, theta_cmd) where: OBS_STATE.x_cmd : Linear velocity in x (m/s). OBS_STATE.y_cmd : Linear velocity in y (m/s). OBS_STATE.theta_cmd : Rotational velocity in deg/s. """ # Convert each raw command back to an angular speed in deg/s. wheel_degps = np.array( [ self._raw_to_degps(left_wheel_speed), self._raw_to_degps(back_wheel_speed), self._raw_to_degps(right_wheel_speed), ] ) # Convert from deg/s to rad/s. wheel_radps = wheel_degps * (np.pi / 180.0) # Compute each wheel’s linear speed (m/s) from its angular speed. wheel_linear_speeds = wheel_radps * wheel_radius # Define the wheel mounting angles with a -90° offset. angles = np.radians(np.array([240, 120, 0]) - 90) m = np.array([[np.cos(a), np.sin(a), base_radius] for a in angles]) # Solve the inverse kinematics: body_velocity = M⁻¹ · wheel_linear_speeds. m_inv = np.linalg.inv(m) velocity_vector = m_inv.dot(wheel_linear_speeds) x, y, theta_rad = velocity_vector theta = theta_rad * (180.0 / np.pi) return { "x.vel": x, "y.vel": y, "theta.vel": theta, } # m/s and deg/s def get_observation(self) -> dict[str, Any]: if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") # Read actuators position for arm and vel for base start = time.perf_counter() arm_pos = self.bus.sync_read("Present_Position", self.arm_motors) base_wheel_vel = self.bus.sync_read("Present_Velocity", self.base_motors) base_vel = self._wheel_raw_to_body( base_wheel_vel["base_left_wheel"], base_wheel_vel["base_back_wheel"], base_wheel_vel["base_right_wheel"], ) arm_state = {f"{k}.pos": v for k, v in arm_pos.items()} flat_states = {**arm_state, **base_vel} obs_dict = {f"{OBS_STATE}": flat_states} dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read state: {dt_ms:.1f}ms") # Capture images from cameras for cam_key, cam in self.cameras.items(): start = time.perf_counter() obs_dict[f"{OBS_IMAGES}.{cam_key}"] = cam.async_read() dt_ms = (time.perf_counter() - start) * 1e3 logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") return obs_dict def send_action(self, action: dict[str, Any]) -> dict[str, Any]: """Command lekiwi to move to a target joint configuration. The relative action magnitude may be clipped depending on the configuration parameter `max_relative_target`. In this case, the action sent differs from original action. Thus, this function always returns the action actually sent. Raises: RobotDeviceNotConnectedError: if robot is not connected. Returns: np.ndarray: the action sent to the motors, potentially clipped. """ if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") arm_goal_pos = {k: v for k, v in action.items() if k.endswith(".pos")} base_goal_vel = {k: v for k, v in action.items() if k.endswith(".vel")} base_wheel_goal_vel = self._body_to_wheel_raw( base_goal_vel["x.vel"], base_goal_vel["y.vel"], base_goal_vel["theta.vel"] ) # Cap goal position when too far away from present position. # /!\ Slower fps expected due to reading from the follower. if self.config.max_relative_target is not None: present_pos = self.bus.sync_read("Present_Position", self.arm_motors) goal_present_pos = {key: (g_pos, present_pos[key]) for key, g_pos in arm_goal_pos.items()} arm_safe_goal_pos = ensure_safe_goal_position(goal_present_pos, self.config.max_relative_target) arm_goal_pos = arm_safe_goal_pos # Send goal position to the actuators arm_goal_pos_raw = {k.replace(".pos", ""): v for k, v in arm_goal_pos.items()} self.bus.sync_write("Goal_Position", arm_goal_pos_raw) self.bus.sync_write("Goal_Velocity", base_wheel_goal_vel) return {**arm_goal_pos, **base_goal_vel} def stop_base(self): self.bus.sync_write("Goal_Velocity", dict.fromkeys(self.base_motors, 0), num_retry=5) logger.info("Base motors stopped") def disconnect(self): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") self.stop_base() self.bus.disconnect(self.config.disable_torque_on_disconnect) for cam in self.cameras.values(): cam.disconnect() logger.info(f"{self} disconnected.")