Skip to content

Commit

Permalink
Add is_slippery option for cliffwalking environment (#1087)
Browse files Browse the repository at this point in the history
  • Loading branch information
CloseChoice authored Jul 3, 2024
1 parent fc55d47 commit 1afdb5c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 19 deletions.
57 changes: 39 additions & 18 deletions gymnasium/envs/toy_text/cliffwalking.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from contextlib import closing
from io import StringIO
from os import path
from typing import Optional
from typing import Any, List, Optional, Tuple, Union

import numpy as np

Expand All @@ -16,6 +16,8 @@
DOWN = 2
LEFT = 3

# Grid offset added to a (row, col) position for each action constant.
POSITION_MAPPING = {UP: [-1, 0], RIGHT: [0, 1], DOWN: [1, 0], LEFT: [0, -1]}


class CliffWalkingEnv(Env):
"""
Expand All @@ -33,6 +35,9 @@ class CliffWalkingEnv(Env):
Adapted from Example 6.6 (page 132) from Reinforcement Learning: An Introduction
by Sutton and Barto [<a href="#cliffwalk_ref">1</a>].
The cliff can be chosen to be slippery (disabled by default) so the player may move perpendicular
to the intended direction sometimes (see <a href="#is_slippy">`is_slippery`</a>).
With inspiration from:
[https://github.com/dennybritz/reinforcement-learning/blob/master/lib/envs/cliff_walking.py](https://github.com/dennybritz/reinforcement-learning/blob/master/lib/envs/cliff_walking.py)
Expand Down Expand Up @@ -78,14 +83,15 @@ class CliffWalkingEnv(Env):
```python
import gymnasium as gym
gym.make('CliffWalking-v0')
gym.make('CliffWalking-v1')
```
## References
<a id="cliffwalk_ref"></a>[1] R. Sutton and A. Barto, “Reinforcement Learning:
An Introduction” 2020. [Online]. Available: [http://www.incompleteideas.net/book/RLbook2020.pdf](http://www.incompleteideas.net/book/RLbook2020.pdf)
## Version History
- v1: Add slippery version of cliffwalking
- v0: Initial version release
"""
Expand All @@ -95,13 +101,15 @@ class CliffWalkingEnv(Env):
"render_fps": 4,
}

def __init__(self, render_mode: Optional[str] = None):
def __init__(self, render_mode: Optional[str] = None, is_slippery: bool = False):
self.shape = (4, 12)
self.start_state_index = np.ravel_multi_index((3, 0), self.shape)

self.nS = np.prod(self.shape)
self.nA = 4

self.is_slippery = is_slippery

# Cliff Location
self._cliff = np.zeros(self.shape, dtype=bool)
self._cliff[3, 1:-1] = True
Expand All @@ -111,10 +119,10 @@ def __init__(self, render_mode: Optional[str] = None):
for s in range(self.nS):
position = np.unravel_index(s, self.shape)
self.P[s] = {a: [] for a in range(self.nA)}
self.P[s][UP] = self._calculate_transition_prob(position, [-1, 0])
self.P[s][RIGHT] = self._calculate_transition_prob(position, [0, 1])
self.P[s][DOWN] = self._calculate_transition_prob(position, [1, 0])
self.P[s][LEFT] = self._calculate_transition_prob(position, [0, -1])
self.P[s][UP] = self._calculate_transition_prob(position, UP)
self.P[s][RIGHT] = self._calculate_transition_prob(position, RIGHT)
self.P[s][DOWN] = self._calculate_transition_prob(position, DOWN)
self.P[s][LEFT] = self._calculate_transition_prob(position, LEFT)

# Calculate initial state distribution
# We always start in state (3, 0)
Expand Down Expand Up @@ -150,25 +158,38 @@ def _limit_coordinates(self, coord: np.ndarray) -> np.ndarray:
coord[1] = max(coord[1], 0)
return coord

def _calculate_transition_prob(self, current, delta):
def _calculate_transition_prob(
self, current: Union[List[int], np.ndarray], move: int
) -> List[Tuple[float, Any, int, bool]]:
"""Determine the possible outcomes for an action.
Args:
current: Current position on the grid as (row, col)
move: Intended action (one of UP, RIGHT, DOWN, LEFT); its grid offset is looked up in POSITION_MAPPING
Returns:
Tuple of ``(1.0, new_state, reward, terminated)``
List of ``(transition_probability, new_state, reward, terminated)`` tuples, one per possible move,
where `transition_probability` is 1 if the environment is not slippery, otherwise 1/3 for `move`
and the perpendicular moves.
"""
new_position = np.array(current) + np.array(delta)
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
if self._cliff[tuple(new_position)]:
return [(1.0, self.start_state_index, -100, False)]

terminal_state = (self.shape[0] - 1, self.shape[1] - 1)
is_terminated = tuple(new_position) == terminal_state
return [(1.0, new_state, -1, is_terminated)]
# Non-slippery: only the intended move can happen.
if not self.is_slippery:
deltas = [POSITION_MAPPING[move]]
else:
# Slippery: the intended move and its two neighbours in the action numbering are equally likely.
deltas = [
POSITION_MAPPING[act] for act in [(move - 1) % 4, move, (move + 1) % 4]
]
outcomes = []
for delta in deltas:
new_position = np.array(current) + np.array(delta)
new_position = self._limit_coordinates(new_position).astype(int)
new_state = np.ravel_multi_index(tuple(new_position), self.shape)
# Landing on a cliff cell sends the agent back to the start state with reward -100, without terminating.
if self._cliff[tuple(new_position)]:
outcomes.append((1 / len(deltas), self.start_state_index, -100, False))
else:
# Any other cell costs -1; the episode terminates only on the bottom-right cell.
terminal_state = (self.shape[0] - 1, self.shape[1] - 1)
is_terminated = tuple(new_position) == terminal_state
outcomes.append((1 / len(deltas), new_state, -1, is_terminated))
return outcomes

def step(self, a):
transitions = self.P[self.s][a]
Expand Down
22 changes: 21 additions & 1 deletion tests/envs/test_env_implementation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import gymnasium as gym
from gymnasium.envs.box2d import BipedalWalker, CarRacing
from gymnasium.envs.box2d.lunar_lander import demo_heuristic_lander
from gymnasium.envs.toy_text import TaxiEnv
from gymnasium.envs.toy_text import CliffWalkingEnv, TaxiEnv
from gymnasium.envs.toy_text.frozen_lake import generate_random_map


Expand Down Expand Up @@ -87,6 +87,26 @@ def test_carracing_domain_randomize():
).all(), f"Have same grass color after reset. Before: {grass_color}, after: {env.grass_color}."


def test_slippery_cliffwalking():
    """Check the transition table of the slippery cliff-walking environment.

    Every (state, action) entry must list exactly three outcomes, and each
    outcome must carry a probability of 1/3.
    """
    slippery_env = CliffWalkingEnv(is_slippery=True)
    expected_prob = 1 / 3
    for state_actions in slippery_env.P.values():
        for outcomes in state_actions.values():
            assert len(outcomes) == 3
            assert all(outcome[0] == expected_prob for outcome in outcomes)


def test_cliffwalking():
    """Check the transition table of the default (non-slippery) environment.

    Every (state, action) entry must hold a single outcome whose probability
    is 1.0.
    """
    deterministic_env = CliffWalkingEnv(is_slippery=False)
    for state_actions in deterministic_env.P.values():
        for outcomes in state_actions.values():
            assert len(outcomes) == 1
            assert all(outcome[0] == 1.0 for outcome in outcomes)


@pytest.mark.parametrize("seed", range(5))
def test_bipedal_walker_hardcore_creation(seed: int):
"""Test BipedalWalker hardcore creation.
Expand Down

0 comments on commit 1afdb5c

Please sign in to comment.