Skip to content

Commit

Permalink
fix(i): Make peer connections deterministic (#2888)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2847
Resolves #1902

## Description

This PR fixes an issue where peer connections were not deterministic
within the test framework.

There's also a few other areas that were cleaned up:
- Bootstrap has been replaced with
https://github.com/ipfs/boxo/blob/main/bootstrap/bootstrap.go
- Host setup logic was moved to `net/host.go`
- Persistent peer store was replaced with in memory only version
- Context reuse within the `net` package has been refactored
- Removed unused peer event

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Aug 26, 2024
1 parent 495c456 commit 9a228d2
Show file tree
Hide file tree
Showing 24 changed files with 313 additions and 707 deletions.
14 changes: 1 addition & 13 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
package cli

import (
"fmt"
"os"
"os/signal"
"syscall"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/immutable"
"github.com/sourcenetwork/sourcehub/sdk"
"github.com/spf13/cobra"
Expand All @@ -26,7 +24,6 @@ import (
"github.com/sourcenetwork/defradb/internal/db"
"github.com/sourcenetwork/defradb/keyring"
"github.com/sourcenetwork/defradb/net"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"
)

Expand All @@ -49,29 +46,20 @@ func MakeStartCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
cfg := mustGetContextConfig(cmd)

var peers []peer.AddrInfo
if val := cfg.GetStringSlice("net.peers"); len(val) > 0 {
addrs, err := netutils.ParsePeers(val)
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

opts := []node.Option{
node.WithStorePath(cfg.GetString("datastore.badger.path")),
node.WithBadgerInMemory(cfg.GetString("datastore.store") == configStoreMemory),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
node.WithSourceHubChainID(cfg.GetString("acp.sourceHub.ChainID")),
node.WithSourceHubGRPCAddress(cfg.GetString("acp.sourceHub.GRPCAddress")),
node.WithSourceHubCometRPCAddress(cfg.GetString("acp.sourceHub.CometRPCAddress")),
node.WithPeers(peers...),
// db options
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
net.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
net.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...),
// http server options
http.WithAddress(cfg.GetString("api.address")),
http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...),
Expand Down
7 changes: 0 additions & 7 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package event

import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -30,8 +29,6 @@ const (
UpdateName = Name("update")
// PubSubName is the name of the network pubsub event.
PubSubName = Name("pubsub")
// PeerName is the name of the network connect event.
PeerName = Name("peer")
// P2PTopicName is the name of the network p2p topic update event.
P2PTopicName = Name("p2p-topic")
// PeerInfoName is the name of the network peer info event.
Expand All @@ -44,10 +41,6 @@ const (
ReplicatorCompletedName = Name("replicator-completed")
)

// Peer is an event that is published when
// a peer connection has changed status.
type Peer = event.EvtPeerConnectednessChanged

// PubSub is an event that is published when
// a pubsub message has been received from a remote peer.
type PubSub struct {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ require (
github.com/hashicorp/go-safetemp v1.0.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/hashicorp/golang-lru/arc/v2 v2.0.7 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
Expand Down
8 changes: 1 addition & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,6 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMb
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4=
github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0=
github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
Expand Down Expand Up @@ -442,7 +440,6 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE=
github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8=
github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE=
github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o=
github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk=
github.com/dgraph-io/badger/v3 v3.2011.1 h1:Hmyof0WMEF/QtutX5SQHzIMnJQxb/IrSzhjckV2SD6g=
Expand Down Expand Up @@ -869,10 +866,6 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro=
github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek=
github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo=
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ=
Expand Down Expand Up @@ -908,6 +901,7 @@ github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk=
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
Expand Down
6 changes: 3 additions & 3 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (

// pushLog creates a pushLog request and sends it to another node
// over libp2p grpc connection
func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) error {
func (s *server) pushLog(evt event.Update, pid peer.ID) error {
body := &pb.PushLogRequest_Body{
DocID: []byte(evt.DocID),
Cid: evt.Cid.Bytes(),
Expand All @@ -50,10 +50,10 @@ func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) err
return NewErrPushLog(err)
}

cctx, cancel := context.WithTimeout(ctx, PushTimeout)
ctx, cancel := context.WithTimeout(s.peer.ctx, PushTimeout)
defer cancel()

if _, err := client.PushLog(cctx, req); err != nil {
if _, err := client.PushLog(ctx, req); err != nil {
return NewErrPushLog(
err,
errors.NewKV("CID", evt.Cid),
Expand Down
8 changes: 3 additions & 5 deletions net/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPushlogWithDialFailure(t *testing.T) {
grpc.WithCredentialsBundle(nil),
)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand All @@ -86,7 +86,7 @@ func TestPushlogWithInvalidPeerID(t *testing.T) {
cid, err := createCID(doc)
require.NoError(t, err)

err = p.server.pushLog(ctx, event.Update{
err = p.server.pushLog(event.Update{
DocID: id.String(),
Cid: cid,
SchemaRoot: "test",
Expand All @@ -100,11 +100,9 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) {
db1, p1 := newTestPeer(ctx, t)
defer db1.Close()
defer p1.Close()
p1.Start()
db2, p2 := newTestPeer(ctx, t)
defer p2.Close()
defer db2.Close()
p2.Start()

err := p1.host.Connect(ctx, p2.PeerInfo())
require.NoError(t, err)
Expand Down Expand Up @@ -139,7 +137,7 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) {
b, err := db1.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString())
require.NoError(t, err)

err = p1.server.pushLog(ctx, event.Update{
err = p1.server.pushLog(event.Update{
DocID: doc.ID().String(),
Cid: headCID,
SchemaRoot: col.SchemaRoot(),
Expand Down
8 changes: 8 additions & 0 deletions net/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Options struct {
EnableRelay bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
BootstrapPeers []string
}

// DefaultOptions returns the default net options.
Expand Down Expand Up @@ -64,3 +65,10 @@ func WithListenAddresses(addresses ...string) NodeOpt {
opt.ListenAddresses = addresses
}
}

// WithBootstrapPeers sets the bootstrap peer addresses to attempt to connect to.
func WithBootstrapPeers(peers ...string) NodeOpt {
return func(opt *Options) {
opt.BootstrapPeers = peers
}
}
12 changes: 12 additions & 0 deletions net/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ func TestWithEnablePubSub(t *testing.T) {
WithEnablePubSub(true)(opts)
assert.Equal(t, true, opts.EnablePubSub)
}

func TestWithBootstrapPeers(t *testing.T) {
opts := &Options{}
WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")(opts)
assert.ElementsMatch(t, []string{"/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"}, opts.BootstrapPeers)
}

func TestWithPrivateKey(t *testing.T) {
opts := &Options{}
WithPrivateKey([]byte("abc"))(opts)
assert.Equal(t, []byte("abc"), opts.PrivateKey)
}
47 changes: 18 additions & 29 deletions net/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

netutils "github.com/sourcenetwork/defradb/net/utils"
)

func TestDial_WithConnectedPeer_NoError(t *testing.T) {
Expand All @@ -28,27 +26,24 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)
}
Expand All @@ -61,27 +56,24 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand All @@ -97,27 +89,24 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
ctx := context.Background()
n1, err := NewPeer(
ctx,
db1.Rootstore(),
db1.Blockstore(),
db1.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewPeer(
ctx,
db2.Rootstore(),
db2.Blockstore(),
db2.Events(),
WithListenAddresses("/ip4/0.0.0.0/tcp/0"),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()})
if err != nil {
t.Fatal(err)
}
n2.Bootstrap(addrs)

err = n2.Connect(ctx, n1.PeerInfo())
require.NoError(t, err)

_, err = n1.server.dial(n2.PeerID())
require.NoError(t, err)

Expand Down
Loading

0 comments on commit 9a228d2

Please sign in to comment.