Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(i): Make peer connections deterministic #2888

Merged
merged 7 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
package cli

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/immutable"
"github.com/sourcenetwork/sourcehub/sdk"
"github.com/spf13/cobra"
Expand All @@ -26,7 +24,6 @@
"github.com/sourcenetwork/defradb/internal/db"
"github.com/sourcenetwork/defradb/keyring"
"github.com/sourcenetwork/defradb/net"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"
)

Expand All @@ -49,29 +46,20 @@
RunE: func(cmd *cobra.Command, args []string) error {
cfg := mustGetContextConfig(cmd)

var peers []peer.AddrInfo
if val := cfg.GetStringSlice("net.peers"); len(val) > 0 {
addrs, err := netutils.ParsePeers(val)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

opts := []node.Option{
node.WithStorePath(cfg.GetString("datastore.badger.path")),
node.WithBadgerInMemory(cfg.GetString("datastore.store") == configStoreMemory),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
node.WithSourceHubChainID(cfg.GetString("acp.sourceHub.ChainID")),
node.WithSourceHubGRPCAddress(cfg.GetString("acp.sourceHub.GRPCAddress")),
node.WithSourceHubCometRPCAddress(cfg.GetString("acp.sourceHub.CometRPCAddress")),
node.WithPeers(peers...),
// db options
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
net.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
net.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...),

Check warning on line 62 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L62

Added line #L62 was not covered by tests
// http server options
http.WithAddress(cfg.GetString("api.address")),
http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...),
Expand Down
7 changes: 0 additions & 7 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package event

import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -30,8 +29,6 @@ const (
UpdateName = Name("update")
// PubSubName is the name of the network pubsub event.
PubSubName = Name("pubsub")
// PeerName is the name of the network connect event.
PeerName = Name("peer")
// P2PTopicName is the name of the network p2p topic update event.
P2PTopicName = Name("p2p-topic")
// PeerInfoName is the name of the network peer info event.
Expand All @@ -44,10 +41,6 @@ const (
ReplicatorCompletedName = Name("replicator-completed")
)

// Peer is an event that is published when
// a peer connection has changed status.
type Peer = event.EvtPeerConnectednessChanged

// PubSub is an event that is published when
// a pubsub message has been received from a remote peer.
type PubSub struct {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.7 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand Down
8 changes: 1 addition & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,6 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMb
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0=
github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -442,7 +440,6 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE=
github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8=
github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE=
github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o=
github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk=
github.com/dgraph-io/badger/v3 v3.2011.1 h1:Hmyof0WMEF/QtutX5SQHzIMnJQxb/IrSzhjckV2SD6g=
Expand Down Expand Up @@ -869,10 +866,6 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo=
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
Expand Down Expand Up @@ -908,6 +901,7 @@ github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
Expand Down
6 changes: 3 additions & 3 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (

// pushLog creates a pushLog request and sends it to another node
// over libp2p grpc connection
func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) error {
func (s *server) pushLog(evt event.Update, pid peer.ID) error {
body := &pb.PushLogRequest_Body{
DocID: []byte(evt.DocID),
Cid: evt.Cid.Bytes(),
Expand All @@ -50,10 +50,10 @@ func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) err
return NewErrPushLog(err)
}

cctx, cancel := context.WithTimeout(ctx, PushTimeout)
ctx, cancel := context.WithTimeout(s.peer.ctx, PushTimeout)
defer cancel()

if _, err := client.PushLog(cctx, req); err != nil {
if _, err := client.PushLog(ctx, req); err != nil {
return NewErrPushLog(
err,
errors.NewKV("CID", evt.Cid),
Expand Down
8 changes: 3 additions & 5 deletions net/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPushlogWithDialFailure(t *testing.T) {
grpc.WithCredentialsBundle(nil),
)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand All @@ -86,7 +86,7 @@ func TestPushlogWithInvalidPeerID(t *testing.T) {
cid, err := createCID(doc)
require.NoError(t, err)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand All @@ -100,11 +100,9 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) {
db1, p1 := newTestPeer(ctx, t)
defer db1.Close()
defer p1.Close()
p1.Start()
db2, p2 := newTestPeer(ctx, t)
defer p2.Close()
defer db2.Close()
p2.Start()

err := p1.host.Connect(ctx, p2.PeerInfo())
require.NoError(t, err)
Expand Down Expand Up @@ -139,7 +137,7 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) {
b, err := db1.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString())
require.NoError(t, err)

err = p1.server.pushLog(ctx, event.Update{
err = p1.server.pushLog(event.Update{
DocID: doc.ID().String(),
Cid: headCID,
SchemaRoot: col.SchemaRoot(),
Expand Down
8 changes: 8 additions & 0 deletions net/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Options struct {
EnableRelay bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
BootstrapPeers []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Sorry if I've misread, but I can't spot where this is actually read - is it dead code?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used here:

peers := make([]peer.AddrInfo, len(options.BootstrapPeers))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cheers!

}

// DefaultOptions returns the default net options.
Expand Down Expand Up @@ -64,3 +65,10 @@ func WithListenAddresses(addresses ...string) NodeOpt {
opt.ListenAddresses = addresses
}
}

// WithBootstrapPeers sets the bootstrap peer addresses to attempt to connect to.
func WithBootstrapPeers(peers ...string) NodeOpt {
return func(opt *Options) {
opt.BootstrapPeers = peers
}
}
Comment on lines +69 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: All other config functions are tested in this file except this one. Please add a test if not too much hassle.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a test here

12 changes: 12 additions & 0 deletions net/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ func TestWithEnablePubSub(t *testing.T) {
WithEnablePubSub(true)(opts)
assert.Equal(t, true, opts.EnablePubSub)
}

func TestWithBootstrapPeers(t *testing.T) {
opts := &Options{}
WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")(opts)
assert.ElementsMatch(t, []string{"/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"}, opts.BootstrapPeers)
}

func TestWithPrivateKey(t *testing.T) {
opts := &Options{}
WithPrivateKey([]byte("abc"))(opts)
assert.Equal(t, []byte("abc"), opts.PrivateKey)
}
47 changes: 18 additions & 29 deletions net/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

netutils "github.com/sourcenetwork/defradb/net/utils"
)

func TestDial_WithConnectedPeer_NoError(t *testing.T) {
Expand All @@ -28,27 +26,24 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)
}
Expand All @@ -61,27 +56,24 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand All @@ -97,27 +89,24 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand Down
Loading
Loading