mape_train.py
import time
import numpy as np
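# MultiAgentEnv and the scenario loader below come from OpenAI's
# multiagent-particle-envs package (https://github.com/openai/multiagent-particle-envs),
# which must be installed for these imports to resolve.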
from multiagent.environment import MultiAgentEnv
import multiagent.scenarios as scenarios
from train import Train


class MAPETrain(Train):
    """
    Training environment for the Multi-Agent Particle Environment (MAPE).

    Paper:
        Lowe, R., Wu, Y., Tamar, A., Harb, J., Abbeel, O. P., & Mordatch, I. (2017).
        Multi-agent actor-critic for mixed cooperative-competitive environments.
        In Advances in Neural Information Processing Systems (pp. 6379-6390).

    Link: http://papers.nips.cc/paper/7217-multi-agent-actor-critic-for-mixed-cooperative-competitive-environments
    Open-source: https://github.com/openai/multiagent-particle-envs
    """

    def __init__(self):
        """
        Create MAPE Train instance
        """
        super(MAPETrain, self).__init__()

    def parse_args(self):
        """
        Parse arguments, including the default args and the MAPE-specific args
        """
        self.parse_default_args()
        self.parser.add_argument(
            "--scenario", type=str, default="simple", help="name of the MAPE scenario"
        )
        self.parser.add_argument(
            "--partial_observable",
            action="store_true",
            default=False,
            help="use partially observable scenarios",
        )
        self.parser.add_argument(
            "--observation_noise",
            action="store_true",
            default=False,
            help="add Gaussian noise to observations",
        )
        self.parser.add_argument(
            "--environment_noise",
            action="store_true",
            default=False,
            help="add a distortion field to the environment which adds noise to nearby agents' observations",
        )
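
    # Example invocation using the MAPE-specific flags defined above (defaults for the
    # remaining arguments come from parse_default_args in the Train base class):
    #   python mape_train.py --scenario simple --observation_noise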

    def create_environment(self):
        """
        Create environment instance
        :return: environment (gym interface), env_name, task_name, n_agents, observation_sizes,
                 action_sizes, discrete_actions
        """
        # load scenario from script
        if self.arglist.partial_observable:
            scenario = scenarios.load(
                self.arglist.scenario + "_partial_observable.py"
            ).POScenario()
        elif self.arglist.observation_noise:
            scenario = scenarios.load(
                self.arglist.scenario + "_observation_noise.py"
            ).ONScenario()
        elif self.arglist.environment_noise:
            scenario = scenarios.load(self.arglist.scenario + "_env_noise.py").ENScenario()
        else:
            scenario = scenarios.load(self.arglist.scenario + ".py").Scenario()
        # create world
        world = scenario.make_world()
        # create multi-agent environment
        env = MultiAgentEnv(world, scenario.reset_world, scenario.reward, scenario.observation)
        env_name = "mape"
        task_name = "mape_" + self.arglist.scenario
        n_agents = env.n
        print("Observation spaces: ", [env.observation_space[i] for i in range(n_agents)])
        print("Action spaces: ", [env.action_space[i] for i in range(n_agents)])
        observation_sizes = self.extract_sizes(env.observation_space)
        action_sizes = self.extract_sizes(env.action_space)
        discrete_actions = True
        return (
            env,
            env_name,
            task_name,
            n_agents,
            observation_sizes,
            action_sizes,
            discrete_actions,
        )
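
    # Scenario loading convention used in create_environment above (assumes the
    # corresponding scenario files exist in the installed multiagent/scenarios package):
    #   --partial_observable -> "<scenario>_partial_observable.py", class POScenario
    #   --observation_noise  -> "<scenario>_observation_noise.py",  class ONScenario
    #   --environment_noise  -> "<scenario>_env_noise.py",          class ENScenario
    #   (default)            -> "<scenario>.py",                    class Scenario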

    def reset_environment(self):
        """
        Reset environment for a new episode
        :return: list of per-agent observations, each a numpy array with an added batch dimension
        """
        obs = self.env.reset()
        obs = [np.expand_dims(o, axis=0) for o in obs]
        return obs

    def select_actions(self, obs, explore=True):
        """
        Select actions for agents
        :param obs: joint observations for the agents
        :param explore: flag whether to select actions with exploration
        :return: action_tensor, action_list (here both are the same list of numpy arrays)
        """
        # get actions as torch Variables from the algorithm
        torch_agent_actions = self.alg.step(obs, explore)
        # convert actions to numpy arrays
        agent_actions = [ac.data.numpy() for ac in torch_agent_actions]
        return agent_actions, agent_actions
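
    # Note: the explore flag is simply forwarded to self.alg.step; how it is used
    # (e.g. whether it toggles exploration noise) depends on the algorithm
    # implementation provided via the Train base class.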

    def environment_step(self, actions):
        """
        Take a step in the environment
        :param actions: actions to apply for each agent
        :return: reward, done, next_obs (next_obs as numpy arrays with an added batch dimension)
        """
        # environment step
        next_obs, reward, done, _ = self.env.step(actions)
        next_obs = [np.expand_dims(o, axis=0) for o in next_obs]
        return reward, done, next_obs

    def environment_render(self):
        """
        Render visualisation of the environment
        """
        self.env.render()
        time.sleep(0.1)


if __name__ == "__main__":
    train = MAPETrain()
    train.train()