diff --git a/cmd/beekeeper/cmd/print_addresses.go b/cmd/beekeeper/cmd/print_addresses.go index 4c073c970..eb9f10ded 100644 --- a/cmd/beekeeper/cmd/print_addresses.go +++ b/cmd/beekeeper/cmd/print_addresses.go @@ -37,7 +37,10 @@ func (c *command) initPrintAddresses() *cobra.Command { } for i, a := range addresses { - fmt.Printf("%d. %s\n", i, a) + fmt.Printf("Node %d. overlay: %s\n", i, a.Overlay) + for _, u := range a.Underlay { + fmt.Printf("Node %d. underlay: %s\n", i, u) + } } return diff --git a/cmd/beekeeper/cmd/print_overlay.go b/cmd/beekeeper/cmd/print_overlay.go index 86d33b8ba..6bb54b605 100644 --- a/cmd/beekeeper/cmd/print_overlay.go +++ b/cmd/beekeeper/cmd/print_overlay.go @@ -37,7 +37,7 @@ func (c *command) initPrintOverlay() *cobra.Command { } for i, o := range overlays { - fmt.Printf("%d. %s\n", i, o.String()) + fmt.Printf("Node %d. %s\n", i, o.String()) } return diff --git a/cmd/beekeeper/cmd/print_peers.go b/cmd/beekeeper/cmd/print_peers.go index 0f9d59311..b93a3fa23 100644 --- a/cmd/beekeeper/cmd/print_peers.go +++ b/cmd/beekeeper/cmd/print_peers.go @@ -37,7 +37,9 @@ func (c *command) initPrintPeers() *cobra.Command { } for i, a := range peers { - fmt.Printf("%d. %s\n", i, a) + for _, p := range a { + fmt.Printf("Node %d. %s\n", i, p) + } } return diff --git a/cmd/beekeeper/cmd/print_topologies.go b/cmd/beekeeper/cmd/print_topologies.go index f76a88210..2608a83a4 100644 --- a/cmd/beekeeper/cmd/print_topologies.go +++ b/cmd/beekeeper/cmd/print_topologies.go @@ -37,7 +37,14 @@ func (c *command) initPrintTopologies() *cobra.Command { } for i, t := range topologies { - fmt.Printf("%d. %+v\n", i, t) + fmt.Printf("Node %d. overlay: %s\n", i, t.Overlay) + fmt.Printf("Node %d. population: %d\n", i, t.Population) + fmt.Printf("Node %d. connected: %d\n", i, t.Connected) + fmt.Printf("Node %d. depth: %d\n", i, t.Depth) + fmt.Printf("Node %d. nnLowWatermark: %d\n", i, t.NnLowWatermark) + for k, v := range t.Bins { + fmt.Printf("Node %d. %s %+v\n", i, k, v) + } } return diff --git a/pkg/bee/node.go b/pkg/bee/node.go index b90b4da6f..4b6626f3b 100644 --- a/pkg/bee/node.go +++ b/pkg/bee/node.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sync" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/beeclient/api" @@ -107,6 +108,42 @@ func (n *Node) Ping(ctx context.Context, node swarm.Address) (rtt string, err er return r.RTT, nil } +// PingStreamMsg represents message sent over the PingStream channel +type PingStreamMsg struct { + Node swarm.Address + RTT string + Index int + Error error +} + +// PingStream returns stream of ping results for given nodes +func (n *Node) PingStream(ctx context.Context, nodes []swarm.Address) <-chan PingStreamMsg { + pingStream := make(chan PingStreamMsg) + + var wg sync.WaitGroup + for i, node := range nodes { + wg.Add(1) + go func(i int, node swarm.Address) { + defer wg.Done() + + rtt, err := n.Ping(ctx, node) + pingStream <- PingStreamMsg{ + Node: node, + RTT: rtt, + Index: i, + Error: err, + } + }(i, node) + } + + go func() { + wg.Wait() + close(pingStream) + }() + + return pingStream +} + // Topology represents Kademlia topology type Topology struct { Overlay swarm.Address diff --git a/pkg/check/kademlia/kademlia.go b/pkg/check/kademlia/kademlia.go index 04ea14f19..95c8a0093 100644 --- a/pkg/check/kademlia/kademlia.go +++ b/pkg/check/kademlia/kademlia.go @@ -23,9 +23,11 @@ var ( func Check(cluster bee.Cluster) (err error) { ctx := context.Background() + fmt.Printf("Checking for full connectivity:\n") if err := fullconnectivity.Check(cluster); err == nil { return errKademliaFullConnectivity } + fmt.Printf("Full connectivity not present, continuing with kademlia topology check\n") topologies, err := cluster.Topologies(ctx) if err != nil { diff --git a/pkg/check/pingpong/pingpong.go b/pkg/check/pingpong/pingpong.go index ac449ce89..2a95e92dd 100644 --- a/pkg/check/pingpong/pingpong.go +++ b/pkg/check/pingpong/pingpong.go @@ -3,7 +3,6 @@ package pingpong import ( "context" "fmt" - "sort" "sync" "github.com/ethersphere/bee/pkg/swarm" @@ -14,25 +13,12 @@ import ( func Check(cluster bee.Cluster) (err error) { ctx := context.Background() - var result []nodeStreamMsg for n := range nodeStream(ctx, cluster.Nodes) { - result = append(result, n) - } - sort.SliceStable(result, func(i, j int) bool { - return result[i].Index < result[j].Index - }) - - for i, n := range result { if n.Error != nil { - fmt.Printf("node %d: %s\n", i, n.Error) + fmt.Printf("node %d: %s\n", n.Index, n.Error) continue } - for j, p := range n.PingResults { - if p.Error != nil { - fmt.Printf("node %d had error pinging peer %d: %s\n", i, j, p.Error) - } - fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s\n", i, j, p.RTT, n.Address, p.Address) - } + fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s\n", n.Index, n.PeerIndex, n.RTT, n.Address, n.PeerAddress) } return @@ -41,7 +27,9 @@ func Check(cluster bee.Cluster) (err error) { type nodeStreamMsg struct { Index int Address swarm.Address - PingResults []pingStreamMsg + PeerIndex int + PeerAddress swarm.Address + RTT string Error error } @@ -51,34 +39,32 @@ func nodeStream(ctx context.Context, nodes []bee.Node) <-chan nodeStreamMsg { var wg sync.WaitGroup for i, node := range nodes { wg.Add(1) - go func(i int, n bee.Node) { + go func(i int, node bee.Node) { defer wg.Done() - address, err := n.Overlay(ctx) + address, err := node.Overlay(ctx) if err != nil { nodeStream <- nodeStreamMsg{Index: i, Error: err} return } - peers, err := n.Peers(ctx) + peers, err := node.Peers(ctx) if err != nil { nodeStream <- nodeStreamMsg{Index: i, Error: err} return } - // TODO: remove pingResults and ordering and send result in channel - var pingResults []pingStreamMsg - for m := range pingStream(ctx, n, peers) { - pingResults = append(pingResults, m) - } - sort.SliceStable(pingResults, func(i, j int) bool { - return pingResults[i].Index < pingResults[j].Index - }) - - nodeStream <- nodeStreamMsg{ - Index: i, - Address: address, - PingResults: pingResults, + for m := range node.PingStream(ctx, peers) { + if m.Error != nil { + nodeStream <- nodeStreamMsg{Index: i, Error: m.Error} + } + nodeStream <- nodeStreamMsg{ + Index: i, + Address: address, + PeerIndex: m.Index, + PeerAddress: m.Node, + RTT: m.RTT, + } } }(i, node) } @@ -90,36 +76,3 @@ func nodeStream(ctx context.Context, nodes []bee.Node) <-chan nodeStreamMsg { return nodeStream } - -type pingStreamMsg struct { - Index int - Address swarm.Address - RTT string - Error error -} - -func pingStream(ctx context.Context, node bee.Node, peers []swarm.Address) <-chan pingStreamMsg { - pingStream := make(chan pingStreamMsg) - - var wg sync.WaitGroup - for i, peer := range peers { - wg.Add(1) - go func(n bee.Node, i int, p swarm.Address) { - defer wg.Done() - rtt, err := n.Ping(ctx, p) - pingStream <- pingStreamMsg{ - Index: i, - Address: p, - RTT: rtt, - Error: err, - } - }(node, i, peer) - } - - go func() { - wg.Wait() - close(pingStream) - }() - - return pingStream -} diff --git a/version.go b/version.go index 137964b75..789a8eb4c 100644 --- a/version.go +++ b/version.go @@ -1,7 +1,7 @@ package beekeeper var ( - version = "0.2.3" // manually set semantic version number + version = "0.2.4" // manually set semantic version number commit string // automatically set git commit hash // Version TODO