-
Notifications
You must be signed in to change notification settings - Fork 0
/
sim.py
500 lines (394 loc) · 18.8 KB
/
sim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
#!/usr/bin/env python
"""
Simulates one file being shared amongst a set of peers. The file is divided into a set of pieces, each comprised of some number of blocks. There are two types of peers:
- seeds, which start with all the pieces.
- regular peers, which start with no pieces.
The simulation proceeds in rounds. In each round, peers can request pieces from other peers, and then decide how much to upload to others. Once every peer has every piece, the simulation ends.
"""
import re
import random
import sys
import logging
import copy
import itertools
import pprint
from optparse import OptionParser
from messages import Upload, Request, Download, PeerInfo
from util import *
from stats import Stats
from history import History
class Sim:
    """Drives the file-sharing simulation described in the module docstring."""

    def __init__(self, config):
        # config: a Params object carrying num_pieces, blocks_per_piece,
        # min/max_up_bw, max_round, iters, and the loaded agent classes.
        self.config = config
        # peer_id -> upload bandwidth, cached so a peer's bw is stable
        # across calls within one simulation.
        self.up_bws_state = dict()

    def up_bw(self, peer_id, reinit=False):
        """Return a consistent bw for this peer.

        Seeds always get the maximum upload bandwidth; other agents get a
        random value in [min_up_bw, max_up_bw].  The chosen value is cached,
        so repeated calls return the same bandwidth unless reinit=True.
        """
        c = self.config
        s = self.up_bws_state
        # Re-initialize up-bws if we are starting a new simulation
        if reinit and peer_id in s:
            del s[peer_id]
        # Seeds upload at max; other agents at random.
        if re.match("Seed", peer_id):
            the_up_bw = c.max_up_bw
        else:
            the_up_bw = random.randint(c.min_up_bw, c.max_up_bw)
        return s.setdefault(peer_id, the_up_bw)

    def run_sim_once(self):
        """Run a single simulation to completion and return its History."""
        conf = self.config

        # Keep track of the current round.  Needs to be in scope for helpers.
        round = 0

        def check_pred(pred, msg, Exc, lst):
            """Check if any element of lst matches the predicate.  If it does,
            raise an exception of type Exc, including the msg and the offending
            element."""
            m = list(map(pred, lst))
            if True in m:
                i = m.index(True)
                raise Exc(msg + " Bad element: %s" % lst[i])

        def check_uploads(peer, uploads):
            """Raise an IllegalUpload exception if there is a problem."""
            def check(pred, msg):
                check_pred(pred, msg, IllegalUpload, uploads)

            not_upload = lambda o: not isinstance(o, Upload)
            check(not_upload, "List of Uploads contains non-Upload object.")

            self_upload = lambda upload: upload.to_id == peer.id
            check(self_upload, "Can't upload to yourself.")

            not_from_self = lambda upload: upload.from_id != peer.id
            check(not_from_self, "Upload.from != peer id.")

            check(lambda u: u.bw < 0, "Upload bandwidth must be non-negative!")

            limit = self.up_bw(peer.id)
            if sum([u.bw for u in uploads]) > limit:
                # BUG FIX: the original passed only two arguments to a format
                # string with three placeholders (and passed `uploads` as a
                # second constructor argument), so this raise died with a
                # TypeError instead of the intended IllegalUpload.
                raise IllegalUpload(
                    "Can't upload more than limit of %d. Attempted to upload %s, for uploads: %s"
                    % (limit, sum([u.bw for u in uploads]), uploads))
            # If we got here, looks ok.

        def check_requests(peer, requests, peer_pieces, available):
            """Raise an IllegalRequest exception if there is a problem."""
            def check(pred, msg):
                check_pred(pred, msg, IllegalRequest, requests)

            check(lambda o: not isinstance(o, Request),
                  "List of Requests contains non-Request object.")

            bad_piece_id = lambda r: (r.piece_id < 0 or
                                      r.piece_id >= self.config.num_pieces)
            check(bad_piece_id, "Request asks for non-existent piece!")

            bad_peer_id = lambda r: r.peer_id not in self.peer_ids
            check(bad_peer_id, "Request mentions non-existent peer!")

            bad_requester_id = lambda r: r.requester_id != peer.id
            check(bad_requester_id, "Request has wrong peer id!")

            bad_start_block = lambda r: (
                r.start < 0 or
                r.start >= self.config.blocks_per_piece or
                r.start > peer_pieces[peer.id][r.piece_id])
            # Must request the _next_ necessary block
            check(bad_start_block, "Request has bad start block!")

            def piece_peer_does_not_have(r):
                other_peer = self.peers_by_id[r.peer_id]
                return r.piece_id not in available[other_peer.id]
            check(piece_peer_does_not_have, "Asking for piece peer does not have!")
            # If we got here, looks ok

        def available_pieces(peer_id, peer_pieces):
            """
            Return a list of piece ids that this peer has available.
            """
            return [i for i in range(conf.num_pieces)
                    if peer_pieces[peer_id][i] == conf.blocks_per_piece]

        def peer_done(peer_pieces, peer_id):
            """True iff this peer has every block of every piece."""
            # TODO: remove linear pass
            for blocks_so_far in peer_pieces[peer_id]:
                if blocks_so_far < conf.blocks_per_piece:
                    return False
            return True

        def all_done(peer_pieces):
            result = True
            # Check all peers to update done status
            for peer_id in peer_pieces:
                if peer_done(peer_pieces, peer_id):
                    history.peer_is_done(round, peer_id)
                else:
                    result = False
            return result

        def create_peers():
            """Each agent class must be already loaded, and have a
            constructor that takes the config, id, pieces, and
            up and down bandwidth, in that order."""
            def load(class_name, params):
                agent_class = conf.agent_classes[class_name]
                return agent_class(*params)

            counts = dict()
            def index(name):
                # Per-class-name counter so ids are Seed0, Peer0, Peer1, ...
                if name in counts:
                    a = counts[name]
                    counts[name] += 1
                else:
                    a = 0
                    counts[name] = 1
                return a

            ids = ["%s%d" % (n, index(n)) for n in conf.agent_class_names]

            def get_pieces(id):
                # Seeds start with the whole file; everyone else with nothing.
                if id.startswith("Seed"):
                    return [conf.blocks_per_piece] * conf.num_pieces
                else:
                    return [0] * conf.num_pieces

            # id -> list (blocks / piece).  Built separately from `pieces`
            # below so agents get their own copies of the piece counts.
            peer_pieces = dict((id, get_pieces(id)) for id in ids)
            pieces = [get_pieces(id) for id in ids]
            r = itertools.repeat
            # Re-initialize upload bandwidths at the beginning of each
            # new simulation
            up_bws = [self.up_bw(id, reinit=True) for id in ids]
            params = list(zip(r(conf), ids, pieces, up_bws))
            peers = list(map(load, conf.agent_class_names, params))
            #logging.debug("Peers: \n" + "\n".join(str(p) for p in peers))
            return peers, peer_pieces

        def get_peer_requests(p, peer_info, peer_history, peer_pieces, available):
            def remove_me(info):
                # TODO: Do we need this linear pass?
                return [peer for peer in peer_info if peer.id != p.id]

            pieces = copy.copy(peer_pieces[p.id])
            # Made copy of pieces and the peer info this peer needs to make it's
            # decision, so that it can't change the simulation's copies.
            p.update_pieces(pieces)
            rs = p.requests(remove_me(peer_info), peer_history)
            check_requests(p, rs, peer_pieces, available)
            return rs

        def get_peer_uploads(all_requests, p, peer_info, peer_history):
            def remove_me(info):
                # TODO: remove this pass?  Use a set?
                return [peer for peer in peer_info if peer.id != p.id]

            def requests_to(id):
                # All requests from any peer that are directed at peer `id`.
                f = lambda r: r.peer_id == id
                ans = []
                for rs in list(all_requests.values()):
                    ans.extend(list(filter(f, rs)))
                return ans

            requests = requests_to(p.id)
            us = p.uploads(requests, remove_me(peer_info), peer_history)
            p.update_du(peer_history)
            check_uploads(p, us)
            return us

        def upload_rate(uploads, uploader_id, requester_id):
            """
            return the uploading rate from uploader to requester
            in blocks per time period, or 0 if not uploading.
            """
            for u in uploads[uploader_id]:
                if u.to_id == requester_id:
                    return u.bw
            return 0

        def update_peer_pieces(peer_pieces, requests, uploads, available):
            """
            Process the uploads: figure out how many blocks of all the requested
            pieces the requesters ended up with.
            Make sure requesting the same thing from lots of peers doesn't
            stack.
            update the sets of available pieces as needed.
            """
            downloads = dict()  # peer_id -> [downloads]
            new_pp = copy.deepcopy(peer_pieces)
            for requester_id in requests:
                downloads[requester_id] = list()
            for requester_id in requests:
                # Keep track of how many blocks of each piece this
                # requester got.  piece -> (blocks, from_who)
                new_blocks_per_piece = dict()

                def update_count(piece_id, blocks, peer_id):
                    # Keep only the best single source for each piece, so
                    # duplicate requests to many peers don't stack.
                    if piece_id in new_blocks_per_piece:
                        old = new_blocks_per_piece[piece_id][0]
                        if blocks > old:
                            new_blocks_per_piece[piece_id] = (blocks, peer_id)
                    else:
                        new_blocks_per_piece[piece_id] = (blocks, peer_id)

                # Group the requests by peer that is being asked
                get_peer_id = lambda r: r.peer_id
                rs = sorted(requests[requester_id], key=get_peer_id)
                for peer_id, rs_for_peer in itertools.groupby(rs, get_peer_id):
                    bw = upload_rate(uploads, peer_id, requester_id)
                    if bw == 0:
                        continue
                    # This bandwidth gets applied in order to each piece requested
                    for r in rs_for_peer:
                        needed_blocks = conf.blocks_per_piece - r.start
                        alloced_bw = min(bw, needed_blocks)
                        update_count(r.piece_id, alloced_bw, peer_id)
                        bw -= alloced_bw
                        if bw == 0:
                            break
                for piece_id in new_blocks_per_piece:
                    (blocks, peer_id) = new_blocks_per_piece[piece_id]
                    new_pp[requester_id][piece_id] += blocks
                    if new_pp[requester_id][piece_id] == conf.blocks_per_piece:
                        available[requester_id].add(piece_id)
                    d = Download(peer_id, requester_id, piece_id, blocks)
                    downloads[requester_id].append(d)
            return (new_pp, downloads)

        def completed_pieces(peer_id, available):
            return len(available[peer_id])

        def log_peer_info(peer_pieces, available):
            for p_id in self.peer_ids:
                pieces = peer_pieces[p_id]
                logging.debug("pieces for %s: %s" % (str(p_id), str(pieces)))
            log = ", ".join("%s:%s" % (p_id, completed_pieces(p_id, available))
                            for p_id in self.peer_ids)
            logging.info("Pieces completed: " + log)

        logging.debug("Starting simulation with config: %s" % str(conf))

        peers, peer_pieces = create_peers()
        self.peer_ids = [p.id for p in peers]
        self.peers_by_id = dict((p.id, p) for p in peers)

        upload_rates = dict((id, self.up_bw(id)) for id in self.peer_ids)
        history = History(self.peer_ids, upload_rates)

        # dict : pid -> set(finished / available pieces)
        available = dict((pid, set(available_pieces(pid, peer_pieces)))
                         for pid in self.peer_ids)

        # Begin the event loop
        while True:
            logging.info("======= Round %d ========" % round)

            peer_info = [PeerInfo(p.id, available[p.id])
                         for p in peers]
            requests = dict()  # peer_id -> list of Requests
            uploads = dict()   # peer_id -> list of Uploads
            h = dict()
            for p in peers:
                h[p.id] = history.peer_history(p.id)
                requests[p.id] = get_peer_requests(p, peer_info, h[p.id],
                                                   peer_pieces, available)

            for p in peers:
                uploads[p.id] = get_peer_uploads(requests, p, peer_info, h[p.id])

            (peer_pieces, downloads) = update_peer_pieces(
                peer_pieces, requests, uploads, available)
            history.update(downloads, uploads)

            logging.debug(history.pretty_for_round(round))
            log_peer_info(peer_pieces, available)

            if all_done(peer_pieces):
                logging.info("All done!")
                break
            round += 1
            if round > conf.max_round:
                logging.info("Out of time. Stopping.")
                break

        logging.info("Game history:\n%s" % history.pretty())

        logging.info("======== STATS ========")
        logging.info("Uploaded blocks:\n%s" %
                     Stats.uploaded_blocks_str(self.peer_ids, history))
        logging.info("Completion rounds:\n%s" %
                     Stats.completion_rounds_str(self.peer_ids, history))
        logging.info("All done round: %s" %
                     Stats.all_done_round(self.peer_ids, history))

        return history

    def run_sim(self):
        """Run the simulation config.iters times and log summary statistics."""
        histories = [self.run_sim_once() for i in range(self.config.iters)]
        logging.warning("======== SUMMARY STATS ========")
        uploaded_blocks = [Stats.uploaded_blocks(self.peer_ids, h)
                           for h in histories]
        completion_rounds = [Stats.completion_rounds(self.peer_ids, h)
                             for h in histories]

        def extract_by_peer_id(lst, peer_id):
            """Given a list of dicts, pull out the entry
            for peer_id from each dict.  Return a list"""
            return [d[peer_id] for d in lst]

        uploaded_by_id = dict(
            (p_id, extract_by_peer_id(uploaded_blocks, p_id))
            for p_id in self.peer_ids)
        completion_by_id = dict(
            (p_id, extract_by_peer_id(completion_rounds, p_id))
            for p_id in self.peer_ids)

        logging.warning("Uploaded blocks: avg (stddev)")
        for p_id in sorted(self.peer_ids,
                           key=lambda id: mean(uploaded_by_id[id])):
            us = uploaded_by_id[p_id]
            logging.warning("%s: %.1f (%.1f)" % (p_id, mean(us), stddev(us)))

        logging.warning("Completion rounds: avg (stddev)")

        def optionize(f):
            # Lift f so it returns None when any entry is None
            # (i.e. some peer never completed in that run).
            def g(lst):
                if None in lst:
                    return None
                else:
                    return f(lst)
            return g
        opt_mean = optionize(mean)
        opt_stddev = optionize(stddev)

        for p_id in sorted(self.peer_ids,
                           key=lambda id: opt_mean(completion_by_id[id]) or 0):
            cs = completion_by_id[p_id]
            logging.warning("%s: %s (%s)" % (p_id, opt_mean(cs), opt_stddev(cs)))
def configure_logging(loglevel):
    """Send root-logger output to stdout at the named level.

    loglevel -- a level name such as 'debug' or 'info' (case-insensitive).
    Raises ValueError if the name does not correspond to a logging level.
    """
    level = getattr(logging, loglevel.upper(), None)
    if not isinstance(level, int):
        raise ValueError('Invalid log level: %s' % loglevel)
    handler = logging.StreamHandler(sys.__stdout__)
    # strm_out.setFormatter(logging.Formatter('%(levelno)s: %(message)s'))
    handler.setFormatter(logging.Formatter('%(message)s'))
    root = logging.getLogger('')
    root.setLevel(level)
    root.addHandler(handler)
def parse_agents(args):
    """
    Each element is a class name like "Peer", with an optional
    count appended after a comma.  So either "Peer", or "Peer,3".
    Returns an array with a list of class names, each repeated the
    specified number of times.
    """
    names = []
    for spec in args:
        parts = spec.split(',')
        if len(parts) == 1:
            names.append(parts[0])
        elif len(parts) == 2:
            class_name, count = parts
            names += [class_name] * int(count)
        else:
            # More than one comma is malformed.
            raise ValueError("Bad argument: %s\n" % spec)
    return names
def main(args):
    """Parse command-line options, build the simulation Params, and run it.

    Leftover positional arguments name the peer agent classes to run,
    each optionally suffixed with ",count" (see parse_agents).
    """
    usage_msg = "Usage: %prog [options] PeerClass1[,count] PeerClass2[,count] ..."
    parser = OptionParser(usage=usage_msg)

    def usage(msg):
        # Report the error along with the generated help text, then exit.
        print(("Error: %s\n" % msg))
        parser.print_help()
        sys.exit()

    parser.add_option("--loglevel",
                      dest="loglevel", default="info",
                      help="Set the logging level: 'debug' or 'info'")

    parser.add_option("--num-pieces",
                      dest="num_pieces", default=3, type="int",
                      help="Set number of pieces in the file")

    parser.add_option("--blocks-per-piece",
                      dest="blocks_per_piece", default=4, type="int",
                      help="Set number of blocks per piece")

    parser.add_option("--max-round",
                      dest="max_round", default=5, type="int",
                      help="Limit on number of rounds")

    parser.add_option("--min-bw",
                      dest="min_up_bw", default=4, type="int",
                      help="Min upload bandwidth")

    parser.add_option("--max-bw",
                      dest="max_up_bw", default=10, type="int",
                      help="Max upload bandwidth")

    parser.add_option("--iters",
                      dest="iters", default=1, type="int",
                      help="Number of times to run simulation to get stats")

    (options, args) = parser.parse_args()

    # leftover args are class names, with optional counts:
    # "Peer Seed[,4]"

    if len(args) == 0:
        # default
        agents_to_run = ['Dummy', 'Dummy', 'Seed']
    else:
        try:
            agents_to_run = parse_agents(args)
        except ValueError as e:
            usage(e)

    configure_logging(options.loglevel)
    config = Params()

    config.add("agent_class_names", agents_to_run)
    # load_modules comes from util; presumably it imports one module per
    # class name and returns name -> class — verify against util.py.
    config.add("agent_classes", load_modules(config.agent_class_names))

    config.add("num_pieces", options.num_pieces)
    config.add("blocks_per_piece", options.blocks_per_piece)
    config.add("max_round", options.max_round)
    config.add("min_up_bw", options.min_up_bw)
    config.add("max_up_bw", options.max_up_bw)
    config.add("iters", options.iters)

    sim = Sim(config)
    sim.run_sim()
if __name__ == "__main__":
    # The next two lines are for profiling...
    # Profile data is written to 'out.prof'; inspect it with pstats.
    import cProfile
    cProfile.run('main(sys.argv)', 'out.prof')
    # To run without profiling, comment out the two lines above and
    # uncomment the direct call below.
    # main(sys.argv)