First draft for modular Hindsight Experience Replay Transform #2667

Draft: wants to merge 3 commits into base: main
100 changes: 100 additions & 0 deletions test/test_transforms.py
@@ -116,6 +116,10 @@
FrameSkipTransform,
GrayScale,
gSDENoise,
HERRewardAssigner,
HERSubGoalAssigner,
HERSubGoalSampler,
HindsightExperienceReplayTransform,
InitTracker,
MultiStepTransform,
NoopResetEnv,
@@ -12376,6 +12380,102 @@ def test_transform_inverse(self):
pytest.skip("Tested elsewhere")


class TestHERTransform(TransformBase):
@pytest.mark.parametrize("strategy", ["final", "future"])
@pytest.mark.parametrize("device", get_default_devices())
def test_transform_inverse(self, strategy, device):
batch = 10
trajectory_len = 20
num_samples = 4
batch_size = [batch, trajectory_len]
torch.manual_seed(0)

# Let every episode be a random 1D trajectory
velocity = torch.rand((batch, 1), device=device)
time = torch.arange(trajectory_len + 1, device=device).expand(batch, -1)
start_pos = torch.rand((batch, 1), device=device)
pos = start_pos + velocity * time
goal = (
(torch.rand(batch, device=device) * 10)
.expand(trajectory_len, batch)
.T[:, :, None]
)

her = HindsightExperienceReplayTransform(
subgoal_sampler=HERSubGoalSampler(
num_samples=num_samples,
strategy=strategy,
),
subgoal_assigner=HERSubGoalAssigner(
achieved_goal_key=("next", "pos"),
desired_goal_key="original_goal",
),
reward_assigner=HERRewardAssigner(),
)

done = torch.zeros(*batch_size, 1, dtype=torch.bool, device=device)
done[:, -1] = True
reward = done.clone().float()

td = TensorDict(
{
"pos": pos[:, :-1],
"original_goal": goal,
"next": {
"done": done,
"reward": reward,
"pos": pos[:, 1:],
"original_goal": goal,
},
},
batch_size,
device=device,
)

td = her.inv(td)
if strategy == "last":
assert td.shape == (batch * 2, trajectory_len)
elif strategy == "future":
assert td.shape == (batch * (num_samples + 1), trajectory_len)

# Original trajectories come first, so we can check that the subgoals are taken from the achieved positions
augmented_td = td[batch:, :]
new_batch_size, _ = augmented_td.shape
for i in range(new_batch_size):
goal_value = augmented_td["original_goal"][i, 0]
assert (goal_value == augmented_td["next", "pos"][i]).any()

def test_parallel_trans_env_check(self):
pass

def test_serial_trans_env_check(self):
pass

def test_single_trans_env_check(self):
pass

def test_trans_parallel_env_check(self):
pass

def test_trans_serial_env_check(self):
pass

def test_transform_compose(self):
pass

def test_transform_env(self):
pass

def test_transform_model(self):
pass

def test_transform_no_env(self):
pass

def test_transform_rb(self):
pass


if __name__ == "__main__":
args, unknown = argparse.ArgumentParser().parse_known_args()
pytest.main([__file__, "--capture", "no", "--exitfirst"] + unknown)
4 changes: 4 additions & 0 deletions torchrl/envs/__init__.py
@@ -67,6 +67,10 @@
FrameSkipTransform,
GrayScale,
gSDENoise,
HERRewardAssigner,
HERSubGoalAssigner,
HERSubGoalSampler,
HindsightExperienceReplayTransform,
InitTracker,
KLRewardTransform,
MultiStepTransform,
4 changes: 4 additions & 0 deletions torchrl/envs/transforms/__init__.py
@@ -31,6 +31,10 @@
FrameSkipTransform,
GrayScale,
gSDENoise,
HERRewardAssigner,
HERSubGoalAssigner,
HERSubGoalSampler,
HindsightExperienceReplayTransform,
InitTracker,
NoopResetEnv,
ObservationNorm,
238 changes: 238 additions & 0 deletions torchrl/envs/transforms/transforms.py
Contributor:
maybe let's create a dedicated file for these?

Author:

Give the command on where you would like me to put these and I will do it.

Contributor:

envs/transforms/her.py ?

@@ -34,7 +34,9 @@
from tensordict import (
is_tensor_collection,
LazyStackedTensorDict,
NestedKey,
NonTensorData,
pad_sequence,
set_lazy_legacy,
TensorDict,
TensorDictBase,
@@ -9264,3 +9266,239 @@ def transform_observation_spec(self, observation_spec: Composite) -> Composite:
high=torch.iinfo(torch.int64).max,
)
return super().transform_observation_spec(observation_spec)


class HERSubGoalSampler(Transform):
"""Returns a TensorDict with a key `subgoal_idx` of shape [batch_size, num_samples] represebting the subgoal index.

Available strategies are: `final` and `future`. The `final` strategy assigns the last state of the trajectory as the subgoal. The `future` strategy samples up to `num_samples` subgoal from all intermediate states within the same trajectory.

Args:
num_samples (int): Number of subgoals to sample from each trajectory. Defaults to 4.
subgoal_idx_key (NestedKey): The key to store the subgoal index. Defaults to "subgoal_idx".
strategy (str): Specifies the subgoal sampling strategy `"final"` | `"future"`. Defaults to `"future"`.

seealso:: `HindsightExperienceReplayTransform`, `HERSubGoalAssigner`, `HERRewardAssigner`.
"""

def __init__(
self,
num_samples: int = 4,
subgoal_idx_key: NestedKey = "subgoal_idx",
strategy: str = "future",
):
super().__init__(
in_keys=None,
in_keys_inv=None,
out_keys_inv=None,
)
self.num_samples = num_samples
self.subgoal_idx_key = subgoal_idx_key
self.strategy = strategy

def forward(self, trajectories: TensorDictBase) -> TensorDictBase:
assert len(trajectories.shape) in [1, 2]
assert self.strategy in ["final", "future"]

if len(trajectories.shape) == 1:
trajectories = trajectories.unsqueeze(0)

batch_size, trajectory_len = trajectories.shape
Contributor:

maybe

Suggested change:
-    batch_size, trajectory_len = trajectories.shape
+    *batch_size, trajectory_len = trajectories.shape

to account for batch size > 2

Author:

At the moment I assume that we have a single trajectory or a batch of trajectories [b, t]. I am not sure what other cases there may be, but we can think about it.

Contributor:

At least we should capture the case where the shape has more or fewer than 2 dims and let people know that 2 is the minimum, and if they want more they should ask for the feature on GitHub.
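One possible form of such a guard (an editorial sketch, not part of the PR; it assumes the 1D and 2D cases handled by the current code are the only supported ones):

    # Hypothetical check at the top of HERSubGoalSampler.forward (sketch only).
    if trajectories.ndim not in (1, 2):
        raise ValueError(
            "HERSubGoalSampler expects a single trajectory ([time]) or a batch of "
            f"trajectories ([batch, time]); got batch_size={trajectories.batch_size}. "
            "Please open a feature request on GitHub if more batch dimensions are needed."
        )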


if self.strategy == "final":
return TensorDict(
{
self.subgoal_idx_key: torch.full(
(batch_size, 1),
-2,
dtype=torch.int64,
device=trajectories.device,
)
},
batch_size=batch_size,
)

else:
subgoal_idxs = []
for _ in range(batch_size):
subgoal_idxs.append(
TensorDict(
{
self.subgoal_idx_key: (
torch.randperm(
trajectory_len - 2,
dtype=torch.int64,
device=trajectories.device,
)
+ 1
)[: self.num_samples]
},
batch_size=torch.Size(),
)
)
return pad_sequence(subgoal_idxs, pad_dim=0, return_mask=True)


class HERSubGoalAssigner(Transform):
"""This module assigns the subgoal to the trajectory according to a given subgoal index.

Args:
achieved_goal_key (NestedKey): The key pointing to the goal actually achieved at each step, used as the relabeled goal. Defaults to "achieved_goal".
desired_goal_key (NestedKey): The key of the desired goal that is overwritten with the sampled subgoal. Defaults to "desired_goal".

seealso:: `HindsightExperienceReplayTransform`, `HERSubGoalSampler`, `HERRewardAssigner`.
"""

def __init__(
self,
achieved_goal_key: NestedKey = "achieved_goal",
desired_goal_key: NestedKey = "desired_goal",
):
super().__init__(in_keys=None, in_keys_inv=None, out_keys_inv=None)
self.achieved_goal_key = achieved_goal_key
self.desired_goal_key = desired_goal_key

def forward(
self, trajectories: TensorDictBase, subgoals_idxs: torch.Tensor
) -> TensorDictBase:
batch_size, trajectory_len = trajectories.shape
for i in range(batch_size):
Contributor:

I wonder if there's a vectorized version of this? The ops seem simple enough to be executed in a vectorized way

Author:

I think I had given it a shot with vmap but indexing is not well supported with vmap. Once we pin down the API, I can give it a shot again.
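For reference, a rough vectorized sketch of the same relabeling (an editorial illustration, not part of the PR and untested; it assumes `subgoals_idxs` has shape `[batch, 1]`, as produced by `her_augmentation`):

    # Hypothetical vectorized variant of HERSubGoalAssigner.forward (sketch only).
    def forward(self, trajectories: TensorDictBase, subgoals_idxs: torch.Tensor) -> TensorDictBase:
        idx = subgoals_idxs.squeeze(-1)                                     # [B]
        batch_idx = torch.arange(idx.numel(), device=trajectories.device)   # [B]

        # Gather the achieved goal at the sampled index for every trajectory at once.
        achieved = trajectories.get(self.achieved_goal_key)                 # [B, T, G]
        subgoal = achieved[batch_idx, idx].unsqueeze(1)                     # [B, 1, G]

        desired = trajectories.get(self.desired_goal_key)
        trajectories.set_(self.desired_goal_key, subgoal.expand_as(desired))
        trajectories.set_(("next", self.desired_goal_key), subgoal.expand_as(desired))

        # Mark the relabeled subgoal step as terminal.
        new_done = torch.zeros_like(trajectories.get(("next", "done")), dtype=torch.bool)
        new_done[batch_idx, idx] = True
        trajectories.set_(("next", "done"), new_done)
        return trajectories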

# Assign the subgoal to the desired_goal_key, and ("next", desired_goal_key) of the trajectory
subgoal = trajectories[i][subgoals_idxs[i]][self.achieved_goal_key]
desired_goal_shape = trajectories[i][self.desired_goal_key].shape
trajectories[i].set_(
self.desired_goal_key, subgoal.expand(desired_goal_shape)
)
trajectories[i].set_(
("next", self.desired_goal_key), subgoal.expand(desired_goal_shape)
)

# Update the done and (next, done) flags
new_done = torch.zeros_like(
trajectories[i]["next", "done"], dtype=torch.bool
)
new_done[subgoals_idxs[i]] = True
trajectories[i].set_(("next", "done"), new_done)

return trajectories


class HERRewardAssigner(Transform):
"""This module assigns a reward of `reward_value` where the new trajectory `(next, done)` is `True`.

Args:
reward_value (float): The reward to be assigned to the newly generated trajectories. Defaults to "1.0".

seealso:: `HindsightExperienceReplayTransform`, `HERSubgoalSampler`, `HERSubGoalAssigner`.
"""

def __init__(
self,
reward_value: float = 1.0,
):
super().__init__(in_keys=None, in_keys_inv=None, out_keys_inv=None)
self.reward_value = reward_value

def forward(self, trajectories: TensorDictBase) -> TensorDictBase:
new_reward = torch.zeros_like(trajectories["next", "reward"])
new_reward[trajectories["next", "done"]] = self.reward_value
trajectories.set_(("next", "reward"), new_reward)
return trajectories


class HindsightExperienceReplayTransform(Transform):
Contributor:

Don't we need to modify the specs?
Does this work with replay buffer (static data) or only envs? If the latter, we should not be using forward.

If you look at Compose, there are a bunch of things that need to be implemented when nesting transforms, like clone, cache eraser etc.

Perhaps we could inherit from Compose and rewrite forward, _apply_transform, _call, _reset etc such that the logic hold but the extra features are included automatically?

Author:

I think it's a method that we do not need to attach to an environment; rather, it's a data augmentation method. The gist of the augmentation is: given a trajectory, we sample some intermediate states and assume that they were the goal instead. Thus, we can get some positive rewards for hard cases.
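To illustrate the replay-buffer use case being discussed, a hedged usage sketch (editorial, not part of the PR; it assumes the buffer applies the transform's `inv` on `extend`, and the goal keys below are placeholders):

    # Hypothetical usage: HER augmentation applied when trajectories are written to a replay buffer.
    from torchrl.data import LazyTensorStorage, ReplayBuffer

    her = HindsightExperienceReplayTransform(
        subgoal_sampler=HERSubGoalSampler(num_samples=4, strategy="future"),
        subgoal_assigner=HERSubGoalAssigner(
            achieved_goal_key=("next", "achieved_goal"),  # placeholder keys
            desired_goal_key="desired_goal",
        ),
        reward_assigner=HERRewardAssigner(),
    )
    rb = ReplayBuffer(storage=LazyTensorStorage(100_000), transform=her)
    # rb.extend(trajectory_batch)  # trajectory_batch: a TensorDict of shape [batch, time]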

"""Hindsight Experience Replay (HER) is a technique that allows to learn from failure by creating new experiences from the failed ones.

This module is a wrapper that includes the following modules:
- SubGoalSampler: Creates new trajectories by sampling future subgoals from the same trajectory.
- SubGoalAssigner: Assigns the subgoal to the trajectory according to a given subgoal index.
- RewardTransform: Assigns the reward to the trajectory according to the new subgoal.

Args:
SubGoalSampler (Transform):
SubGoalAssigner (Transform):
RewardTransform (Transform):

seealso:: `HERSubgoalSampler`, `HERSubGoalAssigner`, `HERRewardAssigner`.
"""

def __init__(
self,
subgoal_sampler: Transform | None = None,
subgoal_assigner: Transform | None = None,
reward_assigner: Transform | None = None,
assign_subgoal_idxs: bool = False,
):
if subgoal_sampler is None:
subgoal_sampler = HERSubGoalSampler()
if subgoal_assigner is None:
subgoal_assigner = HERSubGoalAssigner()
if reward_assigner is None:
reward_assigner = HERRewardAssigner()
super().__init__(
in_keys=None,
in_keys_inv=None,
out_keys_inv=None,
)
self.subgoal_sampler = subgoal_sampler
self.subgoal_assigner = subgoal_assigner
self.reward_assigner = reward_assigner
self.assign_subgoal_idxs = assign_subgoal_idxs

def _inv_call(self, tensordict: TensorDictBase) -> TensorDictBase:
augmentation_td = self.her_augmentation(tensordict)
return torch.cat([tensordict, augmentation_td], dim=0)

def _inv_apply_transform(self, tensordict: TensorDictBase) -> torch.Tensor:
return self.her_augmentation(tensordict)

def forward(self, tensordict: TensorDictBase) -> TensorDictBase:
return tensordict

def _call(self, tensordict: TensorDictBase) -> TensorDictBase:
raise ValueError(self.ENV_ERR)

def her_augmentation(self, trajectories: TensorDictBase):
if len(trajectories.shape) == 1:
trajectories = trajectories.unsqueeze(0)
batch_size, trajectory_length = trajectories.shape
Contributor:

maybe

Suggested change:
-    batch_size, trajectory_length = trajectories.shape
+    *batch_size, trajectory_length = trajectories.shape


new_trajectories = trajectories.clone(True)

# Sample subgoal indices
subgoal_idxs = self.subgoal_sampler(new_trajectories)

# Create new trajectories
augmented_trajectories = []
list_idxs = []
for i in range(batch_size):
Contributor:

Suggested change:
-    for i in range(batch_size):
+    for i in range(batch_size.numel()):

which also works with batch_size=torch.Size([])!

idxs = subgoal_idxs[i][self.subgoal_sampler.subgoal_idx_key]

if "masks" in subgoal_idxs.keys():
idxs = idxs[
subgoal_idxs[i]["masks", self.subgoal_sampler.subgoal_idx_key]
]

list_idxs.append(idxs.unsqueeze(-1))
new_traj = (
new_trajectories[i]
.expand((idxs.numel(), trajectory_length))
.clone(True)
)

if self.assign_subgoal_idxs:
new_traj[self.subgoal_sampler.subgoal_idx_key] = idxs.unsqueeze(
-1
).repeat(1, trajectory_length)

augmented_trajectories.append(new_traj)
augmented_trajectories = torch.cat(augmented_trajectories, dim=0)
associated_idxs = torch.cat(list_idxs, dim=0)

# Assign subgoals to the new trajectories
augmented_trajectories = self.subgoal_assigner.forward(
augmented_trajectories, associated_idxs
)

# Adjust the rewards based on the new subgoals
augmented_trajectories = self.reward_assigner.forward(augmented_trajectories)

return augmented_trajectories