Skip to content

Commit

Permalink
sync2: implement multi-peer synchronization
Browse files Browse the repository at this point in the history
This adds multi-peer synchronization support.
When the local set differs too much from the remote sets,
"torrent-style" "split sync" is attempted which splits the set into
subranges and syncs each sub-range against a separate peer.
Otherwise, the full sync is done, syncing the whole set against
each of the synchronization peers.
Full sync is also done after each split sync run.
The local set can be considered synchronized after the specified
number of full syncs has happened.

The approach is loosely based on [SREP: Out-Of-Band Sync of
Transaction Pools for Large-Scale
Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf) paper by
Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.
  • Loading branch information
ivan4th committed Oct 23, 2024
1 parent 86b9591 commit ef30f47
Show file tree
Hide file tree
Showing 22 changed files with 3,877 additions and 12 deletions.
7 changes: 7 additions & 0 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ type Peers struct {
globalLatency float64
}

func (p *Peers) Contains(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, exist := p.peers[id]
return exist
}

func (p *Peers) Add(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
23 changes: 21 additions & 2 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,30 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
}
}

// WithDecayingTag specifies P2P decaying tag that is applied to the peer when a request
// is being served.
func WithDecayingTag(tag DecayingTagSpec) Opt {
return func(s *Server) {
s.decayingTagSpec = &tag
}
}

type peerIDKey struct{}

func withPeerID(ctx context.Context, peerID peer.ID) context.Context {
return context.WithValue(ctx, peerIDKey{}, peerID)
}

// ContextPeerID retrieves the ID of the peer being served from the context and a boolean
// value indicating that the context contains peer ID. If there's no peer ID associated
// with the context, the function returns an empty peer ID and false.
func ContextPeerID(ctx context.Context) (peer.ID, bool) {
if v := ctx.Value(peerIDKey{}); v != nil {
return v.(peer.ID), true
}
return peer.ID(""), false
}

// Handler is a handler to be defined by the application.
type Handler func(context.Context, []byte) ([]byte, error)

Expand Down Expand Up @@ -264,7 +282,8 @@ func (s *Server) Run(ctx context.Context) error {
eg.Wait()
return nil
}
ctx, cancel := context.WithCancel(ctx)
peer := req.stream.Conn().RemotePeer()
ctx, cancel := context.WithCancel(withPeerID(ctx, peer))
eg.Go(func() error {
<-ctx.Done()
s.sem.Release(1)
Expand All @@ -275,7 +294,7 @@ func (s *Server) Run(ctx context.Context) error {
defer cancel()
conn := req.stream.Conn()
if s.decayingTag != nil {
s.decayingTag.Bump(conn.RemotePeer(), s.decayingTagSpec.Inc)
s.decayingTag.Bump(peer, s.decayingTagSpec.Inc)
}
ok := s.queueHandler(ctx, req.stream)
duration := time.Since(req.received)
Expand Down
16 changes: 12 additions & 4 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/spacemeshos/go-scale/tester"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -45,8 +46,10 @@ func TestServer(t *testing.T) {
request := []byte("test request")
testErr := errors.New("test error")

handler := func(_ context.Context, msg []byte) ([]byte, error) {
return msg, nil
handler := func(ctx context.Context, msg []byte) ([]byte, error) {
peerID, found := ContextPeerID(ctx)
require.True(t, found)
return append(msg, []byte(peerID)...), nil
}
errhandler := func(_ context.Context, _ []byte) ([]byte, error) {
return nil, testErr
Expand Down Expand Up @@ -81,6 +84,9 @@ func TestServer(t *testing.T) {
append(opts, WithRequestSizeLimit(limit))...,
)
ctx, cancel := context.WithCancel(context.Background())
noPeerID, found := ContextPeerID(ctx)
require.Equal(t, peer.ID(""), noPeerID)
require.False(t, found)
var eg errgroup.Group
eg.Go(func() error {
return srv1.Run(ctx)
Expand Down Expand Up @@ -109,7 +115,8 @@ func TestServer(t *testing.T) {
srvID := mesh.Hosts()[1].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
require.Equal(t, request, response)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[1].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())
Expand All @@ -129,7 +136,8 @@ func TestServer(t *testing.T) {
srvID := mesh.Hosts()[3].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
require.Equal(t, request, response)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[3].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())
Expand Down
22 changes: 22 additions & 0 deletions sync2/multipeer/delim.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package multipeer

import (
"encoding/binary"

"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

func getDelimiters(numPeers, keyLen, maxDepth int) (h []rangesync.KeyBytes) {
if numPeers < 2 {
return nil
}
mask := uint64(0xffffffffffffffff) << (64 - maxDepth)
inc := (uint64(0x80) << 56) / uint64(numPeers)
h = make([]rangesync.KeyBytes, numPeers-1)
for i, v := 0, uint64(0); i < numPeers-1; i++ {
h[i] = make(rangesync.KeyBytes, keyLen)
v += inc
binary.BigEndian.PutUint64(h[i], (v<<1)&mask)
}
return h
}
105 changes: 105 additions & 0 deletions sync2/multipeer/delim_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package multipeer_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/spacemeshos/go-spacemesh/sync2/multipeer"
)

func TestGetDelimiters(t *testing.T) {
for _, tc := range []struct {
numPeers int
keyLen int
maxDepth int
values []string
}{
{
numPeers: 0,
maxDepth: 64,
keyLen: 32,
values: nil,
},
{
numPeers: 1,
maxDepth: 64,
keyLen: 32,
values: nil,
},
{
numPeers: 2,
maxDepth: 64,
keyLen: 32,
values: []string{
"8000000000000000000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 2,
maxDepth: 24,
keyLen: 32,
values: []string{
"8000000000000000000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 3,
maxDepth: 64,
keyLen: 32,
values: []string{
"5555555555555554000000000000000000000000000000000000000000000000",
"aaaaaaaaaaaaaaa8000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 3,
maxDepth: 24,
keyLen: 32,
values: []string{
"5555550000000000000000000000000000000000000000000000000000000000",
"aaaaaa0000000000000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 3,
maxDepth: 4,
keyLen: 32,
values: []string{
"5000000000000000000000000000000000000000000000000000000000000000",
"a000000000000000000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 4,
maxDepth: 64,
keyLen: 32,
values: []string{
"4000000000000000000000000000000000000000000000000000000000000000",
"8000000000000000000000000000000000000000000000000000000000000000",
"c000000000000000000000000000000000000000000000000000000000000000",
},
},
{
numPeers: 4,
maxDepth: 24,
keyLen: 32,
values: []string{
"4000000000000000000000000000000000000000000000000000000000000000",
"8000000000000000000000000000000000000000000000000000000000000000",
"c000000000000000000000000000000000000000000000000000000000000000",
},
},
} {
ds := multipeer.GetDelimiters(tc.numPeers, tc.keyLen, tc.maxDepth)
var hs []string
for _, d := range ds {
hs = append(hs, d.String())
}
if len(tc.values) == 0 {
require.Empty(t, hs, "%d delimiters", tc.numPeers)
} else {
require.Equal(t, tc.values, hs, "%d delimiters", tc.numPeers)
}
}
}
58 changes: 58 additions & 0 deletions sync2/multipeer/dumbset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package multipeer

import (
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

// DumbSet is an unoptimized OrderedSet to be used for testing purposes.
// It builds on rangesync.DumbSet.
type DumbSet struct {
*rangesync.DumbSet
}

var _ OrderedSet = &DumbSet{}

// NewDumbHashSet creates an unoptimized OrderedSet to be used for testing purposes.
// If disableReAdd is true, receiving the same item multiple times will fail.
func NewDumbHashSet() *DumbSet {
return &DumbSet{
DumbSet: &rangesync.DumbSet{},
}
}

// Advance implements OrderedSet.
func (ds *DumbSet) EnsureLoaded() error {
return nil
}

// Advance implements OrderedSet.
func (ds *DumbSet) Advance() error {
return nil
}

// Has implements OrderedSet.
func (ds *DumbSet) Has(k rangesync.KeyBytes) (bool, error) {
var first rangesync.KeyBytes
sr := ds.Items()
for cur := range sr.Seq {
if first == nil {
first = cur
} else if first.Compare(cur) == 0 {
return false, sr.Error()
}
if k.Compare(cur) == 0 {
return true, sr.Error()
}
}
return false, sr.Error()
}

// Copy implements OrderedSet.
func (ds *DumbSet) Copy(syncScope bool) rangesync.OrderedSet {
return &DumbSet{ds.DumbSet.Copy(syncScope).(*rangesync.DumbSet)}
}

// Release implements OrderedSet.
func (ds *DumbSet) Release() error {
return nil
}
25 changes: 25 additions & 0 deletions sync2/multipeer/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package multipeer

import (
"context"

"github.com/spacemeshos/go-spacemesh/p2p"
)

type (
SyncRunner = syncRunner
SplitSync = splitSync
)

var (
WithSyncRunner = withSyncRunner
WithClock = withClock
GetDelimiters = getDelimiters
NewSyncQueue = newSyncQueue
NewSplitSync = newSplitSync
NewSyncList = newSyncList
)

func (mpr *MultiPeerReconciler) FullSync(ctx context.Context, syncPeers []p2p.Peer) error {
return mpr.fullSync(ctx, syncPeers)
}
Loading

0 comments on commit ef30f47

Please sign in to comment.