Skip to content

Commit

Permalink
Feature/concurrency (#13)
Browse files Browse the repository at this point in the history
* Update pingpong

* Update pingpong

* Update pingpong

* Update pingpong

* Update print output

* Implement PingStream as a method on Node

* Update output

* Bump version
  • Loading branch information
Svetomir Smiljkovic authored Jun 3, 2020
1 parent 11767a2 commit dc9c5be
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 71 deletions.
5 changes: 4 additions & 1 deletion cmd/beekeeper/cmd/print_addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ func (c *command) initPrintAddresses() *cobra.Command {
}

for i, a := range addresses {
fmt.Printf("%d. %s\n", i, a)
fmt.Printf("Node %d. overlay: %s\n", i, a.Overlay)
for _, u := range a.Underlay {
fmt.Printf("Node %d. underlay: %s\n", i, u)
}
}

return
Expand Down
2 changes: 1 addition & 1 deletion cmd/beekeeper/cmd/print_overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *command) initPrintOverlay() *cobra.Command {
}

for i, o := range overlays {
fmt.Printf("%d. %s\n", i, o.String())
fmt.Printf("Node %d. %s\n", i, o.String())
}

return
Expand Down
4 changes: 3 additions & 1 deletion cmd/beekeeper/cmd/print_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ func (c *command) initPrintPeers() *cobra.Command {
}

for i, a := range peers {
fmt.Printf("%d. %s\n", i, a)
for _, p := range a {
fmt.Printf("Node %d. %s\n", i, p)
}
}

return
Expand Down
9 changes: 8 additions & 1 deletion cmd/beekeeper/cmd/print_topologies.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ func (c *command) initPrintTopologies() *cobra.Command {
}

for i, t := range topologies {
fmt.Printf("%d. %+v\n", i, t)
fmt.Printf("Node %d. overlay: %s\n", i, t.Overlay)
fmt.Printf("Node %d. population: %d\n", i, t.Population)
fmt.Printf("Node %d. connected: %d\n", i, t.Connected)
fmt.Printf("Node %d. depth: %d\n", i, t.Depth)
fmt.Printf("Node %d. nnLowWatermark: %d\n", i, t.NnLowWatermark)
for k, v := range t.Bins {
fmt.Printf("Node %d. %s %+v\n", i, k, v)
}
}

return
Expand Down
37 changes: 37 additions & 0 deletions pkg/bee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sync"

"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/beeclient/api"
Expand Down Expand Up @@ -107,6 +108,42 @@ func (n *Node) Ping(ctx context.Context, node swarm.Address) (rtt string, err er
return r.RTT, nil
}

// PingStreamMsg represents message sent over the PingStream channel
type PingStreamMsg struct {
Node swarm.Address
RTT string
Index int
Error error
}

// PingStream returns stream of ping results for given nodes
func (n *Node) PingStream(ctx context.Context, nodes []swarm.Address) <-chan PingStreamMsg {
pingStream := make(chan PingStreamMsg)

var wg sync.WaitGroup
for i, node := range nodes {
wg.Add(1)
go func(i int, node swarm.Address) {
defer wg.Done()

rtt, err := n.Ping(ctx, node)
pingStream <- PingStreamMsg{
Node: node,
RTT: rtt,
Index: i,
Error: err,
}
}(i, node)
}

go func() {
wg.Wait()
close(pingStream)
}()

return pingStream
}

// Topology represents Kademlia topology
type Topology struct {
Overlay swarm.Address
Expand Down
2 changes: 2 additions & 0 deletions pkg/check/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ var (
func Check(cluster bee.Cluster) (err error) {
ctx := context.Background()

fmt.Printf("Checking for full connectivity:\n")
if err := fullconnectivity.Check(cluster); err == nil {
return errKademliaFullConnectivity
}
fmt.Printf("Full connectivity not present, continuing with kademlia topology check\n")

topologies, err := cluster.Topologies(ctx)
if err != nil {
Expand Down
85 changes: 19 additions & 66 deletions pkg/check/pingpong/pingpong.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pingpong
import (
"context"
"fmt"
"sort"
"sync"

"github.com/ethersphere/bee/pkg/swarm"
Expand All @@ -14,25 +13,12 @@ import (
func Check(cluster bee.Cluster) (err error) {
ctx := context.Background()

var result []nodeStreamMsg
for n := range nodeStream(ctx, cluster.Nodes) {
result = append(result, n)
}
sort.SliceStable(result, func(i, j int) bool {
return result[i].Index < result[j].Index
})

for i, n := range result {
if n.Error != nil {
fmt.Printf("node %d: %s\n", i, n.Error)
fmt.Printf("node %d: %s\n", n.Index, n.Error)
continue
}
for j, p := range n.PingResults {
if p.Error != nil {
fmt.Printf("node %d had error pinging peer %d: %s\n", i, j, p.Error)
}
fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s\n", i, j, p.RTT, n.Address, p.Address)
}
fmt.Printf("Node %d. Peer %d RTT: %s. Node: %s Peer: %s\n", n.Index, n.PeerIndex, n.RTT, n.Address, n.PeerAddress)
}

return
Expand All @@ -41,7 +27,9 @@ func Check(cluster bee.Cluster) (err error) {
type nodeStreamMsg struct {
Index int
Address swarm.Address
PingResults []pingStreamMsg
PeerIndex int
PeerAddress swarm.Address
RTT string
Error error
}

Expand All @@ -51,34 +39,32 @@ func nodeStream(ctx context.Context, nodes []bee.Node) <-chan nodeStreamMsg {
var wg sync.WaitGroup
for i, node := range nodes {
wg.Add(1)
go func(i int, n bee.Node) {
go func(i int, node bee.Node) {
defer wg.Done()

address, err := n.Overlay(ctx)
address, err := node.Overlay(ctx)
if err != nil {
nodeStream <- nodeStreamMsg{Index: i, Error: err}
return
}

peers, err := n.Peers(ctx)
peers, err := node.Peers(ctx)
if err != nil {
nodeStream <- nodeStreamMsg{Index: i, Error: err}
return
}

// TODO: remove pingResults and ordering and send result in channel
var pingResults []pingStreamMsg
for m := range pingStream(ctx, n, peers) {
pingResults = append(pingResults, m)
}
sort.SliceStable(pingResults, func(i, j int) bool {
return pingResults[i].Index < pingResults[j].Index
})

nodeStream <- nodeStreamMsg{
Index: i,
Address: address,
PingResults: pingResults,
for m := range node.PingStream(ctx, peers) {
if m.Error != nil {
nodeStream <- nodeStreamMsg{Index: i, Error: m.Error}
}
nodeStream <- nodeStreamMsg{
Index: i,
Address: address,
PeerIndex: m.Index,
PeerAddress: m.Node,
RTT: m.RTT,
}
}
}(i, node)
}
Expand All @@ -90,36 +76,3 @@ func nodeStream(ctx context.Context, nodes []bee.Node) <-chan nodeStreamMsg {

return nodeStream
}

type pingStreamMsg struct {
Index int
Address swarm.Address
RTT string
Error error
}

func pingStream(ctx context.Context, node bee.Node, peers []swarm.Address) <-chan pingStreamMsg {
pingStream := make(chan pingStreamMsg)

var wg sync.WaitGroup
for i, peer := range peers {
wg.Add(1)
go func(n bee.Node, i int, p swarm.Address) {
defer wg.Done()
rtt, err := n.Ping(ctx, p)
pingStream <- pingStreamMsg{
Index: i,
Address: p,
RTT: rtt,
Error: err,
}
}(node, i, peer)
}

go func() {
wg.Wait()
close(pingStream)
}()

return pingStream
}
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package beekeeper

var (
version = "0.2.3" // manually set semantic version number
version = "0.2.4" // manually set semantic version number
commit string // automatically set git commit hash

// Version TODO
Expand Down

0 comments on commit dc9c5be

Please sign in to comment.