Hi, I am trying to create an environment that is a variation of Cartpole.
From the CartPole definition:
The studied system is a cart to which a rigid pole is hinged. The cart is free to move within the bounds of a one-dimensional track. The pole can move in the vertical plane parallel to the track. The controller can apply a force F to the cart, parallel to the track.
Suppose that, in addition to the force F, you can also choose a multiplier M of this force, so the total force applied is F * M.
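For example, with F = 5 and M = 4 the cart receives a total force of 20 along the track; in the code below the multiplier is encoded as action[1] + 1, so M ranges over {1, 2, 3, 4}.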
The following is the code:
# PPO-LSTM
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from gym import logger  # needed for logger.warn() in the custom env below
import time
import math
import numpy as np
import gym.envs.classic_control

# Hyperparameters
learning_rate = 0.0005
gamma = 0.98
lmbda = 0.95
eps_clip = 0.1
K_epoch = 2
T_horizon = 20
class CustomCartpole(gym.envs.classic_control.CartPoleEnv):
    """Add a dimension to the cartpole action space that is used as a 'speed' button."""

    def __init__(self, env_config):
        super().__init__()
        self.force_mag = 5.0
        # action[0] picks the force direction (left/right),
        # action[1] picks the force multiplier (1..4 after the +1 shift below).
        self.action_space = gym.spaces.MultiDiscrete([2, 4])

    def step(self, action):
        err_msg = "%r (%s) invalid" % (action, type(action))
        assert self.action_space.contains(action), err_msg
        x, x_dot, theta, theta_dot = self.state
        force = self.force_mag if action[0] == 1 else -self.force_mag
        force *= (action[1] + 1)  # scale the base force by the chosen multiplier
        costheta = math.cos(theta)
        sintheta = math.sin(theta)
        temp = (force + self.polemass_length * theta_dot ** 2 * sintheta) / self.total_mass
        thetaacc = (self.gravity * sintheta - costheta * temp) / (
            self.length * (4.0 / 3.0 - self.masspole * costheta ** 2 / self.total_mass)
        )
        xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
        if self.kinematics_integrator == 'euler':
            x = x + self.tau * x_dot
            x_dot = x_dot + self.tau * xacc
            theta = theta + self.tau * theta_dot
            theta_dot = theta_dot + self.tau * thetaacc
        else:  # semi-implicit euler
            x_dot = x_dot + self.tau * xacc
            x = x + self.tau * x_dot
            theta_dot = theta_dot + self.tau * thetaacc
            theta = theta + self.tau * theta_dot
        self.state = (x, x_dot, theta, theta_dot)
        done = bool(
            x < -self.x_threshold
            or x > self.x_threshold
            or theta < -self.theta_threshold_radians
            or theta > self.theta_threshold_radians
        )
        if not done:
            reward = 1.0
        elif self.steps_beyond_done is None:
            # Pole just fell!
            self.steps_beyond_done = 0
            reward = 1.0
        else:
            if self.steps_beyond_done == 0:
                logger.warn(
                    "You are calling 'step()' even though this "
                    "environment has already returned done = True. You "
                    "should always call 'reset()' once you receive 'done = "
                    "True' -- any further steps are undefined behavior."
                )
            self.steps_beyond_done += 1
            reward = 0.0
        return np.array(self.state), reward, done, {}
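# Quick sanity check of the new action space (a sketch, not part of the
# original issue): after env.reset(), env.step(np.array([1, 3])) should apply
# +5.0 * (3 + 1) = +20.0 to the cart, while env.step(np.array([0, 0]))
# applies -5.0.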
class PPO(nn.Module):
    def __init__(self):
        super(PPO, self).__init__()
        self.data = []
        self.fc1 = nn.Linear(4, 64)
        self.lstm = nn.LSTM(64, 32)
        self.fc_pi = nn.Linear(32, 2)
        self.fc_v = nn.Linear(32, 1)  # value head outputs a scalar (was 2, a bug)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, hidden):
        x = F.relu(self.fc1(x))
        x = x.view(-1, 1, 64)
        x, lstm_hidden = self.lstm(x, hidden)
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=2)
        return prob, lstm_hidden

    def v(self, x, hidden):
        x = F.relu(self.fc1(x))
        x = x.view(-1, 1, 64)
        x, lstm_hidden = self.lstm(x, hidden)
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        self.data.append(transition)

    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, h_in_lst, h_out_lst, done_lst = [], [], [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, h_in, h_out, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append([prob_a])
            h_in_lst.append(h_in)
            h_out_lst.append(h_out)
            done_mask = 0 if done else 1
            done_lst.append([done_mask])
        s, a, r, s_prime, done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                              torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                              torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        return s, a, r, s_prime, done_mask, prob_a, h_in_lst[0], h_out_lst[0]

    def train_net(self):
        s, a, r, s_prime, done_mask, prob_a, (h1_in, h2_in), (h1_out, h2_out) = self.make_batch()
        first_hidden = (h1_in.detach(), h2_in.detach())
        second_hidden = (h1_out.detach(), h2_out.detach())
        for i in range(K_epoch):
            v_prime = self.v(s_prime, second_hidden).squeeze(1)
            td_target = r + gamma * v_prime * done_mask
            v_s = self.v(s, first_hidden).squeeze(1)
            delta = td_target - v_s
            delta = delta.detach().numpy()
            # Generalized Advantage Estimation, accumulated backwards in time
            advantage_lst = []
            advantage = 0.0
            for item in delta[::-1]:
                advantage = gamma * lmbda * advantage + item[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)
            pi, _ = self.pi(s, first_hidden)
            pi_a = pi.squeeze(1).gather(1, a)
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a) - log(b))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(v_s, td_target.detach())
            self.optimizer.zero_grad()
            loss.mean().backward(retain_graph=True)
            self.optimizer.step()
def main():
    # env = gym.make('CartPole-v1')
    env = CustomCartpole({'override_actions': False})
    model = PPO()
    score = 0.0
    print_interval = 20

    for n_epi in range(10000):
        h_out = (torch.zeros([1, 1, 32], dtype=torch.float),
                 torch.zeros([1, 1, 32], dtype=torch.float))
        s = env.reset()
        done = False

        while not done:
            for t in range(T_horizon):
                h_in = h_out
                prob, h_out = model.pi(torch.from_numpy(s).float(), h_in)
                prob = prob.view(-1)
                m = Categorical(prob)
                a = m.sample().item()
                # `a` is a single int here, while the env's MultiDiscrete
                # space expects a pair of sub-actions -- this is where the
                # code and the new environment disagree.
                s_prime, r, done, info = env.step(a)
                model.put_data((s, a, r / 100.0, s_prime, prob[a].item(), h_in, h_out, done))
                s = s_prime
                score += r
                if done:
                    break
            model.train_net()

        if n_epi % print_interval == 0 and n_epi != 0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score / print_interval))
            score = 0.0
    env.close()

if __name__ == '__main__':
    main()
This code fails with the following error:
Could you please tell me how to adjust the main loop:
while not done:
    for t in range(T_horizon):
        h_in = h_out
        prob, h_out = model.pi(torch.from_numpy(s).float(), h_in)
        prob = prob.view(-1)
        m = Categorical(prob)
        a = m.sample().item()
        s_prime, r, done, info = env.step(a)
        model.put_data((s, a, r / 100.0, s_prime, prob[a].item(), h_in, h_out, done))
        s = s_prime
        score += r
        if done:
            break
    model.train_net()
to this environment?
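To make the question concrete, here is the direction I imagine the fix might take. This is only a rough sketch, not tested code: the two extra heads fc_pi_dir and fc_pi_mag are hypothetical additions, and the two sub-actions are assumed to be sampled independently.

# Hypothetical sketch: one policy head per MultiDiscrete dimension,
# and both sampled sub-actions passed to env.step().
class MultiDiscretePPO(PPO):
    def __init__(self):
        super().__init__()
        self.fc_pi_dir = nn.Linear(32, 2)  # force direction: left / right
        self.fc_pi_mag = nn.Linear(32, 4)  # force multiplier: 1..4

    def pi(self, x, hidden):
        x = F.relu(self.fc1(x))
        x = x.view(-1, 1, 64)
        x, lstm_hidden = self.lstm(x, hidden)
        prob_dir = F.softmax(self.fc_pi_dir(x), dim=2)
        prob_mag = F.softmax(self.fc_pi_mag(x), dim=2)
        return (prob_dir, prob_mag), lstm_hidden

# ...and in the rollout loop, sample each dimension separately:
(prob_dir, prob_mag), h_out = model.pi(torch.from_numpy(s).float(), h_in)
a_dir = Categorical(prob_dir.view(-1)).sample().item()
a_mag = Categorical(prob_mag.view(-1)).sample().item()
a = np.array([a_dir, a_mag])
s_prime, r, done, info = env.step(a)
# Assuming independent heads, the joint probability of the pair is the
# product of the per-head probabilities:
prob_a = prob_dir.view(-1)[a_dir].item() * prob_mag.view(-1)[a_mag].item()
model.put_data((s, a, r / 100.0, s_prime, prob_a, h_in, h_out, done))

Presumably train_net() would also need a matching change so that the PPO ratio uses the joint probability of both heads, but I am not sure how to thread that through, hence the question.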