From 010e3b85e82b7f90de7fea54384eb9a4620282a8 Mon Sep 17 00:00:00 2001 From: kerwenwwer Date: Tue, 9 Apr 2024 21:22:48 +0800 Subject: [PATCH] Split targetNodes into groups to manage bulk broadcasts to avoid BPF stack overflow --- cmd/sync.go | 141 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 104 insertions(+), 37 deletions(-) diff --git a/cmd/sync.go b/cmd/sync.go index 33108f5..b0597b8 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -188,72 +188,139 @@ func broadcast(nodeList *NodeList, p common.Packet) { } } +// func fastBroadcast(nodeList *NodeList, p common.Packet) { +// nodes := nodeList.Get() +// var targetNodes []common.Node + +// // Select some uninfected nodes +// i := 0 + +// for _, v := range nodes { + +// // If the maximum number of pushes (Amount) has been reached +// if i >= nodeList.Amount { +// // Stop the broadcast +// break +// } + +// if v.Addr == nodeList.localNode.Addr && v.Port == nodeList.localNode.Port { +// // Skip to broadcast to self +// continue +// } + +// // If the node has already been "infected" +// if p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] { +// // Skip this node +// continue +// } + +// p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] = true // Mark the node as infected +// // Set the target node for sending +// targetNode := common.Node{ +// Addr: v.Addr, // Set the target address +// Port: v.Port, // Set the target port +// Mac: v.Mac, // Set the target mac +// } + +// // Add the node to the broadcast list +// targetNodes = append(targetNodes, targetNode) +// i++ +// } + +// if len(targetNodes) != 0 { +// /* Handle atomic counter operation for map_id*/ +// map_id := nodeList.Counter.Next() +// if map_id == 0 { +// nodeList.println("[Map ID error]: map_id is 0") +// } +// p.Mapkey = map_id +// p.Type = 1 + +// if err := bpf.TcPushtoMap(nodeList.Program, map_id, targetNodes); err != nil { +// nodeList.println("[TC error]:", "Failed to push to map", err) +// } + +// bs, err := json.Marshal(p) +// if err != nil { +// nodeList.println("[Infection Error]:", err) +// } + +// addr := targetNodes[0].Addr +// port := targetNodes[0].Port + +// write(nodeList, addr, int(port), bs) // Send the packet +// } else { +// //nodeList.println("[Not target]:", "No target nodes") +// } +// } + func fastBroadcast(nodeList *NodeList, p common.Packet) { nodes := nodeList.Get() var targetNodes []common.Node - // Select some uninfected nodes i := 0 - for _, v := range nodes { - - // If the maximum number of pushes (Amount) has been reached if i >= nodeList.Amount { - // Stop the broadcast break } if v.Addr == nodeList.localNode.Addr && v.Port == nodeList.localNode.Port { - // Skip to broadcast to self continue } - // If the node has already been "infected" if p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] { - // Skip this node continue } - p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] = true // Mark the node as infected - // Set the target node for sending - targetNode := common.Node{ - Addr: v.Addr, // Set the target address - Port: v.Port, // Set the target port - Mac: v.Mac, // Set the target mac - } - - // Add the node to the broadcast list + p.Infected[v.Addr+":"+strconv.Itoa(v.Port)] = true + targetNode := common.Node{Addr: v.Addr, Port: v.Port, Mac: v.Mac} targetNodes = append(targetNodes, targetNode) i++ } - //nodeList.println("[Broadcast]:", len(targetNodes)) - - if len(targetNodes) != 0 { - /* Handle atomic counter operation for map_id*/ - map_id := nodeList.Counter.Next() - if map_id == 0 { - nodeList.println("[Map ID error]: map_id is 0") + // Function to split targetNodes into smaller slices if more than 25 nodes + splitNodes := func(nodes []common.Node, size int) [][]common.Node { + var chunks [][]common.Node + for size < len(nodes) { + nodes, chunks = nodes[size:], append(chunks, nodes[0:size:size]) } - p.Mapkey = map_id - p.Type = 1 + chunks = append(chunks, nodes) + return chunks + } - if err := bpf.TcPushtoMap(nodeList.Program, map_id, targetNodes); err != nil { - nodeList.println("[TC error]:", "Failed to push to map", err) - } + // Handling targetNodes exceeding 25 nodes + if len(targetNodes) > 25 { + nodeGroups := splitNodes(targetNodes, 25) - bs, err := json.Marshal(p) - if err != nil { - nodeList.println("[Infection Error]:", err) + for _, group := range nodeGroups { + sendGroup(nodeList, group, p) } + } else if len(targetNodes) != 0 { + sendGroup(nodeList, targetNodes, p) + } +} + +// sendGroup handles sending a packet to a group of nodes +func sendGroup(nodeList *NodeList, nodes []common.Node, p common.Packet) { + map_id := nodeList.Counter.Next() + if map_id == 0 { + nodeList.println("[Map ID error]: map_id is 0") + } + p.Mapkey = map_id + p.Type = 1 - addr := targetNodes[0].Addr - port := targetNodes[0].Port + if err := bpf.TcPushtoMap(nodeList.Program, map_id, nodes); err != nil { + nodeList.println("[TC error]:", "Failed to push to map", err) + } - write(nodeList, addr, int(port), bs) // Send the packet - } else { - //nodeList.println("[Not target]:", "No target nodes") + bs, err := json.Marshal(p) + if err != nil { + nodeList.println("[Infection Error]:", err) } + + addr := nodes[0].Addr + port := nodes[0].Port + write(nodeList, addr, int(port), bs) // Send the packet to each node in the group } // Initiate a data exchange request between two nodes