Merge pull request #5 from kerwenwwer/dev
Update README.md
kerwenwwer authored Mar 26, 2024
2 parents 3481e9a + 05f34dc commit 6515d13
Showing 2 changed files with 99 additions and 29 deletions.
85 changes: 70 additions & 15 deletions README.md
@@ -1,6 +1,14 @@
# eGossip

## About

The extended Berkeley Packet Filter (eBPF) makes it possible to load and run micro-programs dynamically inside the Linux kernel, without recompiling the kernel.
Our primary focus is deploying TC eBPF hooks for in-kernel packet cloning to make user-space broadcasts more efficient. This matters most for consensus protocols such as Gossip, which rely heavily on user-space broadcast; we explore the implementation of this method and show that it can significantly boost broadcast speeds.

<div align=center> <img src="img/3.png" width="400" class="center"></div>


## Build & Test
The ``make`` command builds and runs everything automatically.

Build the binary file:
@@ -43,24 +51,45 @@
Use the ``-b`` flag to generate random metadata update events on a node:
```
python3 script/test.py -b
```

Switch between protocol modes by changing ``PROTO`` in ``k8s/deployment.yaml``. There are three modes:

* **UDP mode:** Baseline.

```yaml
- name: PROTO
  value: "UDP"
resources:
  limits:
    cpu: "1"
    memory: "512Mi"
```
* **TC mode:** Enables only the in-kernel broadcaster.
```yaml
- name: PROTO
  value: "TC"
resources:
  limits:
    cpu: "1"
    memory: "512Mi"
```
* **XDP mode:** Enables both the in-kernel broadcaster and the ``AF_XDP`` kernel bypass.
```yaml
- name: PROTO
  value: "XDP"
resources:
  limits:
    cpu: "1"
    memory: "512Mi"
```
## Implementation
### Gossip Protocol
The basic Gossip API is adapted from [PekoNode](https://github.com/dpwgc/pekonode/tree/master).

##### Cluster node list sharing
* Synchronize the cluster node list (`NodeList`) through rumor propagation. Each node eventually stores a complete list of nodes, which can be used in service registration and discovery scenarios.
##### Cluster metadata information sharing
@@ -80,11 +109,37 @@
* Repeat the previous broadcast step (rumor propagation) until all nodes are infected, at which point this round of heartbeat infection ends.
* If a node in the local `NodeList` has not sent a heartbeat update before the timeout, delete that node's data (a C sketch of this rule follows the figure below).

<div align=center> <img src="img/1.png" width="600" class="center"></div>

##### `Metadata` information synchronization
* After a node calls the Publish() function to publish new metadata, the new data spreads to every node and overwrites their local metadata.
* Each node periodically picks a random node for a metadata exchange check; if the metadata on a node turns out to be stale, it is overwritten (anti-entropy propagation; a sketch follows the figure below).
* When a new node joins the cluster, it obtains the latest cluster metadata through the data exchange function.

<div align=center> <img src="img/2.png" width="450" class="center"></div>


##### Sync. packet types

| Type number | Usage |
| --- | --- |
| 1 | Heartbeat packet (broadcast) |
| 2 | Metadata exchange request |
| 3 | Metadata exchange response |
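
For reference, a minimal sketch of how these packet types could be declared in C; the enum and constant names are hypothetical, not identifiers from this repository:

```c
/* Hypothetical constants mirroring the table above; the repository's
 * actual identifiers may differ. */
enum gossip_pkt_type {
    PKT_HEARTBEAT         = 1, /* heartbeat packet (broadcast) */
    PKT_METADATA_REQUEST  = 2, /* metadata exchange request */
    PKT_METADATA_RESPONSE = 3, /* metadata exchange response */
};
```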



### eBPF Features

#### In-kernel broadcaster
We use an eBPF TC hook to implement clone-redirect with a recursive structure, allowing Gossip to replicate a message into multiple copies while sending only a single packet through the Linux protocol stack.

> Attention: in the Linux network stack, ``XMIT_RECURSION_LIMIT`` is set to 8 to keep netlink or TC from recursing so deeply that the kernel stack overflows. If Gossip needs to broadcast to more than 8 nodes, consider modifying the kernel source code.
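
To make this concrete, here is a minimal, hedged sketch of the idea: a TC egress program that fans one outgoing packet out to a peer list with ``bpf_clone_redirect()``. The map layout, section names, and peer rewriting are assumptions for illustration, and the sketch uses a flat loop where the actual implementation uses the recursive structure described above:

```c
// SPDX-License-Identifier: GPL-2.0
/* Illustrative sketch only: clone one egress packet to every peer in a
 * map. The map layout and section names are assumptions, not this
 * repository's actual code. */
#include <linux/bpf.h>
#include <linux/pkt_cls.h>
#include <bpf/bpf_helpers.h>

#define MAX_PEERS 8 /* bounded by the kernel's XMIT_RECURSION_LIMIT */

struct {
    __uint(type, BPF_MAP_TYPE_ARRAY);
    __uint(max_entries, MAX_PEERS);
    __type(key, __u32);
    __type(value, __u32);          /* peer IPv4 address (network order) */
} peers SEC(".maps");

/* Placeholder: rewrite the packet's destination for one peer, e.g. with
 * bpf_skb_store_bytes() and bpf_l3_csum_replace(); elided for brevity. */
static __always_inline void rewrite_dst(struct __sk_buff *skb, __u32 peer_ip)
{
}

SEC("tc")
int gossip_broadcast(struct __sk_buff *skb)
{
    for (__u32 i = 0; i < MAX_PEERS; i++) {
        __u32 *peer = bpf_map_lookup_elem(&peers, &i);
        if (!peer || *peer == 0)
            break;
        rewrite_dst(skb, *peer);
        /* Send a clone out of the same interface; the original skb
         * remains available for the next iteration. */
        bpf_clone_redirect(skb, skb->ifindex, 0);
    }
    return TC_ACT_SHOT; /* all copies already sent */
}

char _license[] SEC("license") = "GPL";
```

User space hands a single packet to the stack; the TC hook produces the remaining copies inside the kernel, which is where the broadcast speedup comes from.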


#### AF_XDP Kernel bypass

Our framework inspects the type of each incoming packet: only packets of type 1 and 2 are redirected to the ``xsk_map``, while TCP packets follow the normal socket path to the controller. This selective redirection puts the high-performance AF_XDP socket to work for gossip traffic while keeping the traditional processing route for TCP, improving both the efficiency and the reliability of packet receiving and processing.
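
As a rough sketch of that dispatch logic, assuming (purely for illustration) that the gossip type number is the first byte of the UDP payload, an XDP program along these lines would redirect types 1 and 2 to the AF_XDP socket and pass everything else, including TCP, up the stack:

```c
// SPDX-License-Identifier: GPL-2.0
/* Illustrative sketch only: the wire format and map size are assumptions. */
#include <linux/bpf.h>
#include <linux/if_ether.h>
#include <linux/in.h>
#include <linux/ip.h>
#include <linux/udp.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>

struct {
    __uint(type, BPF_MAP_TYPE_XSKMAP);
    __uint(max_entries, 64);
    __type(key, __u32);
    __type(value, __u32);
} xsk_map SEC(".maps");

SEC("xdp")
int gossip_filter(struct xdp_md *ctx)
{
    void *data = (void *)(long)ctx->data;
    void *data_end = (void *)(long)ctx->data_end;

    struct ethhdr *eth = data;
    if ((void *)(eth + 1) > data_end)
        return XDP_PASS;
    if (eth->h_proto != bpf_htons(ETH_P_IP))
        return XDP_PASS;

    struct iphdr *ip = (void *)(eth + 1);
    if ((void *)(ip + 1) > data_end)
        return XDP_PASS;
    if (ip->protocol != IPPROTO_UDP) /* TCP keeps the normal socket path */
        return XDP_PASS;

    struct udphdr *udp = (void *)ip + ip->ihl * 4;
    if ((void *)(udp + 1) > data_end)
        return XDP_PASS;

    /* Assumed layout: the gossip packet type is the first payload byte. */
    __u8 *type = (void *)(udp + 1);
    if ((void *)(type + 1) > data_end)
        return XDP_PASS;

    if (*type == 1 || *type == 2) /* heartbeat or metadata request */
        return bpf_redirect_map(&xsk_map, ctx->rx_queue_index, XDP_PASS);

    return XDP_PASS;
}

char _license[] SEC("license") = "GPL";
```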
43 changes: 29 additions & 14 deletions script/test.py
@@ -45,11 +59,19 @@ def run_shell_script():
# Function to generate a continuous stream of metadata updates
def metadata_update(node_list):
    while True:
        for _ in range(1500000):
            node = random.choice(node_list)  # Choose a node for the update
            metadata = random_metadata_string()
            send_metadata_update(node, metadata)
            time.sleep(1 / 1500000 * 60)  # Spread the updates over one minute

def metadata_update_fix(node_list):
    update_count = 25000
    for _ in range(update_count):
        node = random.choice(node_list)
        metadata = random_metadata_string()
        send_metadata_update(node, metadata)
        # time.sleep(0.001)

def test_configurations():
    # Retrieve the IP addresses
@@ -139,6 +147,7 @@ def main():
    parser.add_argument('-t', '--test', action='store_true', help='Test parsing config', required=False)
    parser.add_argument('-c', '--configure', action='store_true', help='Configure the servers', required=False)
    parser.add_argument('-b', '--bench', action='store_true', help='Benchmark the cluster', required=False)
    parser.add_argument('-bf', '--bench-fix', action='store_true', help='Benchmark the cluster with a fixed number of updates', required=False)
    parser.add_argument('-gl', '--get-list', action='store_true', help='Get the node list of every gossip node', required=False)

    args = parser.parse_args()
@@ -151,23 +160,29 @@ def main():
    elif args.bench:
        ip_list = get_ip_addresses()
        # print(ip_list[0])
        # Run the continuous benchmark against the first node directly
        metadata_update(["http://" + ip_list[0] + ":8000"])
        # Alternatively, run it in a separate process:
        # process1 = multiprocessing.Process(target=metadata_update, args=(["http://" + ip_list[0] + ":8000"],))
        # process1.start()
        # process1.join()
    elif args.get_list:
        count_list_length()
    elif args.bench_fix:
        ip_list = get_ip_addresses()
        # metadata_update_fix(["http://" + ip_list[0] + ":8000"])
        # Send a fixed number of updates to three nodes in parallel
        process1 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[0] + ":8000"],))
        process2 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[1] + ":8000"],))
        process3 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[2] + ":8000"],))

        process1.start()
        process2.start()
        process3.start()
        process1.join()
        process2.join()
        process3.join()
    else:
        print("No arguments given")

