# custom_env.py
from typing import Tuple

import numpy as np
import torch
from bricksrl.environments.base.base_env import BaseEnv
from tensordict import TensorDict, TensorDictBase
from torchrl.data.tensor_specs import BoundedTensorSpec, CompositeSpec
class CustomEnv(BaseEnv):
    """
    Environment template for creating your own custom environment for BricksRL.

    Args:
        max_episode_steps (int): The maximum number of steps per episode. Defaults to 50.
        verbose (bool): Whether to print additional information. Defaults to False.
    """

    # Placeholder attributes (assumed values, adjust to your robot): action_dim and
    # state_dim must match the action/state layout of the hub client program;
    # observation_key names the entry used in the observation spec.
    action_dim = 1
    state_dim = 1
    observation_key = "observation"

    def __init__(
        self,
        max_episode_steps: int = 50,
        verbose: bool = False,
    ):
        self._batch_size = torch.Size([1])
        self.max_episode_steps = max_episode_steps

        # Define action spec
        self.action_spec = BoundedTensorSpec(
            low=-1,
            high=1,
            shape=(1, self.action_dim),
        )

        # Define observation spec
        observation_spec = BoundedTensorSpec(
            low=-1,
            high=1,
            shape=(1, self.state_dim),
        )
        self.observation_spec = CompositeSpec(
            {self.observation_key: observation_spec}, shape=(1,)
        )

        super().__init__(
            action_dim=self.action_dim,
            state_dim=self.state_dim,
            verbose=verbose,
        )

    def _reset(self, tensordict: TensorDictBase, **kwargs) -> TensorDictBase:
"""
Reset the environment and return the initial state.
Returns:
TensorDictBase: The initial state of the environment.
"""
# TODO solve this fake action sending before to receive first state
self.episode_step_iter = 0
if tensordict is not None:
action = tensordict.get("action").cpu().numpy().squeeze()
else:
action = np.zeros(self.action_dim)
self.send_to_hub(action)
# Get current observation
observation = self.read_from_hub()
return TensorDict(
{
self.observation_key: torch.tensor(observation, dtype=torch.float32),
},
batch_size=[1],
)
def reward(
        self,
        action: np.ndarray,
        next_state: np.ndarray,
    ) -> Tuple[float, bool]:
        """Your custom reward function"""
        return 1.0, False
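    # Illustrative example only (an assumption, not part of the template): a
    # shaped reward that penalizes the distance between the first observation
    # entry and a hypothetical target value and ends the episode once the target
    # is reached. Replace the body of reward() above with something like this.
    #
    # def reward(self, action, next_state):
    #     target = 0.5  # hypothetical setpoint in normalized observation units
    #     error = float(np.abs(np.asarray(next_state).squeeze()[0] - target))
    #     done = error < 0.05
    #     return -error, done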
    def _step(self, tensordict: TensorDictBase) -> TensorDictBase:
        """Custom step function"""
        # Send action to hub to receive next state
        action = tensordict.get("action").cpu().numpy().squeeze()
        self.send_to_hub(action)

        # Receive the next state
        next_observation = self.read_from_hub()

        # Calculate reward and done
        reward, done = self.reward(
            action=action,
            next_state=next_observation,
        )

        next_tensordict = TensorDict(
            {
                self.observation_key: torch.tensor(
                    next_observation, dtype=torch.float32
                ),
                "reward": torch.tensor([reward]).float(),
                "done": torch.tensor([done]).bool(),
            },
            batch_size=[1],
        )

        # Increment episode step counter and truncate at the episode limit
        self.episode_step_iter += 1
        if self.episode_step_iter >= self.max_episode_steps:
            next_tensordict.set("done", torch.tensor([True]))
        return next_tensordict
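

if __name__ == "__main__":
    # Minimal usage sketch (an assumption, not part of the original template): it
    # presumes a Pybricks hub is connected and running the client program that
    # BaseEnv's send_to_hub/read_from_hub expect, so reset and step can exchange
    # data with the robot.
    from torchrl.envs.utils import check_env_specs

    env = CustomEnv(max_episode_steps=10, verbose=True)

    # check_env_specs (from torchrl) runs a few transitions and validates that
    # the outputs of reset/step match the declared specs.
    check_env_specs(env)

    # Short random-action rollout to sanity-check the reward/done logic.
    rollout = env.rollout(max_steps=5)
    print(rollout["next", "reward"].squeeze())
    print(rollout["next", "done"].squeeze())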