Skip to content

Commit

Permalink
fetch: smarter peer selection (#6477)
Browse files Browse the repository at this point in the history
## Motivation

Most of the time, there are some nodes that already joined the P2P
network but didn't complete their initialization yet, and thus
attempts to e.g. fetch blobs to them end up in stream setup errors
(protocol not supported).
Additionally, during initial phases of `syncv2` testing, we're going
to have a limited number of nodes supporting `sync/2` protocol, and
when choosing peers for `syncv2`, we must only include these peers.



Co-authored-by: Ivan Shvedunov <[email protected]>
  • Loading branch information
ivan4th and ivan4th committed Nov 21, 2024
1 parent 5a398cf commit 30880eb
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 14 deletions.
16 changes: 14 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -297,7 +298,16 @@ func NewFetch(
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
if f.peers.Add(peer) {
protocols := func() []protocol.ID {
ps, err := host.Peerstore().GetProtocols(peer)
if err != nil {
f.logger.Debug("failed to get protocols for peer",
zap.Stringer("id", peer), zap.Error(err))
return nil
}
return ps
}
if f.peers.Add(peer, protocols) {
f.logger.Debug("adding peer", zap.Stringer("id", peer))
}
}
Expand Down Expand Up @@ -703,7 +713,9 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][]*batc
rng := rand.New(rand.NewChaCha8(seed))
peer2requests := make(map[p2p.Peer][]RequestMessage)

best := f.peers.SelectBest(RedundantPeers)
// When selecting peers, provide protocol IDs so that peers that aren't yet fully
// initialized are not picked for the request, avoiding unnecessary errors.
best := f.peers.SelectBestWithProtocols(RedundantPeers, []protocol.ID{hashProtocol, activeSetProtocol})
if len(best) == 0 {
f.logger.Warn("cannot send batch: no peers found")
f.mu.Lock()
Expand Down
10 changes: 7 additions & 3 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -187,7 +188,9 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
f := createFetch(t)
f.cfg.MaxRetriesForRequest = 0
peer := p2p.Peer("buddy")
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})

hsh0 := types.RandomHash()
res0 := ResponseMessage{
Expand Down Expand Up @@ -259,8 +262,9 @@ func TestFetch_Loop_BatchRequestMax(t *testing.T) {
f.cfg.BatchTimeout = 1
f.cfg.BatchSize = 2
peer := p2p.Peer("buddy")
f.peers.Add(peer)

f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
h1 := types.RandomHash()
h2 := types.RandomHash()
h3 := types.RandomHash()
Expand Down
9 changes: 7 additions & 2 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -137,7 +138,9 @@ func TestFetch_getHashes(t *testing.T) {
f.Start()
tb.Cleanup(f.Stop)
for _, peer := range peers {
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
}
f.mh.EXPECT().ID().Return("self").AnyTimes()
f.RegisterPeerHashes(peers[0], hashes[:2])
Expand Down Expand Up @@ -249,7 +252,9 @@ func TestFetch_getHashesStreaming(t *testing.T) {
f.Start()
tb.Cleanup(f.Stop)
for _, peer := range peers {
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
}
f.mh.EXPECT().ID().Return("self").AnyTimes()
f.RegisterPeerHashes(peers[0], hashes[:2])
Expand Down
34 changes: 32 additions & 2 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package peers

import (
"slices"
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/p2p"
Expand All @@ -16,6 +18,7 @@ type data struct {
success, failures int
failRate float64
averageLatency float64
protocols func() []protocol.ID
}

func (d *data) latency(global float64) float64 {
Expand Down Expand Up @@ -61,14 +64,14 @@ func (p *Peers) Contains(id peer.ID) bool {
return exist
}

func (p *Peers) Add(id peer.ID) bool {
func (p *Peers) Add(id peer.ID, protocols func() []protocol.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, exist := p.peers[id]
if exist {
return false
}
p.peers[id] = &data{id: id}
p.peers[id] = &data{id: id, protocols: protocols}
return true
}

Expand Down Expand Up @@ -151,12 +154,39 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID {
func (p *Peers) SelectBest(n int) []peer.ID {
p.mu.Lock()
defer p.mu.Unlock()
return p.selectBest(n, nil)
}

// SelectBestWithProtocols is similar to SelectBest but filters peers by supported protocols.
// If protocols is empty, it returns the best peers regardless of the protocol.
// If protocols is not empty, it returns the best peers that support at least one of the protocols.
func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.ID {
p.mu.Lock()
defer p.mu.Unlock()
return p.selectBest(n, protocols)
}

func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID {
slices.Sort(protocols)
protocols = slices.Compact(protocols)
lth := min(len(p.peers), n)
if lth == 0 {
return nil
}
best := make([]*data, 0, lth)
for _, peer := range p.peers {
if len(protocols) > 0 {
found := false
for _, proto := range peer.protocols() {
if slices.Contains(protocols, proto) {
found = true
break
}
}
if !found {
continue
}
}
for i := range best {
if peer.less(best[i], p.globalLatency) {
best[i], peer = peer, best[i]
Expand Down
71 changes: 70 additions & 1 deletion fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
)

Expand All @@ -22,6 +23,7 @@ type event struct {
success int
failure int
latency time.Duration
protocols []protocol.ID
}

func withEvents(events []event) *Peers {
Expand All @@ -30,7 +32,7 @@ func withEvents(events []event) *Peers {
if ev.delete {
tracker.Delete(ev.id)
} else if ev.add {
tracker.Add(ev.id)
tracker.Add(ev.id, func() []protocol.ID { return ev.protocols })
}
for i := 0; i < ev.failure; i++ {
tracker.OnFailure(ev.id, 0, ev.latency)
Expand Down Expand Up @@ -210,6 +212,73 @@ func TestSelect(t *testing.T) {
}
}

func TestSelectBestWithProtocols(t *testing.T) {
for _, tc := range []struct {
desc string
events []event

n int
protocols []protocol.ID
expect []peer.ID
}{
{
desc: "no protocols required and no peer protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true},
{id: "b", success: 1, latency: 9, add: true},
{id: "c", success: 3, latency: 14, add: true},
},
n: 2,
expect: []peer.ID{"a", "b"},
protocols: nil,
},
{
desc: "no protocols required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
},
n: 2,
expect: []peer.ID{"a", "b"},
protocols: nil,
},
{
desc: "single protocol required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
},
n: 2,
expect: []peer.ID{"b", "c"},
protocols: []protocol.ID{"c"},
},
{
desc: "multiple protocols required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
{id: "d", success: 3, latency: 12, add: true, protocols: []protocol.ID{"a", "e"}},
},
n: 3,
expect: []peer.ID{"a", "b", "c"},
protocols: []protocol.ID{"b", "c"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(
t,
tc.expect,
withEvents(tc.events).SelectBestWithProtocols(tc.n, tc.protocols),
"select best %d",
tc.n,
)
})
}
}

func TestTotal(t *testing.T) {
const total = 100
events := []event{}
Expand Down
7 changes: 6 additions & 1 deletion sync2/multipeer/multipeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

Expand All @@ -15,6 +16,10 @@ import (
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

const (
Protocol = "sync/2"
)

type syncability struct {
// peers that were probed successfully
syncable []p2p.Peer
Expand Down Expand Up @@ -283,7 +288,7 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe
func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) {
var s syncability
for {
syncPeers := mpr.peers.SelectBest(mpr.cfg.SyncPeerCount)
syncPeers := mpr.peers.SelectBestWithProtocols(mpr.cfg.SyncPeerCount, []protocol.ID{Protocol})
mpr.logger.Debug("selected best peers for sync",
zap.Int("syncPeerCount", mpr.cfg.SyncPeerCount),
zap.Int("totalPeers", mpr.peers.Total()),
Expand Down
3 changes: 2 additions & 1 deletion sync2/multipeer/multipeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer {
r := make([]p2p.Peer, n)
for i := 0; i < n; i++ {
p := p2p.Peer(fmt.Sprintf("peer%d", i+1))
mt.peers.Add(p)
mt.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} })
r[i] = p
}
return r
Expand Down
3 changes: 2 additions & 1 deletion sync2/multipeer/split_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -108,7 +109,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester {
AnyTimes()
}
for _, p := range tst.syncPeers {
tst.peers.Add(p)
tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} })
}
tst.splitSync = multipeer.NewSplitSync(
zaptest.NewLogger(t),
Expand Down
6 changes: 5 additions & 1 deletion sync2/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/sync2/multipeer"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

Expand Down Expand Up @@ -88,7 +90,9 @@ func TestP2P(t *testing.T) {
ps := peers.New()
for m := 0; m < numNodes; m++ {
if m != n {
ps.Add(mesh.Hosts()[m].ID())
ps.Add(mesh.Hosts()[m].ID(), func() []protocol.ID {
return []protocol.ID{multipeer.Protocol}
})
}
}
cfg := sync2.DefaultConfig()
Expand Down

0 comments on commit 30880eb

Please sign in to comment.