Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(i): Make peer connections deterministic #2888

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
package cli

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/immutable"
"github.com/sourcenetwork/sourcehub/sdk"
"github.com/spf13/cobra"
Expand All @@ -26,7 +24,6 @@
"github.com/sourcenetwork/defradb/internal/db"
"github.com/sourcenetwork/defradb/keyring"
"github.com/sourcenetwork/defradb/net"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"
)

Expand All @@ -49,29 +46,20 @@
RunE: func(cmd *cobra.Command, args []string) error {
cfg := mustGetContextConfig(cmd)

var peers []peer.AddrInfo
if val := cfg.GetStringSlice("net.peers"); len(val) > 0 {
addrs, err := netutils.ParsePeers(val)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

opts := []node.Option{
node.WithStorePath(cfg.GetString("datastore.badger.path")),
node.WithBadgerInMemory(cfg.GetString("datastore.store") == configStoreMemory),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
node.WithSourceHubChainID(cfg.GetString("acp.sourceHub.ChainID")),
node.WithSourceHubGRPCAddress(cfg.GetString("acp.sourceHub.GRPCAddress")),
node.WithSourceHubCometRPCAddress(cfg.GetString("acp.sourceHub.CometRPCAddress")),
node.WithPeers(peers...),
// db options
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
net.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
net.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...),

Check warning on line 62 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L62

Added line #L62 was not covered by tests
// http server options
http.WithAddress(cfg.GetString("api.address")),
http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...),
Expand Down
7 changes: 0 additions & 7 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package event

import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -30,8 +29,6 @@ const (
UpdateName = Name("update")
// PubSubName is the name of the network pubsub event.
PubSubName = Name("pubsub")
// PeerName is the name of the network connect event.
PeerName = Name("peer")
// P2PTopicName is the name of the network p2p topic update event.
P2PTopicName = Name("p2p-topic")
// PeerInfoName is the name of the network peer info event.
Expand All @@ -44,10 +41,6 @@ const (
ReplicatorCompletedName = Name("replicator-completed")
)

// Peer is an event that is published when
// a peer connection has changed status.
type Peer = event.EvtPeerConnectednessChanged

// PubSub is an event that is published when
// a pubsub message has been received from a remote peer.
type PubSub struct {
Expand Down
6 changes: 3 additions & 3 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (

// pushLog creates a pushLog request and sends it to another node
// over libp2p grpc connection
func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) error {
func (s *server) pushLog(evt event.Update, pid peer.ID) error {
body := &pb.PushLogRequest_Body{
DocID: []byte(evt.DocID),
Cid: evt.Cid.Bytes(),
Expand All @@ -50,10 +50,10 @@ func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) err
return NewErrPushLog(err)
}

cctx, cancel := context.WithTimeout(ctx, PushTimeout)
ctx, cancel := context.WithTimeout(context.Background(), PushTimeout)
defer cancel()

if _, err := client.PushLog(cctx, req); err != nil {
if _, err := client.PushLog(ctx, req); err != nil {
return NewErrPushLog(
err,
errors.NewKV("CID", evt.Cid),
Expand Down
6 changes: 3 additions & 3 deletions net/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPushlogWithDialFailure(t *testing.T) {
grpc.WithCredentialsBundle(nil),
)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand All @@ -86,7 +86,7 @@ func TestPushlogWithInvalidPeerID(t *testing.T) {
cid, err := createCID(doc)
require.NoError(t, err)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) {
b, err := db1.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString())
require.NoError(t, err)

err = p1.server.pushLog(ctx, event.Update{
err = p1.server.pushLog(event.Update{
DocID: doc.ID().String(),
Cid: headCID,
SchemaRoot: col.SchemaRoot(),
Expand Down
8 changes: 8 additions & 0 deletions net/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Options struct {
EnableRelay bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
BootstrapPeers []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Sorry if I've misread, but I can't spot where this is actually read - is it dead code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used here:

peers := make([]peer.AddrInfo, len(options.BootstrapPeers))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cheers!

}

// DefaultOptions returns the default net options.
Expand Down Expand Up @@ -64,3 +65,10 @@ func WithListenAddresses(addresses ...string) NodeOpt {
opt.ListenAddresses = addresses
}
}

// WithBootstrapPeers sets the bootstrap peer addresses to attempt to connect to.
func WithBootstrapPeers(peers ...string) NodeOpt {
return func(opt *Options) {
opt.BootstrapPeers = peers
}
}
Comment on lines +69 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: All other config functions are tested in this file except this one. Please add a test if not too much hassle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test here

12 changes: 12 additions & 0 deletions net/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ func TestWithEnablePubSub(t *testing.T) {
WithEnablePubSub(true)(opts)
assert.Equal(t, true, opts.EnablePubSub)
}

func TestWithBootstrapPeers(t *testing.T) {
opts := &Options{}
WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")(opts)
assert.ElementsMatch(t, []string{"/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"}, opts.BootstrapPeers)
}

func TestWithPrivateKey(t *testing.T) {
opts := &Options{}
WithPrivateKey([]byte("abc"))(opts)
assert.Equal(t, []byte("abc"), opts.PrivateKey)
}
47 changes: 18 additions & 29 deletions net/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

netutils "github.com/sourcenetwork/defradb/net/utils"
)

func TestDial_WithConnectedPeer_NoError(t *testing.T) {
Expand All @@ -28,27 +26,24 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)
}
Expand All @@ -61,27 +56,24 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand All @@ -97,27 +89,24 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand Down
72 changes: 72 additions & 0 deletions net/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package net

import (
"context"
"time"

libp2p "github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
dualdht "github.com/libp2p/go-libp2p-kad-dht/dual"
record "github.com/libp2p/go-libp2p-record"
libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

// setupHost returns a host and router configured with the given options.
func setupHost(ctx context.Context, options *Options) (host.Host, *dualdht.DHT, error) {
connManager, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Second*20))
if err != nil {
return nil, nil, err

Check warning on line 31 in net/host.go

View check run for this annotation

Codecov / codecov/patch

net/host.go#L31

Added line #L31 was not covered by tests
}

dhtOpts := []dualdht.Option{
dualdht.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})),
dualdht.DHTOption(dht.Concurrency(10)),
dualdht.DHTOption(dht.Mode(dht.ModeAuto)),
}

var ddht *dualdht.DHT
routing := func(h host.Host) (routing.PeerRouting, error) {
ddht, err = dualdht.New(ctx, h, dhtOpts...)
return ddht, err
}

libp2pOpts := []libp2p.Option{
libp2p.ConnectionManager(connManager),
libp2p.DefaultTransports,
libp2p.ListenAddrStrings(options.ListenAddresses...),
libp2p.Routing(routing),
}

// relay is enabled by default unless explicitly disabled
if !options.EnableRelay {
libp2pOpts = append(libp2pOpts, libp2p.DisableRelay())
}

// use the private key from options or generate a random one
if options.PrivateKey != nil {
privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey)
if err != nil {
return nil, nil, err

Check warning on line 62 in net/host.go

View check run for this annotation

Codecov / codecov/patch

net/host.go#L62

Added line #L62 was not covered by tests
}
libp2pOpts = append(libp2pOpts, libp2p.Identity(privateKey))
}

h, err := libp2p.New(libp2pOpts...)
if err != nil {
return nil, nil, err

Check warning on line 69 in net/host.go

View check run for this annotation

Codecov / codecov/patch

net/host.go#L69

Added line #L69 was not covered by tests
}
return h, ddht, nil
}
29 changes: 29 additions & 0 deletions net/host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package net

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestSetupHostWithDefaultOptions(t *testing.T) {
h, dht, err := setupHost(context.Background(), DefaultOptions())
require.NoError(t, err)

require.NotNil(t, h)
require.NotNil(t, dht)

err = h.Close()
require.NoError(t, err)
}
Loading
Loading