-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadversary.py
514 lines (461 loc) · 20.9 KB
/
adversary.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
import socket
import threading
import time
import hashlib
import random
import numpy
from queue import Queue
no_of_threads = 4 # We have 4 threads one each for listen, liveness testing, mining and flooding
job_no = [1, 2, 3, 4] # We will create 4 jobs in queue for running each thread
queue = Queue() # Queue to store our jobs
MY_IP = "0.0.0.0" # MY_IP will later contain IP Address for listening
PORT = int(input("PORT No. = ")) # Port for listening
seeds_addr = set() # To store seed address received from config.txt
connect_seed_addr = [] # To store seed address to which peer is connected
peer_set_from_seed = set() # Set used to store different peers address received from seed
peers_connected = [] # To store list of peer objects connected
MessageList = [] # To store hash of Block to prevent flooding
DataBase = [] # To store Blockchain blocks in local database
PendingQueue = Queue() # To store received block which need to be validated
# Class of Peer objects
class Peer:
i = 0
address = ""
def __init__(self, addr):
self.address = addr
# Data Base Entry Class
class DB_Entry():
def __init__(self, block, block_hash, height):
self.block = block
self.block_hash = block_hash
self.height = height
# Block Class
class Block:
def __init__(self, prev_block_hash, merkel_root, time_stamp):
self.prev_block_hash = prev_block_hash
self.merkel_root = merkel_root
self.time_stamp = time_stamp
#self.transaction = []
def hash(self):
block = str(self.prev_block_hash) + ":" + str(self.merkel_root) + ":" + str(self.time_stamp)
return hash_of_message(block)
def __str__(self):
return str(self.prev_block_hash) + ":" + str(self.merkel_root) + ":" + str(self.time_stamp)
# Blockchain Class
class Blockchain():
def __init__(self):
self.genesis_block = Block("0x0000", "0x14ac", timestamp())
self.genesis_hash = "0x9e1c"
DataBase.append(DB_Entry(self.genesis_block, self.genesis_hash, 0))
# Write the outputs to the file
def write_output_to_file(output):
try:
file = open("outputadversary.txt", "a")
file.write(output + "\n")
except:
print("Write Failed")
finally:
file.close()
# To generate merkel root hash of 16 bits
def merkel():
return "0x14ac"
# To find self timestamp
def timestamp():
return time.time()
# To calculate hash of Block
def hash_of_message(message):
result = hashlib.sha3_256(message.encode('utf-8')).hexdigest()
return "0x" + result[-4:]
# Read address of seeds from config file
def read_addr_of_seeds():
global seeds_address_list
try:
file = open("config.txt","r")
seeds_address_list = file.read()
except:
print("Read from config failed")
finally:
file.close()
# To calculate n i.e. total no. of seeds and also to find set of all available seeds
def total_available_seeds():
global seeds_address_list
temp = seeds_address_list.split("\n")
for addr in temp:
if addr:
addr = addr.split(":")
addr = "0.0.0.0:" + str(addr[1])
seeds_addr.add(addr)
return len(seeds_addr)
# Generate k random numbers in a given range.
def generate_k_random_numbers_in_range(lower, higher, k):
random_numbers_set = set()
while len(random_numbers_set) < k:
random_numbers_set.add(random.randint(lower,higher))
return random_numbers_set
# This function is used to register the peer to (floor(n / 2) + 1) random seeds
def register_with_k_seeds(): # where k = floor(n / 2) + 1
global seeds_addr
seeds_addr = list(seeds_addr)
seed_nodes_index = generate_k_random_numbers_in_range(0, n - 1, n // 2 + 1)
seed_nodes_index = list(seed_nodes_index)
for i in seed_nodes_index:
connect_seed_addr.append(seeds_addr[i])
connect_seeds()
# This function takes complete list of peers and find a random no. of peers to connect to b/w 1 and 4 and then connect to them and receive k-th block
def join_atmost_4_peers(complete_peer_list):
i = 0
limit = random.randint(no_of_peer_to_be_flooded, 4)
for peer in complete_peer_list:
try:
if i < limit:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peer.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
peers_connected.append( Peer(complete_peer_list[i]) )
i += 1
message = "New Connect Request From:"+str(MY_IP)+":"+str(PORT)
sock.send(message.encode('utf-8'))
print(sock.recv(20).decode('utf-8')) # Connect Accepted Message Received here
received_block_k = sock.recv(128).decode('utf-8') # Received Block k
print("Received Block_K", received_block_k)
thread = threading.Thread(target = put_block_in_pending_queue, args = [received_block_k])
thread.start()
sock.close()
else:
break
except:
print("Peer Connection Error. Trying another peer")
# It take complete peer list separated by comma from each seed and union them all
def union_peer_lists(complete_peer_list):
global MY_IP
complete_peer_list = complete_peer_list.split(",")
complete_peer_list.pop()
temp = complete_peer_list.pop()
temp = temp.split(":")
MY_IP = temp[0]
for i in complete_peer_list:
if i:
peer_set_from_seed.add(i)
complete_peer_list = list(peer_set_from_seed)
return complete_peer_list
# This function is used to connect to seed and send our IP address and port info to seed and then receives a list of peers connected to that seed separated by comma
# After finding union it calls join_atmost_4_peers to connect to atmost peers
def connect_seeds():
for i in range(0, len(connect_seed_addr)):
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
seed_addr = connect_seed_addr[i].split(":")
ADDRESS = (str(seed_addr[0]), int(seed_addr[1]))
sock.connect(ADDRESS)
MY_ADDRESS = str(MY_IP)+":"+str(PORT)
sock.send(MY_ADDRESS.encode('utf-8'))
message = sock.recv(10240).decode('utf-8')
complete_peer_list = union_peer_lists(message)
sock.close()
except:
print("Seed Connection Error")
for peer in complete_peer_list:
print(peer)
write_output_to_file(peer)
join_atmost_4_peers(complete_peer_list)
# Create socket to connect 2 computers
def create_socket():
global sock
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# To Bind Socket
def bind_socket():
global sock
ADDRESS = (MY_IP, PORT)
sock.bind(ADDRESS)
# To handle different connected peers in different thread.It recieves messages from peer.
# According to the type of message received take appropriate actions
def handle_peer(conn, addr):
global block_k
global block_k_index
while True:
try:
message = conn.recv(128).decode('utf-8')
received_data = message
if message:
message = message.split(":")
if "New Connect Request From" in message[0]: # If it is new connection request then check if already 4 peers are not connected then accept the connection
if(len(peers_connected) < 4):
conn.send("New Connect Accepted".encode('utf-8'))
peers_connected.append( Peer(str(addr[0])+":"+str(message[2])) )
block_k = DataBase[-1].block
block_k_index = len(DataBase) - 1
conn.send(str(block_k).encode('utf-8')) # Send Block k
elif "Liveness Request" in message[0]: # If its liveness request then give back liveness reply
liveness_reply = "Liveness Reply:" + str(message[1]) + ":" + str(message[2]) + ":" + str(MY_IP)
conn.send(liveness_reply.encode('utf-8'))
elif "Send Previous Blocks" in received_data: # If we get this message we sent all block from Block 0 to Block (k-1) to the requesting peer
conn.send(str(block_k_index).encode('utf-8'))
for i in range(block_k_index):
time.sleep(0.1)
conn.send(str(DataBase[i].block).encode('utf-8'))
else:
thread = threading.Thread(target = put_block_in_pending_queue, args = [received_data]) # If its new block then put it in Pending Queue if its not in ML list
thread.start()
except:
break
conn.close()
# To listen at a particular port and create thread for each peer
def begin():
sock.listen(100)
print("Peer is Listening")
while True:
conn, addr = sock.accept()
sock.setblocking(5)
thread = threading.Thread(target = handle_peer, args = (conn,addr))
thread.start()
# This function takes address of peer which is down. Generate dead node message and send it to all connected seeds
def report_dead(peer):
dead_message = "Dead Node:" + peer + ":" + str(timestamp()) + ":" + str(MY_IP)
print(dead_message)
write_output_to_file(dead_message)
for seed in connect_seed_addr:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
seed_address = seed.split(":")
ADDRESS = (str(seed_address[0]), int(seed_address[1]))
sock.connect(ADDRESS)
sock.send(dead_message.encode('utf-8'))
sock.close()
except:
print("Seed Down ", seed)
# This function generates liveness request and send it to all connected peers at interval of 13 sec
# If three consecutive replies are not received then call report_dead()
def liveness_testing():
while True:
liveness_request = "Liveness Request:" + str(timestamp()) + ":" + str(MY_IP)
#print(liveness_request)
for peer in peers_connected:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peer.address.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
sock.send(liveness_request.encode('utf-8'))
#print(sock.recv(1024).decode('utf-8'))
sock.close()
peer.i = 0 # If it is able to send liveness req and get reply then start from 0 again so that we check for 3 consecutive failure
except: # This happens when connection fails so count for 3 consecutive failures for given peer
peer.i = peer.i + 1
if(peer.i == 3): # If three failures then report this peer as dead node and remove from connected peer list
report_dead(peer.address)
peers_connected.remove(peer)
time.sleep(13)
# Add valid blocks to local database
def add_block_to_database(block, block_hash, height):
DataBase.append(DB_Entry(block, block_hash, height))
start = 1 # Mine when start = 1
# Put received block in Pending Queue if its hash is not in Message List to avoid flooding/looping
def put_block_in_pending_queue(received_block):
global start
hash = hash_of_message(received_block)
if hash in MessageList: # If hash of received block is already in Message List then dont store in Pending Queue
pass
else:
MessageList.append(str(hash)) # If it is received for 1st time then append it to its ML list
PendingQueue.put(received_block)
print("Block Received = ", received_block)
start = 0 # Stop Mining
# To validate received block and add it to database or Blockchain if valid
def validate(received_block):
message = received_block.split(":")
if str(message[0]) == "0x0000" and str(message[2]) >= str(timestamp() - 3600) and str(message[2]) <= str(timestamp() + 3600):
DataBase[0] = DB_Entry(received_block, '0x9e1c', 0)
write_output_to_file(received_block)
return True
for db_entry in DataBase:
if str(message[0]) == str(db_entry.block_hash) and str(message[2]) >= str(timestamp() - 3600) and str(message[2]) <= str(timestamp() + 3600):
add_block_to_database(received_block, hash_of_message(received_block), db_entry.height + 1)
write_output_to_file(received_block)
return True
return False
# To synchronise with current blockchain height
def sync_with_current_blockchain_height():
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peers_connected[0].address.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
message = "Send Previous Blocks"
sock.send(message.encode('utf-8'))
no_of_prev_block = int(sock.recv(5).decode('utf-8'))
for i in range(no_of_prev_block):
received_block = sock.recv(40).decode('utf-8')
validate(received_block)
sock.close()
mined_by_adversary = []
# Generate block and send it to connected peers
def generate_and_send_block():
longest_chain_length = DataBase[-1].height
parent = DataBase[-1]
for db_entry in DataBase:
if db_entry.height == longest_chain_length:
parent = db_entry
break
block = Block(parent.block_hash, merkel(), timestamp())
print("--------------------------------")
print("Block no = ", parent.height + 1)
print("My block = ", block)
print("My Block Hash = ", hash_of_message(str(block)))
print("--------------------------------")
add_block_to_database(block, block.hash(), parent.height + 1)
mined_by_adversary.append(block)
MessageList.append(block.hash()) # Add hash of mined block to ML list
for peer in peers_connected:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peer.address.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
sock.send(str(block).encode('utf-8'))
sock.close()
except:
continue
nodeHashpower = float(input("Node Hash Power = "))
no_of_peer_to_be_flooded = int(input("No. of nodes to flood = "))
# To generate exponential waiting time
def generateWaitingTime(interarrivaltime):
globalLambda = 1.0 / interarrivaltime # Overall block generation rate
lamda = nodeHashpower * globalLambda / 100.0 # Scale individual miners lambda parameter based on the percentage of hash power it has
waitingTime = numpy.random.exponential(scale = 1.0 / lamda, size=None) # Appropriately scale the the exponential waiting time
return waitingTime
# To validate blocks in pending queue and send it to connected peers if valid
def handle_pending_queue():
while PendingQueue.empty() == False:
received_block = PendingQueue.get()
if validate(received_block):
for peer in peers_connected: # Forward gossip message to all connected peers
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peer.address.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
sock.send(str(received_block).encode('utf-8'))
sock.close()
except:
continue
# Calculate and print MPU, Fraction of blocks in main chain by adversary and other values
def calculate_MPU(start_time, end_time):
mined_by_adversary_in_main = 0
MPU = (DataBase[-1].height + 1) / len(DataBase)
pointer = DataBase[-1]
if pointer.block in mined_by_adversary:
mined_by_adversary_in_main += 1
for i in range(2, len(DataBase) + 1, 1):
db_entry = DataBase[-i]
block = pointer.block
message = block.split(":")
prev_hash = message[0]
if prev_hash == db_entry.block_hash:
pointer = db_entry
if db_entry.block in mined_by_adversary:
mined_by_adversary_in_main += 1
Fraction_by_Adversary = mined_by_adversary_in_main / (DataBase[-1].height + 1)
write_output_to_file("-----------------------------------")
write_output_to_file("Start Time = " + str(start_time))
write_output_to_file("End Time = " + str(end_time))
write_output_to_file("No. of Block in longest chain = " + str(DataBase[-1].height))
write_output_to_file("No. of blocks in blockchain = " + str(len(DataBase)))
write_output_to_file("No. of block mined by adversary = " + str(mined_by_adversary))
write_output_to_file("MPU = " + str(MPU))
write_output_to_file("Fraction by adversary = " + str(Fraction_by_Adversary))
write_output_to_file("----------------------------")
print("-----------------------------------")
print("Start Time = " , str(start_time))
print("End Time = " , str(end_time))
print("No. of Block in longest chain = " , str(DataBase[-1].height))
print("No. of blocks in blockchain = " , str(len(DataBase)))
print("No. of block mined by adversary = " , str(mined_by_adversary))
print("MPU = " , str(MPU))
print("Fraction by adversary = " , str(Fraction_by_Adversary))
print("-----------------------------------")
# Block Mining
def mining():
global start
start_time = time.time() # Start time of peer
end_time = start_time + 600 # We want to calculate MPU for 10 min
Flag = True
if len(peers_connected) > 0:
sync_with_current_blockchain_height()
else:
MessageList.append(hash_of_message(str(DataBase[-1].block)))
while(True):
current_longest_chain_length = DataBase[-1].height
thread = threading.Thread(target = handle_pending_queue)
thread.start()
if DataBase[-1].height >= current_longest_chain_length and PendingQueue.empty():
start = 1
if start == 1:
t = timestamp() # Time at which it start mining
tow = generateWaitingTime(2)
print("Next Block will be created after ", tow)
while(timestamp() <= t + tow):
if start == 0:
break
if start == 1:
generate_and_send_block()
if Flag and timestamp() >= end_time:
print(calculate_MPU(start_time, end_time))
Flag = False
# To flood other peers
def flooding():
peers_to_flood = peers_connected[:no_of_peer_to_be_flooded]
for peer in peers_to_flood:
print("Peer to flood = ", peer.address)
while True:
tow = 2
print("Next Flood Block will be created after ", tow)
time.sleep(tow)
block = Block("0xffff", merkel(), timestamp() - 7500) # Invalid block
print("--------------------------")
print("My Invalid block = ", block)
print("--------------------------")
for peer in peers_to_flood:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
peer_addr = peer.address.split(":")
ADDRESS = (str(peer_addr[0]), int(peer_addr[1]))
sock.connect(ADDRESS)
sock.send(str(block).encode('utf-8'))
sock.close()
except:
continue
# Create Worker Threads
def create_workers():
for _ in range(no_of_threads):
thread = threading.Thread(target = work)
thread.daemon = True
thread.start()
# Do next job that is in the queue (listen, liveness testing, mining)
def work():
while True:
x = queue.get()
if x == 1:
create_socket()
bind_socket()
begin()
elif x == 2:
liveness_testing()
elif x == 3:
mining()
elif x == 4:
flooding()
queue.task_done()
# Create jobs in queue
def create_jobs():
for i in job_no:
queue.put(i)
queue.join()
read_addr_of_seeds()
n = total_available_seeds()
register_with_k_seeds() # Where k = floor(n / 2) + 1
print("Peers Connected List : ")
for peer in peers_connected:
print(peer.address)
blockchain = Blockchain()
create_workers()
create_jobs()