-
Notifications
You must be signed in to change notification settings - Fork 0
/
actions.py
executable file
·360 lines (290 loc) · 10.3 KB
/
actions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
class Action(object):
    """
    Base class for reversible actions an agent performs on its environment.

    Subclasses are expected to override both ``apply`` and ``rollback``;
    the implementations here only raise ``NotImplementedError``.
    """

    def __init__(self, agent, name):
        """
        Store the acting agent and the name of the action.

        Args:
            agent (agents.RequestSensingAgent): Agent performing the action.
            name (str): Name identifying the kind of action.
        """
        self.name = name
        self.agent = agent

    def apply(self):
        """
        Apply the action to the environment.

        Returns:
            True if successful else False.

        Raises:
            NotImplementedError: Always; subclasses provide the behaviour.
        """
        raise NotImplementedError()

    def rollback(self):
        """
        Undo the action, i.e., bring the environment back to the state it
        had before the action was applied.

        Returns:
            True if successful else False.

        Raises:
            NotImplementedError: Always; subclasses provide the behaviour.
        """
        raise NotImplementedError()
class AddEdge(Action):
    """
    Action that places an edge in the topology and pays for it from the
    agent's inventory.
    """

    def __init__(self, agent):
        """
        Initializes object.

        Args:
            agent (agents.AbstractAgent): Agent that places the edge.
        """
        super(AddEdge, self).__init__(agent, "AddEdge")
        # Applicability is decided once, at construction time.
        self.applicable = self.agent.inventory > 0
        self.applied = False
        # None: no edge chosen yet. -1: the agent could not pick an edge.
        self.edge = None
        # Value the environment reported for the placed edge.
        self.edge_value = 0

    def apply(self):
        """
        Tries to add an edge to the topology. If the action is not
        applicable, or the agent cannot pick a suitable edge, nothing
        happens. Otherwise the edge is added and the agent's inventory is
        reduced by the value returned from the environment.

        The chosen edge is remembered, so applying the action again after a
        rollback places the same edge.

        Returns:
            applied (bool): True if an edge has been added else False.
        """
        self.applied = False
        if not self.applicable:
            return self.applied

        if self.edge is None:
            # Let the agent select an edge to place. If this returns None
            # then the agent was not able to pick a suitable edge and
            # this action becomes a noop.
            self.edge = self.agent.choose_edge_to_place(exclude_existing=False)
            if self.edge is None:
                self.edge = -1

        if self.edge != -1:
            self.edge_value = self.agent.environment.add_edge(
                self.edge, value=self.agent.edge_value
            )
            self.agent.inventory -= self.edge_value
            # Set the flag exactly once, and only after the edge was placed.
            self.applied = True
        return self.applied

    def rollback(self):
        """
        Rollback this action, i.e., remove the edge again from the topology
        and give its value back to the agent's inventory. Does nothing if
        the action has not been applied.

        Returns:
            None
        """
        if self.applied:
            value = self.agent.environment.remove_edge(
                self.edge, value=self.agent.edge_value
            )
            # Sanity check: removal must give back what adding has cost.
            assert value == self.edge_value, "Value of added and removed " \
                + "do not match"
            self.agent.inventory += value
            self.applied = False
            self.edge_value = 0
class TakeEdge(Action):
    """
    Action that removes an edge from the topology and credits its value to
    the agent's inventory.
    """

    def __init__(self, agent):
        """
        Initializes object.

        Args:
            agent (agents.AbstractAgent): Agent that takes the edge.
        """
        super(TakeEdge, self).__init__(agent, "TakeEdge")
        self.applied = False
        # Only applicable if the environment reports at least one edge.
        self.applicable = self.agent.environment.number_of_edges > 0
        # None: no edge chosen yet. -1: the agent could not find an edge.
        self.edge = None
        self.edge_value = 0

    def apply(self):
        """
        Remove an edge of the agent's choice from the topology and add the
        value returned by the environment to the agent's inventory.

        Returns:
            applied (bool): Current value of the applied flag.
        """
        if not self.applicable:
            return self.applied

        if self.edge is None:
            picked = self.agent.choose_edge_to_take()
            # The agent may fail to find an edge. Store -1 in that case as
            # None has other meanings in later contexts.
            self.edge = -1 if picked is None else picked

        if self.edge != -1:
            environment = self.agent.environment
            self.edge_value = environment.remove_edge(
                self.edge, value=self.agent.edge_value
            )
            self.agent.inventory += self.edge_value
            self.applied = True
        return self.applied

    def rollback(self):
        """
        Put the previously removed edge back into the topology and deduct
        its value from the agent's inventory again. Does nothing if the
        action has not been applied.

        Returns:
            None
        """
        if not self.applied:
            return
        self.applied = False
        readded = self.agent.environment.add_edge(
            self.edge, value=self.agent.edge_value
        )
        assert readded == self.edge_value, (
            "Value of removed and readded edge to not match"
        )
        self.agent.inventory -= readded
        self.edge_value = 0
class AddRequest(Action):
    """
    Action that adds a request of the agent's choice to the environment.
    """

    def __init__(self, agent):
        """
        Initializes object. The request is chosen by the agent right away.

        Args:
            agent (agents.AbstractAgent): Agent that places the request.
        """
        super(AddRequest, self).__init__(agent, "AddRequest")
        self.request = self.agent.choose_request_to_place()
        if self.request is None:
            # The agent offered no request; mirror RemoveRequest and treat
            # the action as not applicable instead of failing on `.mask`.
            self.applicable = False
        else:
            # If request is already in environment do nothing.
            # NOTE(review): assumes routed_requests is a mapping keyed by
            # request mask, so membership is tested on it directly.
            self.applicable = (
                self.request.mask not in self.agent.environment.routed_requests
            )
        self.applied = False
        # Path returned by the environment; reused when re-applying.
        self.path = None

    def apply(self):
        """
        Add the request to the environment. On the first application the
        environment determines the path and its return value is stored in
        self.path. If a path is already stored it is handed to the
        environment instead. The action counts as applied only if a path is
        available afterwards.

        Returns:
            applied (bool): True if the request has been added else False.
        """
        if self.applicable:
            if self.path is None:
                self.path = self.agent.environment.add_request(self.request)
            else:
                self.agent.environment.add_request(self.request, self.path)
            # A None path means the environment did not return a path.
            if self.path is not None:
                self.applied = True
        return self.applied

    def rollback(self):
        """
        Remove the request again from the environment. Does nothing if the
        action has not been applied.

        Returns:
            None
        """
        if self.applied:
            self.agent.environment.remove_request(self.request)
            self.applied = False
class RemoveRequest(Action):
    """
    Action that removes a request of the agent's choice from the
    environment.
    """

    def __init__(self, agent):
        """
        Initializes object. The request is chosen by the agent right away.

        Args:
            agent (agents.AbstractAgent): Agent that removes the request.
        """
        super(RemoveRequest, self).__init__(agent, "RemoveRequest")
        self.request = self.agent.choose_request_to_remove()
        if self.request is None:
            # The agent could not offer a request to remove.
            self.applicable = False
        else:
            # Only a request that is currently routed can be removed.
            # NOTE(review): assumes routed_requests is a mapping keyed by
            # request mask, so membership is tested on it directly.
            self.applicable = (
                self.request.mask in self.agent.environment.routed_requests
            )
        self.applied = False
        # Return value of remove_request; handed back on rollback.
        self.path = None

    def apply(self):
        """
        Remove the request from the environment and remember the value
        returned by the environment so the request can be added again.

        Returns:
            applied (bool): True if successful else False.
        """
        if self.applicable:
            self.path = self.agent.environment.remove_request(self.request)
            self.applied = True
        return self.applied

    def rollback(self):
        """
        Add the removed request again to the environment, using the path
        remembered by apply. Does nothing if the action has not been
        applied.

        Returns:
            None
        """
        if self.applied:
            self.agent.environment.add_request(self.request, self.path)
            self.applied = False
class DoNothing(Action):
    """
    Action without any effect on the environment.
    """

    def __init__(self, agent):
        """
        Initializes object.

        Args:
            agent (agents.AbstractAgent): Agent performing the action.
        """
        super(DoNothing, self).__init__(agent, "DoNothing")

    def apply(self):
        """
        Nothing is changed, thus applying always succeeds.

        Returns:
            True
        """
        return True

    def rollback(self):
        """
        Nothing was changed, thus there is nothing to undo.

        Returns:
            True
        """
        return True
class IncreaseCapacity(Action):
    """
    Action that increases the capacity of an edge chosen by the agent and
    pays for it from the agent's inventory.
    """

    def __init__(self, agent):
        """
        Initializes object.

        Args:
            agent (agents.AbstractAgent): Agent that changes the capacity.
        """
        super(IncreaseCapacity, self).__init__(agent, "IncreaseCapacity")
        # Applicability is decided once, at construction time.
        self.applicable = self.agent.inventory > 0
        self.applied = False
        # None: no edge chosen yet. -1: the agent could not pick an edge.
        self.edge = None
        # Value the environment reported for the added capacity.
        self.edge_value = 0

    def apply(self):
        """
        Try to increase capacity on an edge of the agent's choice. On
        success the agent's inventory is reduced by the value returned from
        the environment.

        Returns:
            applied (bool): True if successful else False.
        """
        if self.applicable:
            if self.edge is None:
                # NOTE(review): meaning of the two positional flags is
                # defined by the agent and not visible here - confirm.
                self.edge = self.agent.choose_edge_to_change(True, True)
                if self.edge is None:
                    self.edge = -1
            if self.edge != -1:
                # Since an existing edge is chosen by the agent, the
                # environment will increase the capacity.
                self.edge_value = self.agent.environment.add_edge(self.edge)
                self.agent.inventory -= self.edge_value
                self.applied = True
        return self.applied

    def rollback(self):
        """
        Remove the previously added capacity from the topology again and
        give its value back to the agent's inventory. Does nothing if the
        action has not been applied.

        Returns:
            None
        """
        if self.applied:
            value = self.agent.environment.remove_edge(self.edge)
            # Sanity check: removal must give back what adding has cost.
            assert value == self.edge_value, "Value of added and removed " \
                + "do not match"
            # Restore exactly what apply() deducted rather than a fixed 1,
            # so inventory stays balanced for any edge value.
            self.agent.inventory += value
            self.applied = False
            self.edge_value = 0
class DecreaseCapacity(Action):
    """
    Action that decreases the capacity of an edge chosen by the agent and
    credits the freed value to the agent's inventory.
    """

    def __init__(self, agent):
        """
        Initializes object.

        Args:
            agent (agents.AbstractAgent): Agent that changes the capacity.
        """
        super(DecreaseCapacity, self).__init__(agent, "DecreaseCapacity")
        self.applied = False
        # Only applicable if the environment reports at least one edge.
        self.applicable = self.agent.environment.number_of_edges > 0
        # None: no edge chosen yet. -1: the agent could not pick an edge.
        self.edge = None
        self.edge_value = 0

    def apply(self):
        """
        Decrease capacity on an edge of the agent's choice and add the value
        returned by the environment to the agent's inventory.

        Returns:
            applied (bool): True if successful else False.
        """
        if self.applicable:
            if self.edge is None:
                candidate = self.agent.choose_edge_to_change(True, False)
                # Fall back to the -1 marker if the agent found no edge.
                self.edge = candidate if candidate is not None else -1
            if self.edge != -1:
                freed = self.agent.environment.remove_edge(self.edge)
                self.edge_value = freed
                self.agent.inventory += self.edge_value
                self.applied = True
        return self.applied

    def rollback(self):
        """
        Add previously removed capacity back to topology and deduct its
        value from the agent's inventory again. Does nothing if the action
        has not been applied.

        Returns:
            None
        """
        if not self.applied:
            return
        self.applied = False
        restored = self.agent.environment.add_edge(self.edge)
        assert restored == self.edge_value, (
            "Value of removed and readded edge to not match"
        )
        self.agent.inventory -= restored
        self.edge_value = 0