Feature/concurrency (#12)
* Rewrite pingpong check to use streams

* Improve check

* Fix lint

* Wrap peer ping

* Rewrite fullconnectivity check

* Update kademlia and peercount checks

* Create pseudorandom generator per node

* Update HasChunk parameters

* Create chunkStream in pushsync

* Experiment

* Add option to execute pushsync concurrently

* Bump version
Svetomir Smiljkovic authored Jun 1, 2020
1 parent b539a3f commit 11767a2
Showing 10 changed files with 223 additions and 53 deletions.
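The rewritten checks below share one concurrency pattern: a producer goroutine per work item sends its result on a shared channel, a closer goroutine closes that channel once a sync.WaitGroup drains, and the consumer ranges over the channel and sorts by index, since arrival order is nondeterministic. A minimal standalone sketch of that pattern, with illustrative names that are not taken from this diff:

package main

import (
    "fmt"
    "sort"
    "sync"
)

type result struct {
    Index int
    Value string
    Err   error
}

// stream fans out one goroutine per input and fans the results back in on a
// single channel; the channel is closed only after every producer has sent.
func stream(inputs []string) <-chan result {
    out := make(chan result)
    var wg sync.WaitGroup
    for i, in := range inputs {
        wg.Add(1)
        go func(i int, in string) {
            defer wg.Done()
            // the real checks do their work here: Overlay, Peers, Ping, UploadChunk, ...
            out <- result{Index: i, Value: "processed " + in}
        }(i, in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    var results []result
    for r := range stream([]string{"a", "b", "c"}) {
        results = append(results, r)
    }
    // restore input order, as the checks do with sort.SliceStable on Index
    sort.SliceStable(results, func(i, j int) bool { return results[i].Index < results[j].Index })
    for _, r := range results {
        fmt.Println(r.Index, r.Value, r.Err)
    }
}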
11 changes: 11 additions & 0 deletions cmd/beekeeper/cmd/check_pushsync.go
@@ -15,7 +15,9 @@ func (c *command) initCheckPushSync() *cobra.Command {
optionNameUploadNodeCount = "upload-node-count"
optionNameChunksPerNode = "chunks-per-node"
optionNameSeed = "seed"
optionNameConcurrent = "concurrent"
)
var concurrent bool

cmd := &cobra.Command{
Use: "pushsync",
@@ -51,6 +53,14 @@ and checks if chunks are synced to their closest nodes.`,
seed = random.Int64()
}

if concurrent {
return pushsync.CheckConcurrent(cluster, pushsync.Options{
UploadNodeCount: c.config.GetInt(optionNameUploadNodeCount),
ChunksPerNode: c.config.GetInt(optionNameChunksPerNode),
Seed: seed,
})
}

return pushsync.Check(cluster, pushsync.Options{
UploadNodeCount: c.config.GetInt(optionNameUploadNodeCount),
ChunksPerNode: c.config.GetInt(optionNameChunksPerNode),
@@ -63,6 +73,7 @@ and checks if chunks are synced to their closest nodes.`,
cmd.Flags().IntP(optionNameUploadNodeCount, "u", 1, "number of nodes to upload chunks to")
cmd.Flags().IntP(optionNameChunksPerNode, "p", 1, "number of chunks to upload per node")
cmd.Flags().Int64P(optionNameSeed, "s", 0, "seed for generating chunks; if not set, will be random")
cmd.Flags().BoolVar(&concurrent, optionNameConcurrent, false, "upload chunks concurrently")

return cmd
}
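With the flag wired in, a concurrent run would be invoked with something like beekeeper check pushsync --upload-node-count 2 --chunks-per-node 3 --concurrent; the exact subcommand path is inferred from the file name and the Use field, so treat that invocation as an assumption rather than documented usage.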
4 changes: 2 additions & 2 deletions pkg/bee/node.go
@@ -70,8 +70,8 @@ func (n *Node) DownloadChunk(ctx context.Context, a swarm.Address) (data []byte,
}

// HasChunk returns true/false if node has a chunk
func (n *Node) HasChunk(ctx context.Context, c Chunk) (bool, error) {
return n.debug.Node.HasChunk(ctx, c.Address())
func (n *Node) HasChunk(ctx context.Context, a swarm.Address) (bool, error) {
return n.debug.Node.HasChunk(ctx, a)
}

// Overlay returns node's overlay address
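With the new signature, call sites pass the chunk's address rather than the chunk value itself; the updated pushsync check below does exactly this with synced, err := c.Nodes[index].HasChunk(ctx, chunk.Address()).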
24 changes: 13 additions & 11 deletions pkg/check/fullconnectivity/fullconnectivity.go
@@ -20,26 +20,28 @@ func Check(cluster bee.Cluster) (err error) {
return err
}

var expectedPeerCount = cluster.Size() - 1
for i, n := range cluster.Nodes {
peers, err := n.Peers(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}
peers, err := cluster.Peers(ctx)
if err != nil {
return err
}

clusterSize := cluster.Size()
expectedPeerCount := clusterSize - 1

if len(peers) != expectedPeerCount {
fmt.Printf("Node %d. Failed. Peers %d/%d. Node: %s\n", i, len(peers), expectedPeerCount, overlays[i].String())
for i := 0; i < clusterSize; i++ {
if len(peers[i]) != expectedPeerCount {
fmt.Printf("Node %d. Failed. Peers %d/%d. Node: %s\n", i, len(peers[i]), expectedPeerCount, overlays[i])
return errFullConnectivity
}

for _, p := range peers {
for _, p := range peers[i] {
if !contains(overlays, p) {
fmt.Printf("Node %d. Failed. Invalid peer: %s. Node: %s\n", i, p.String(), overlays[i].String())
fmt.Printf("Node %d. Failed. Invalid peer: %s. Node: %s\n", i, p.String(), overlays[i])
return errFullConnectivity
}
}

fmt.Printf("Node %d. Passed. Peers %d/%d. All peers are valid. Node: %s\n", i, len(peers), expectedPeerCount, overlays[i].String())
fmt.Printf("Node %d. Passed. Peers %d/%d. All peers are valid. Node: %s\n", i, len(peers[i]), expectedPeerCount, overlays[i])
}

return
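This check (like peercount and kademlia below) now relies on batched accessors on bee.Cluster instead of querying each node inline. Those methods live in files not expanded in this capture, so the following is only a sketch of the contract the check assumes: Peers returns one peer slice per node, index-aligned with Overlays, and the real implementation may well fan the requests out concurrently.

package bee

import (
    "context"
    "fmt"

    "github.com/ethersphere/bee/pkg/swarm"
)

// Peers is a hypothetical batched accessor; Cluster and Node are the existing
// types from this package. It only illustrates the shape the rewritten checks
// depend on: peers[i] belongs to Nodes[i].
func (c Cluster) Peers(ctx context.Context) ([][]swarm.Address, error) {
    peers := make([][]swarm.Address, len(c.Nodes))
    for i, n := range c.Nodes {
        p, err := n.Peers(ctx)
        if err != nil {
            return nil, fmt.Errorf("node %d: %w", i, err)
        }
        peers[i] = p
    }
    return peers, nil
}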
12 changes: 5 additions & 7 deletions pkg/check/kademlia/kademlia.go
@@ -23,18 +23,16 @@ var (
func Check(cluster bee.Cluster) (err error) {
ctx := context.Background()

fmt.Printf("Checking for full connectivity:\n")
if err := fullconnectivity.Check(cluster); err == nil {
return errKademliaFullConnectivity
}
fmt.Printf("Full connectivity not present, continuing with kademlia topology check\n")

for i, n := range cluster.Nodes {
t, err := n.Topology(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}
topologies, err := cluster.Topologies(ctx)
if err != nil {
return err
}

for i, t := range topologies {
if t.Depth == 0 {
fmt.Printf("Node %d. Kademlia not healthy. Depth %d. Node: %s\n", i, t.Depth, t.Overlay)
return errKadmeliaNotHealthy
24 changes: 12 additions & 12 deletions pkg/check/peercount/peercount.go
@@ -9,21 +9,21 @@ import (

// Check executes peer count check on cluster
func Check(cluster bee.Cluster) (err error) {
var expectedPeerCount = cluster.Size() - 1

ctx := context.Background()
for i, n := range cluster.Nodes {
o, err := n.Overlay(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}

peers, err := n.Peers(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}
overlays, err := cluster.Overlays(ctx)
if err != nil {
return err
}

peers, err := cluster.Peers(ctx)
if err != nil {
return err
}

fmt.Printf("Node %d. Peers %d/%d. Node: %s\n", i, len(peers), expectedPeerCount, o.String())
clusterSize := cluster.Size()
for i := 0; i < clusterSize; i++ {
fmt.Printf("Node %d. Peers %d/%d. Node: %s\n", i, len(peers[i]), clusterSize-1, overlays[i])
}

return
119 changes: 105 additions & 14 deletions pkg/check/pingpong/pingpong.go
@@ -3,32 +3,123 @@ package pingpong
import (
"context"
"fmt"
"sort"
"sync"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
)

// Check executes ping from all nodes to all other nodes in the cluster
func Check(cluster bee.Cluster) (err error) {
ctx := context.Background()
for i, n := range cluster.Nodes {
o, err := n.Overlay(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}

peers, err := n.Peers(ctx)
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}
var result []nodeStreamMsg
for n := range nodeStream(ctx, cluster.Nodes) {
result = append(result, n)
}
sort.SliceStable(result, func(i, j int) bool {
return result[i].Index < result[j].Index
})

for j, p := range peers {
rtt, err := n.Ping(ctx, p)
if err != nil {
return fmt.Errorf("node %d had error pinging peer %s: %w", i, p.String(), err)
for i, n := range result {
if n.Error != nil {
fmt.Printf("node %d: %s\n", i, n.Error)
continue
}
for j, p := range n.PingResults {
if p.Error != nil {
fmt.Printf("node %d had error pinging peer %d: %s\n", i, j, p.Error)
}
fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s \n", i, j, rtt, o.String(), p.String())
fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s\n", i, j, p.RTT, n.Address, p.Address)
}
}

return
}

type nodeStreamMsg struct {
Index int
Address swarm.Address
PingResults []pingStreamMsg
Error error
}

func nodeStream(ctx context.Context, nodes []bee.Node) <-chan nodeStreamMsg {
nodeStream := make(chan nodeStreamMsg)

var wg sync.WaitGroup
for i, node := range nodes {
wg.Add(1)
go func(i int, n bee.Node) {
defer wg.Done()

address, err := n.Overlay(ctx)
if err != nil {
nodeStream <- nodeStreamMsg{Index: i, Error: err}
return
}

peers, err := n.Peers(ctx)
if err != nil {
nodeStream <- nodeStreamMsg{Index: i, Error: err}
return
}

// TODO: remove pingResults and ordering and send result in channel
var pingResults []pingStreamMsg
for m := range pingStream(ctx, n, peers) {
pingResults = append(pingResults, m)
}
sort.SliceStable(pingResults, func(i, j int) bool {
return pingResults[i].Index < pingResults[j].Index
})

nodeStream <- nodeStreamMsg{
Index: i,
Address: address,
PingResults: pingResults,
}
}(i, node)
}

go func() {
wg.Wait()
close(nodeStream)
}()

return nodeStream
}

type pingStreamMsg struct {
Index int
Address swarm.Address
RTT string
Error error
}

func pingStream(ctx context.Context, node bee.Node, peers []swarm.Address) <-chan pingStreamMsg {
pingStream := make(chan pingStreamMsg)

var wg sync.WaitGroup
for i, peer := range peers {
wg.Add(1)
go func(n bee.Node, i int, p swarm.Address) {
defer wg.Done()
rtt, err := n.Ping(ctx, p)
pingStream <- pingStreamMsg{
Index: i,
Address: p,
RTT: rtt,
Error: err,
}
}(node, i, peer)
}

go func() {
wg.Wait()
close(pingStream)
}()

return pingStream
}
65 changes: 62 additions & 3 deletions pkg/check/pushsync/pushsync.go
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/ethersphere/beekeeper/pkg/random"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
)
@@ -23,7 +26,7 @@ var errPushSync = errors.New("push sync")
// Check uploads given chunks on cluster and checks pushsync ability of the cluster
func Check(c bee.Cluster, o Options) (err error) {
ctx := context.Background()
rnd := rand.New(rand.NewSource(o.Seed))
rnds := random.PseudoGenerators(o.Seed, o.UploadNodeCount)
fmt.Printf("Seed: %d\n", o.Seed)

overlays, err := c.Overlays(ctx)
@@ -33,7 +36,7 @@ func Check(c bee.Cluster, o Options) (err error) {

for i := 0; i < o.UploadNodeCount; i++ {
for j := 0; j < o.ChunksPerNode; j++ {
chunk, err := bee.NewRandomChunk(rnd)
chunk, err := bee.NewRandomChunk(rnds[i])
if err != nil {
return fmt.Errorf("node %d: %w", i, err)
}
@@ -49,7 +52,7 @@ index := findIndex(overlays, closest)
index := findIndex(overlays, closest)

time.Sleep(1 * time.Second)
synced, err := c.Nodes[index].HasChunk(ctx, chunk)
synced, err := c.Nodes[index].HasChunk(ctx, chunk.Address())
if err != nil {
return fmt.Errorf("node %d: %w", index, err)
}
@@ -65,6 +68,62 @@ return
return
}

// CheckConcurrent uploads given chunks concurrently on cluster and checks pushsync ability of the cluster
func CheckConcurrent(c bee.Cluster, o Options) (err error) {
ctx := context.Background()
rnds := random.PseudoGenerators(o.Seed, o.UploadNodeCount)
fmt.Printf("Seed: %d\n", o.Seed)

for i := 0; i < o.UploadNodeCount; i++ {
var chunkResults []chunkStreamMsg
for m := range chunkStream(ctx, c.Nodes[i], rnds[i], o.ChunksPerNode) {
chunkResults = append(chunkResults, m)
}
for j, c := range chunkResults {
fmt.Println(i, j, c.Index, c.Chunk.Size(), c.Error)
}
}

return
}

type chunkStreamMsg struct {
Index int
Chunk bee.Chunk
Error error
}

func chunkStream(ctx context.Context, node bee.Node, rnd *rand.Rand, count int) <-chan chunkStreamMsg {
chunkStream := make(chan chunkStreamMsg)

var wg sync.WaitGroup
for i := 0; i < count; i++ {
wg.Add(1)
go func(n bee.Node, i int) {
defer wg.Done()
chunk, err := bee.NewRandomChunk(rnd)
if err != nil {
chunkStream <- chunkStreamMsg{Index: i, Error: err}
return
}

if err := n.UploadChunk(ctx, &chunk); err != nil {
chunkStream <- chunkStreamMsg{Index: i, Error: err}
return
}

chunkStream <- chunkStreamMsg{Index: i, Chunk: chunk}
}(node, i)
}

go func() {
wg.Wait()
close(chunkStream)
}()

return chunkStream
}

// findIndex returns index of a given swarm.Address in a given set of swarm.Addresses, or -1 if not found
func findIndex(overlays []swarm.Address, addr swarm.Address) int {
for i, a := range overlays {
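random.PseudoGenerators appears to be introduced by this commit (per the "Create pseudorandom generator per node" bullet), but its file is not expanded in this capture. From the call sites above, the intent is one *rand.Rand per upload node, derived deterministically from the base seed, so each node draws from its own reproducible sequence regardless of how uploads interleave across nodes. A plausible sketch, not the committed implementation:

package random

import "math/rand"

// PseudoGenerators returns n deterministic generators derived from seed.
// This mirrors how the pushsync check indexes rnds[i] per upload node;
// the actual derivation in pkg/random may differ.
func PseudoGenerators(seed int64, n int) []*rand.Rand {
    src := rand.New(rand.NewSource(seed))
    rnds := make([]*rand.Rand, n)
    for i := 0; i < n; i++ {
        rnds[i] = rand.New(rand.NewSource(src.Int63()))
    }
    return rnds
}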