import os

import torch

from cloud_helper import Server
from lerobot.policies.factory import get_policy_class

# Pin to a single GPU and avoid any Hugging Face Hub network access:
# the checkpoint is expected to be fully available on local disk.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["HF_HUB_OFFLINE"] = "1"


class LerobotInferenceServer:
    """RPC server that serves action chunks from a pretrained LeRobot policy.

    Loads a policy checkpoint once at startup and exposes a single
    ``get_actions`` endpoint over the ``cloud_helper.Server`` transport.
    """

    def __init__(
        self,
        checkpoint: str,
        policy_type: str = "smolvla",
        host: str = "localhost",
        port: int = 5555,
        device: str = "cuda",
    ):
        """Load the policy and bind the server socket.

        Args:
            checkpoint: Path (or hub id) of the pretrained policy weights.
            policy_type: Policy family name resolved via
                ``lerobot.policies.factory.get_policy_class``.
            host: Interface to bind the server on.
            port: Port to bind the server on.
            device: Torch device string the policy runs on.
        """
        self.server = Server(host, port)
        self.policy_type = policy_type
        policy_class = get_policy_class(self.policy_type)
        self.policy = policy_class.from_pretrained(checkpoint)
        self.device = device
        self.policy.to(self.device)
        # Inference-only server: disable dropout/batch-norm training behavior.
        self.policy.eval()
        print(f"Loaded {self.policy_type.upper()} policy from {checkpoint}")

    def get_actions(self, batch):
        """Convert a raw observation batch to tensors and predict an action chunk.

        Expected ``batch`` layout (values are numpy arrays; TODO confirm against
        the client):
            {
                "observation": {
                    "state": ...,            # 1-D float state vector
                    "images.front": ...,     # HWC uint8 image (or None)
                    "images.wrist": ...,     # HWC uint8 image (or None)
                },
                "instruction": ...,          # natural-language task string
            }

        Returns:
            numpy array of shape (B, chunk_size, action_dim) on CPU.
        """
        obs = {}
        for k, v in batch["observation"].items():
            if k.startswith("images.") and v is not None:
                # uint8 HWC -> float32 CHW in [0, 1], with a leading batch dim.
                img = v.astype("float32") / 255.0
                img = img.transpose(2, 0, 1)  # HWC -> CHW
                img = torch.from_numpy(img).unsqueeze(0).to(self.device)
                obs[f"observation.{k}"] = img
            elif k == "state":
                tensor = torch.from_numpy(v.astype("float32")).unsqueeze(0).to(self.device)
                obs[f"observation.{k}"] = tensor
        obs["task"] = batch["instruction"]
        # inference_mode: no autograd graph is built, reducing latency/memory.
        with torch.inference_mode():
            action_chunk = self.policy.predict_action_chunk(obs)
        return action_chunk.cpu().numpy()  # (B, chunk_size, action_dim)

    def run(self):
        """Register the endpoint and block forever serving requests."""
        self.server.register_endpoint("get_actions", self.get_actions)
        print(f"Lerobot {self.policy_type.upper()} Server is running...")
        self.server.loop_forever()


if __name__ == "__main__":
    smolvla_checkpoint = "./20250901/pick_red_marker_smolvla/checkpoints/last/pretrained_model"
    server = LerobotInferenceServer(
        checkpoint=smolvla_checkpoint, policy_type="smolvla", host="0.0.0.0", port=50000
    )
    server.run()