Skip to content

Commit

Permalink
added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aradmaleki02 committed Feb 12, 2024
1 parent ea3af4d commit 7fa8d4b
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 0 deletions.
25 changes: 25 additions & 0 deletions test/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
### dummy client that passes all tests

from collections import namedtuple
from collections import deque

Message = namedtuple("Message", "key val")
queue: deque = deque()
subscribers: deque = deque()


def push(key: str, val: bytes):
queue.append(Message(key, val))
notify_subscribers()

def pull() -> Message:
return queue.popleft()

def subscribe(action):
subscribers.append(action)

def notify_subscribers():
action = subscribers.popleft()
msg = pull()
action(msg.key, msg.val)
subscribers.append(action)
49 changes: 49 additions & 0 deletions test/fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
### this test checks fault tolerancy of the system.
### for each component in the system do the following:
### 1. run the test
### 2. when the pop up popes, bring down an instance of the said component.
### 3. wait for cluster to become healty again
### 4. press enter

### note that API-Gateway, External Database, ... all are components of the system and are prune to downtime
import random
from typing import List
from threading import Lock

from client import pull, push, subscribe

TEST_SIZE = 1000 * 1000
KEY_SIZE = 8
SUBSCRIER_COUNT = 4

key_seq = [random.choice(range(KEY_SIZE)) for _ in range(TEST_SIZE)]

pulled: List[int] = []
lock = Lock()
def store(_: str, val: bytes):
next_val = int(val.decode("utf-8"))
with lock:
pulled.append(next_val)


for _ in range(SUBSCRIER_COUNT):
subscribe(store)


for i in range(TEST_SIZE//2):
push(f"{key_seq[i]}", f"{i}".encode(encoding="utf-8"))

print("manually fail one node and wait for cluster to become healthy again")
print("press enter when cluster is healthy")
input()

for i in range(TEST_SIZE//2,TEST_SIZE):
push(f"{key_seq[i]}", f"{i}".encode(encoding="utf-8"))

pulled.sort()
for i in range(TEST_SIZE):
if pulled[i]!=i:
print("DATA loss occurred")


print("Fault tolerance test passed successfully!")
37 changes: 37 additions & 0 deletions test/order.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
### this test checks if the order garantee holds, i.e. if (k1,v1) is pushed before (k1,v2)
### then it is read before (k1,v2)

import random
import sys
from typing import Dict, List

from client import pull, push, subscribe

TEST_SIZE = 1000 * 1000
KEY_SIZE = 8
SUBSCRIER_COUNT = 4

key_seq = [random.choice(range(KEY_SIZE)) for _ in range(TEST_SIZE)]

pulled: Dict[str, List[int]] = {}
for i in range(KEY_SIZE):
pulled[f"{i}"] = []

def validate_pull(key: str, val: bytes):
next_val = int(val.decode("utf-8"))
if len(pulled[key]) != 0:
prev_val = pulled[key][-1]
if prev_val >= next_val:
print(f"order violation, seq: [{prev_val}, {next_val}]\tkey: [{key}]")
sys.exit(255)
pulled[key].append(next_val)


for _ in range(SUBSCRIER_COUNT):
subscribe(validate_pull)


for i in range(TEST_SIZE):
push(f"{key_seq[i]}", f"{i}".encode(encoding="utf-8"))

print("order test passed successfully!")
35 changes: 35 additions & 0 deletions test/scale.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
### this test checks if the cluster performance scales with the size of the cluster
### run the test and press enter untill you see a thourghput cap in the monitoring of the cluster
### manually scale up cluster
### press enter once more and see if you have a increase in throughput rate of the cluster

import multiprocessing
from client import pull, push, subscribe

TEST_SIZE = 1000 * 1000
KEY_SIZE = 8
SUBSCRIER_COUNT = 4


def to_infinity():
index = 0
while True:
yield index
index += 1


def push_key(key: str):
for i in to_infinity():
push(key, f"{i}".encode("utf-8"))
print(key, f"{i}".encode("utf-8"))


subscribe(lambda key, val: ...)

for i in to_infinity():
p = multiprocessing.Process(target=push_key, args=(i,))
p.start()
print("did it cap?")
print("if not, press enter to increase throughput")
print("if capped, manually scale up the cluster and press enter to see if you can increase the throughput")
input()

0 comments on commit 7fa8d4b

Please sign in to comment.