Update README.md #5

Merged
merged 4 commits into from
Mar 26, 2024
85 changes: 70 additions & 15 deletions README.md
@@ -1,6 +1,14 @@
# eGossip

## Build
## About

The extended Berkeley Packet Filter (eBPF) allows small programs to be loaded and executed dynamically inside the Linux kernel, without recompiling the kernel.
Our primary focus is deploying TC eBPF hooks for in-kernel packet cloning to make user-space broadcasts more efficient. This is especially relevant for consensus protocols such as Gossip, which rely heavily on user-space broadcast. We describe the implementation and show that it significantly increases broadcast speed.

<div align=center> <img src="img/3.png" width="400" class="center"></div>


## Build & Test
Running ``make`` builds and runs everything automatically.

Build binary file
@@ -43,24 +51,45 @@ Use the ``-b`` flag to generate random metadata update events on a node.
python3 script/test.py -b
```

Switch between protocol modes by changing ``PROTO`` in ``k8s/deployment.yaml``. There are three modes:

* **UDP mode:** Baseline.

```yaml
- name: PROTO
value: "UDP"
resources:
limits:
cpu: "1"
memory: "512Mi"
```
* **TC mode:** Enables only the in-kernel broadcaster.

```yaml
- name: PROTO
value: "TC"
resources:
limits:
cpu: "1"
memory: "512Mi"
```
* **XDP mode:** Enables both the in-kernel broadcaster and the ``AF_XDP`` kernel bypass.

```yaml
- name: PROTO
value: "XDP"
resources:
limits:
cpu: "1"
memory: "512Mi"
```
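The three modes above differ only in the value of the ``PROTO`` environment variable. As a hypothetical sketch (the function and mode descriptions are illustrative, not the actual eGossip API), a node could dispatch on it like this:

```python
import os

def select_datapath(env=os.environ):
    """Pick the datapath implied by the PROTO environment variable."""
    proto = env.get("PROTO", "UDP").upper()
    if proto == "UDP":
        return "plain UDP sockets (baseline)"
    if proto == "TC":
        return "UDP sockets + TC in-kernel broadcaster"
    if proto == "XDP":
        return "AF_XDP sockets + TC in-kernel broadcaster"
    raise ValueError(f"unknown PROTO: {proto}")
```

Defaulting to ``UDP`` mirrors the baseline mode; an unknown value fails loudly rather than silently falling back.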

## Implement function

### Gossip Protocol

The basic Gossip API comes from [PekoNode](https://github.com/dpwgc/pekonode/tree/master).


##### Cluster node list sharing
* The list of cluster nodes is synchronized through rumor propagation of `NodeList`. Each node eventually stores a complete node list, which can be used in service registration and discovery scenarios.
##### Cluster metadata information sharing
@@ -80,11 +109,37 @@ Basic Gossip API is from [PekoNode](https://github.com/dpwgc/pekonode/tree/maste
* The previous broadcast step (rumor propagation) is repeated until all nodes are infected, at which point the heartbeat infection ends.
* If a node in the local `NodeList` has not sent a heartbeat update before the timeout, its data is deleted from the list.
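The steps above can be sketched as a toy simulation: each infected node forwards the heartbeat to a few random peers every round until the whole cluster has seen it. The fanout value and function names are illustrative, not eGossip parameters.

```python
import random

def rounds_to_infect_all(n_nodes, fanout=3, seed=0):
    """Count rumor-propagation rounds until every node is infected."""
    rng = random.Random(seed)
    infected = {0}  # node 0 originates the heartbeat
    rounds = 0
    while len(infected) < n_nodes:
        newly = set()
        for _node in infected:
            # each infected node gossips to `fanout` random peers
            newly.update(rng.sample(range(n_nodes), min(fanout, n_nodes)))
        infected |= newly
        rounds += 1
    return rounds
```

With a fanout as large as the cluster, one round suffices; with a small fanout, the infection still completes in a number of rounds that grows only logarithmically with cluster size.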


<div align=center> <img src="img/1.png" width="600" class="center"></div>

##### `Metadata` Metadata information synchronization
* After a node calls the Publish() function to publish new metadata, the new data spreads to every node and overwrites their local metadata.
* Each node periodically selects a random peer for a metadata exchange check; if the metadata on one side is found to be out of date, it is overwritten (anti-entropy propagation).
* When a new node joins the cluster, it obtains the latest cluster metadata through the same exchange function.
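The anti-entropy check reduces to a version comparison where the newer copy wins. A minimal sketch, assuming a simple versioned-dict representation (not eGossip's actual data structure):

```python
def exchange_metadata(a, b):
    """Anti-entropy exchange: the copy with the higher version overwrites the other."""
    if a["version"] > b["version"]:
        b.update(a)       # a is newer: overwrite b in place
    elif b["version"] > a["version"]:
        a.update(b)       # b is newer: overwrite a in place
    return a, b           # equal versions: nothing to do
```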


<div align=center> <img src="img/2.png" width="450" class="center"></div>


##### Sync. packet type

| Type number | Usage |
| --- | --- |
| 1 | Heartbeat packet (broadcast) |
| 2 | Metadata exchange request |
| 3 | Metadata exchange response |
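One plausible wire encoding of the table above is a single type byte prefixed to the payload. The one-byte prefix is an assumption for illustration, not eGossip's documented format:

```python
import struct

# Sync packet types from the table above
HEARTBEAT, META_REQUEST, META_RESPONSE = 1, 2, 3

def pack_sync_packet(ptype: int, payload: bytes) -> bytes:
    """Prefix the payload with a one-byte (network-order) type field."""
    return struct.pack("!B", ptype) + payload

def unpack_type(packet: bytes) -> int:
    """Read the type byte back out of a received packet."""
    return struct.unpack("!B", packet[:1])[0]
```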



### eBPF Feature

#### In-kernel broadcaster
An eBPF TC hook implements clone-redirect with a recursive structure, allowing Gossip to replicate multiple copies quickly by sending only a single packet into the Linux protocol stack.


> Attention! In the Linux network stack, ``XMIT_RECURSION_LIMIT`` is set to 8 to prevent netlink or TC from recursing deeply enough to overflow the kernel stack. If Gossip needs to broadcast to more than 8 nodes, consider modifying the kernel source.
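Assuming each cloned copy consumes one level of xmit recursion (an assumption about this recursive clone-redirect scheme, not a documented property of eGossip), the kernel's recursion limit directly caps the broadcast fan-out, which a trivial check makes explicit:

```python
XMIT_RECURSION_LIMIT = 8  # default in the Linux kernel's xmit path

def can_broadcast(n_peers: int, limit: int = XMIT_RECURSION_LIMIT) -> bool:
    """True if one packet can fan out to n_peers within the recursion limit,
    assuming one clone-redirect (one recursion level) per peer."""
    return n_peers <= limit
```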


#### AF_XDP Kernel bypass

Our framework inspects the type of each incoming packet. Only packets of type 1 and 2 are filtered and redirected to the xsk_map, while TCP packets continue along the established socket path to the controller. This selective redirection uses the high-performance AF_XDP socket for gossip traffic while keeping the traditional processing route for TCP, improving both the efficiency and the reliability of packet receiving and processing.
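The filtering policy can be summarized as a small user-space sketch of the routing decision the XDP program makes (the actual decision runs in eBPF; this Python version is illustrative only):

```python
def route_packet(ptype: int, is_tcp: bool = False) -> str:
    """Mirror the XDP filter: types 1 and 2 go to the AF_XDP socket,
    everything else (including all TCP) stays on the normal kernel path."""
    if is_tcp:
        return "kernel stack"   # TCP keeps the established socket path
    if ptype in (1, 2):
        return "xsk_map"        # redirected to the AF_XDP socket
    return "kernel stack"
```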
43 changes: 29 additions & 14 deletions script/test.py
@@ -45,11 +45,19 @@ def run_shell_script():
# Function to generate metadata updates
def metadata_update(node_list):
    while True:
        for _ in range(1500000):
            node = random.choice(node_list)  # Choose a node for update
            metadata = random_metadata_string()
            send_metadata_update(node, metadata)
            time.sleep(1 / 1500000 * 60)  # Spread the 1,500,000 updates over one minute

def metadata_update_fix(node_list):
    update_count = 25000
    for _ in range(update_count):
        node = random.choice(node_list)
        metadata = random_metadata_string()
        send_metadata_update(node, metadata)
        # time.sleep(0.001)

def test_configurations():
    # Retrieve the IP addresses
@@ -139,6 +147,7 @@ def main():
    parser.add_argument('-t', '--test', action='store_true', help='Test parsing config', required=False)
    parser.add_argument('-c', '--configure', action='store_true', help='Configure the servers', required=False)
    parser.add_argument('-b', '--bench', action='store_true', help='Benchmark the cluster', required=False)
    parser.add_argument('-bf', '--bench-fix', action='store_true', help='Benchmark the cluster with a fixed number of updates', required=False)
    parser.add_argument('-gl', '--get-list', action='store_true', help='Get the output node list of each gossip node', required=False)

    args = parser.parse_args()
@@ -151,23 +160,29 @@ def main():
    elif args.bench:
        ip_list = get_ip_addresses()
        # print(ip_list[0])
        # Run the continuous benchmark inline; the multiprocessing variant is kept for reference.
        # process1 = multiprocessing.Process(target=metadata_update, args=(["http://" + ip_list[0] + ":8000"],))
        metadata_update(["http://" + ip_list[0] + ":8000"])
        # process1.start()
        # process1.join()
    elif args.get_list:
        count_list_length()
    elif args.bench_fix:
        ip_list = get_ip_addresses()
        # metadata_update_fix(["http://" + ip_list[0] + ":8000"])
        process1 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[0] + ":8000"],))
        process2 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[1] + ":8000"],))
        process3 = multiprocessing.Process(target=metadata_update_fix, args=(["http://" + ip_list[2] + ":8000"],))

        process1.start()
        process2.start()
        process3.start()
        process1.join()
        process2.join()
        process3.join()
    else:
        print("No arguments given")
