-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreplay.py
53 lines (39 loc) · 1.64 KB
/
replay.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""Script to replay qpos data from HDF5 file on the real robot."""
import argparse
import os
import sys
import time
import h5py
from data_collection.constants import DT
from firmware.robot.robot import Robot
def main(args: argparse.Namespace) -> None:
dataset_dir = args.dataset_dir
episode_idx = args.episode_idx
dataset_name = f"episode_{episode_idx}"
dataset_path = os.path.join(dataset_dir, dataset_name + ".hdf5")
if not os.path.isfile(dataset_path):
print(f"Dataset does not exist at \n{dataset_path}\n")
sys.exit()
with h5py.File(dataset_path, "r") as root:
qpos = root["/observations/qpos"][()]
action = root["/action"][()]
timesteps = qpos.shape[0]
print(f"qpos: {qpos.shape}")
print(f"action: {action.shape}")
print(f"timesteps: {timesteps}")
robot = Robot(config_path="config.yaml", setup="left_arm_replay")
robot.zero_out()
for t in range(timesteps):
start_time = time.time()
robot.set_position({"left_arm": qpos[t]}, radians=False)
robot.update_motor_data()
print(f"Robot at {robot.get_motor_positions()}")
print(f"Moving to {qpos[t]}")
wait_time = DT - (time.time() - start_time)
if wait_time > 0:
time.sleep(wait_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Replay qpos data from HDF5 file on the real robot.")
parser.add_argument("--dataset_dir", type=str, required=True, help="Directory containing HDF5 files.")
parser.add_argument("--episode_idx", type=int, required=True, help="Index of the episode to replay.")
main(parser.parse_args())