diff --git a/cli/start.go b/cli/start.go index 212202be80..641e743ee8 100644 --- a/cli/start.go +++ b/cli/start.go @@ -11,12 +11,10 @@ package cli import ( - "fmt" "os" "os/signal" "syscall" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/sourcehub/sdk" "github.com/spf13/cobra" @@ -26,7 +24,6 @@ import ( "github.com/sourcenetwork/defradb/internal/db" "github.com/sourcenetwork/defradb/keyring" "github.com/sourcenetwork/defradb/net" - netutils "github.com/sourcenetwork/defradb/net/utils" "github.com/sourcenetwork/defradb/node" ) @@ -49,15 +46,6 @@ func MakeStartCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { cfg := mustGetContextConfig(cmd) - var peers []peer.AddrInfo - if val := cfg.GetStringSlice("net.peers"); len(val) > 0 { - addrs, err := netutils.ParsePeers(val) - if err != nil { - return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err) - } - peers = addrs - } - opts := []node.Option{ node.WithStorePath(cfg.GetString("datastore.badger.path")), node.WithBadgerInMemory(cfg.GetString("datastore.store") == configStoreMemory), @@ -65,13 +53,13 @@ func MakeStartCommand() *cobra.Command { node.WithSourceHubChainID(cfg.GetString("acp.sourceHub.ChainID")), node.WithSourceHubGRPCAddress(cfg.GetString("acp.sourceHub.GRPCAddress")), node.WithSourceHubCometRPCAddress(cfg.GetString("acp.sourceHub.CometRPCAddress")), - node.WithPeers(peers...), // db options db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")), // net node options net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...), net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")), net.WithEnableRelay(cfg.GetBool("net.relayEnabled")), + net.WithBootstrapPeers(cfg.GetStringSlice("net.peers")...), // http server options http.WithAddress(cfg.GetString("api.address")), http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...), diff --git a/event/event.go b/event/event.go index fa557cc03c..9d24a89c10 100644 --- a/event/event.go +++ b/event/event.go @@ -12,7 +12,6 @@ package event import ( "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/peer" ) @@ -30,8 +29,6 @@ const ( UpdateName = Name("update") // PubSubName is the name of the network pubsub event. PubSubName = Name("pubsub") - // PeerName is the name of the network connect event. - PeerName = Name("peer") // P2PTopicName is the name of the network p2p topic update event. P2PTopicName = Name("p2p-topic") // PeerInfoName is the name of the network peer info event. @@ -44,10 +41,6 @@ const ( ReplicatorCompletedName = Name("replicator-completed") ) -// Peer is an event that is published when -// a peer connection has changed status. -type Peer = event.EvtPeerConnectednessChanged - // PubSub is an event that is published when // a pubsub message has been received from a remote peer. type PubSub struct { diff --git a/go.mod b/go.mod index 1fdba68594..8b94ad9378 100644 --- a/go.mod +++ b/go.mod @@ -200,7 +200,6 @@ require ( github.com/hashicorp/go-safetemp v1.0.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect - github.com/hashicorp/golang-lru/arc/v2 v2.0.7 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect diff --git a/go.sum b/go.sum index 1165fcb757..c9ee7d5d1d 100644 --- a/go.sum +++ b/go.sum @@ -226,8 +226,6 @@ github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMb github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/99designs/keyring v1.2.2 h1:pZd3neh/EmUzWONb35LxQfvuY7kiSXAq3HQd97+XBn0= github.com/99designs/keyring v1.2.2/go.mod h1:wes/FrByc8j7lFOAGLGSNEg8f/PaI3cgTBqhFkHUrPk= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIoKjsnZuH8vjyaysT/ses3EvZeaV/1UkF2M= -github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -442,7 +440,6 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3 github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I= github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE= github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= -github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= github.com/dgraph-io/badger/v3 v3.2011.1 h1:Hmyof0WMEF/QtutX5SQHzIMnJQxb/IrSzhjckV2SD6g= @@ -869,10 +866,6 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= -github.com/ipfs/go-ds-badger v0.3.0 h1:xREL3V0EH9S219kFFueOYJJTcjgNSZ2HY1iSvN7U1Ro= -github.com/ipfs/go-ds-badger v0.3.0/go.mod h1:1ke6mXNqeV8K3y5Ak2bAA0osoTfmxUdupVCGm4QUIek= -github.com/ipfs/go-ds-leveldb v0.5.0 h1:s++MEBbD3ZKc9/8/njrn4flZLnCuY9I79v94gBUNumo= -github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q= github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ= github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= @@ -908,6 +901,7 @@ github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABoLk/+KKHggpk= github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= diff --git a/net/client.go b/net/client.go index 9930710891..77eb28d4d6 100644 --- a/net/client.go +++ b/net/client.go @@ -31,7 +31,7 @@ var ( // pushLog creates a pushLog request and sends it to another node // over libp2p grpc connection -func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) error { +func (s *server) pushLog(evt event.Update, pid peer.ID) error { body := &pb.PushLogRequest_Body{ DocID: []byte(evt.DocID), Cid: evt.Cid.Bytes(), @@ -50,10 +50,10 @@ func (s *server) pushLog(ctx context.Context, evt event.Update, pid peer.ID) err return NewErrPushLog(err) } - cctx, cancel := context.WithTimeout(ctx, PushTimeout) + ctx, cancel := context.WithTimeout(s.peer.ctx, PushTimeout) defer cancel() - if _, err := client.PushLog(cctx, req); err != nil { + if _, err := client.PushLog(ctx, req); err != nil { return NewErrPushLog( err, errors.NewKV("CID", evt.Cid), diff --git a/net/client_test.go b/net/client_test.go index 43c4ec1e01..629b176605 100644 --- a/net/client_test.go +++ b/net/client_test.go @@ -63,7 +63,7 @@ func TestPushlogWithDialFailure(t *testing.T) { grpc.WithCredentialsBundle(nil), ) - err = p.server.pushLog(ctx, event.Update{ + err = p.server.pushLog(event.Update{ DocID: id.String(), Cid: cid, SchemaRoot: "test", @@ -86,7 +86,7 @@ func TestPushlogWithInvalidPeerID(t *testing.T) { cid, err := createCID(doc) require.NoError(t, err) - err = p.server.pushLog(ctx, event.Update{ + err = p.server.pushLog(event.Update{ DocID: id.String(), Cid: cid, SchemaRoot: "test", @@ -100,11 +100,9 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) { db1, p1 := newTestPeer(ctx, t) defer db1.Close() defer p1.Close() - p1.Start() db2, p2 := newTestPeer(ctx, t) defer p2.Close() defer db2.Close() - p2.Start() err := p1.host.Connect(ctx, p2.PeerInfo()) require.NoError(t, err) @@ -139,7 +137,7 @@ func TestPushlogW_WithValidPeerID_NoError(t *testing.T) { b, err := db1.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) require.NoError(t, err) - err = p1.server.pushLog(ctx, event.Update{ + err = p1.server.pushLog(event.Update{ DocID: doc.ID().String(), Cid: headCID, SchemaRoot: col.SchemaRoot(), diff --git a/net/config.go b/net/config.go index d2a3039d4a..8128aef215 100644 --- a/net/config.go +++ b/net/config.go @@ -24,6 +24,7 @@ type Options struct { EnableRelay bool GRPCServerOptions []grpc.ServerOption GRPCDialOptions []grpc.DialOption + BootstrapPeers []string } // DefaultOptions returns the default net options. @@ -64,3 +65,10 @@ func WithListenAddresses(addresses ...string) NodeOpt { opt.ListenAddresses = addresses } } + +// WithBootstrapPeers sets the bootstrap peer addresses to attempt to connect to. +func WithBootstrapPeers(peers ...string) NodeOpt { + return func(opt *Options) { + opt.BootstrapPeers = peers + } +} diff --git a/net/config_test.go b/net/config_test.go index 869c820788..b9920d8915 100644 --- a/net/config_test.go +++ b/net/config_test.go @@ -34,3 +34,15 @@ func TestWithEnablePubSub(t *testing.T) { WithEnablePubSub(true)(opts) assert.Equal(t, true, opts.EnablePubSub) } + +func TestWithBootstrapPeers(t *testing.T) { + opts := &Options{} + WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ")(opts) + assert.ElementsMatch(t, []string{"/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"}, opts.BootstrapPeers) +} + +func TestWithPrivateKey(t *testing.T) { + opts := &Options{} + WithPrivateKey([]byte("abc"))(opts) + assert.Equal(t, []byte("abc"), opts.PrivateKey) +} diff --git a/net/dialer_test.go b/net/dialer_test.go index 479d4d7e63..64060f2660 100644 --- a/net/dialer_test.go +++ b/net/dialer_test.go @@ -16,8 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - netutils "github.com/sourcenetwork/defradb/net/utils" ) func TestDial_WithConnectedPeer_NoError(t *testing.T) { @@ -28,27 +26,24 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) { ctx := context.Background() n1, err := NewPeer( ctx, - db1.Rootstore(), db1.Blockstore(), db1.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Rootstore(), db2.Blockstore(), db2.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n2.Close() - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) + + err = n2.Connect(ctx, n1.PeerInfo()) + require.NoError(t, err) + _, err = n1.server.dial(n2.PeerID()) require.NoError(t, err) } @@ -61,27 +56,24 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) { ctx := context.Background() n1, err := NewPeer( ctx, - db1.Rootstore(), db1.Blockstore(), db1.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Rootstore(), db2.Blockstore(), db2.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n2.Close() - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) + + err = n2.Connect(ctx, n1.PeerInfo()) + require.NoError(t, err) + _, err = n1.server.dial(n2.PeerID()) require.NoError(t, err) @@ -97,27 +89,24 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing ctx := context.Background() n1, err := NewPeer( ctx, - db1.Rootstore(), db1.Blockstore(), db1.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Rootstore(), db2.Blockstore(), db2.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) assert.NoError(t, err) defer n2.Close() - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) + + err = n2.Connect(ctx, n1.PeerInfo()) + require.NoError(t, err) + _, err = n1.server.dial(n2.PeerID()) require.NoError(t, err) diff --git a/net/host.go b/net/host.go new file mode 100644 index 0000000000..2de5ffe25c --- /dev/null +++ b/net/host.go @@ -0,0 +1,72 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package net + +import ( + "context" + "time" + + libp2p "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + dualdht "github.com/libp2p/go-libp2p-kad-dht/dual" + record "github.com/libp2p/go-libp2p-record" + libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" +) + +// setupHost returns a host and router configured with the given options. +func setupHost(ctx context.Context, options *Options) (host.Host, *dualdht.DHT, error) { + connManager, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Second*20)) + if err != nil { + return nil, nil, err + } + + dhtOpts := []dualdht.Option{ + dualdht.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})), + dualdht.DHTOption(dht.Concurrency(10)), + dualdht.DHTOption(dht.Mode(dht.ModeAuto)), + } + + var ddht *dualdht.DHT + routing := func(h host.Host) (routing.PeerRouting, error) { + ddht, err = dualdht.New(ctx, h, dhtOpts...) + return ddht, err + } + + libp2pOpts := []libp2p.Option{ + libp2p.ConnectionManager(connManager), + libp2p.DefaultTransports, + libp2p.ListenAddrStrings(options.ListenAddresses...), + libp2p.Routing(routing), + } + + // relay is enabled by default unless explicitly disabled + if !options.EnableRelay { + libp2pOpts = append(libp2pOpts, libp2p.DisableRelay()) + } + + // use the private key from options or generate a random one + if options.PrivateKey != nil { + privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey) + if err != nil { + return nil, nil, err + } + libp2pOpts = append(libp2pOpts, libp2p.Identity(privateKey)) + } + + h, err := libp2p.New(libp2pOpts...) + if err != nil { + return nil, nil, err + } + return h, ddht, nil +} diff --git a/net/host_test.go b/net/host_test.go new file mode 100644 index 0000000000..f7787e4c5e --- /dev/null +++ b/net/host_test.go @@ -0,0 +1,29 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package net + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSetupHostWithDefaultOptions(t *testing.T) { + h, dht, err := setupHost(context.Background(), DefaultOptions()) + require.NoError(t, err) + + require.NotNil(t, h) + require.NotNil(t, dht) + + err = h.Close() + require.NoError(t, err) +} diff --git a/net/peer.go b/net/peer.go index 7222d7cf9f..301c080edb 100644 --- a/net/peer.go +++ b/net/peer.go @@ -14,41 +14,27 @@ package net import ( "context" - "fmt" - "sync" - "sync/atomic" + "io" "time" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/bootstrap" exchange "github.com/ipfs/boxo/exchange" - "github.com/ipfs/boxo/ipns" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - libp2p "github.com/libp2p/go-libp2p" gostream "github.com/libp2p/go-libp2p-gostream" - dht "github.com/libp2p/go-libp2p-kad-dht" - dualdht "github.com/libp2p/go-libp2p-kad-dht/dual" pubsub "github.com/libp2p/go-libp2p-pubsub" - record "github.com/libp2p/go-libp2p-record" - libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" - libp2pEvent "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" - // @TODO: https://github.com/sourcenetwork/defradb/issues/1902 - //nolint:staticcheck - "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds" - "github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/multiformats/go-multiaddr" "github.com/sourcenetwork/corelog" "google.golang.org/grpc" "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" @@ -64,6 +50,9 @@ type Peer struct { bus *event.Bus updateSub *event.Subscription + ctx context.Context + cancel context.CancelFunc + host host.Host dht routing.Routing ps *pubsub.PubSub @@ -75,97 +64,48 @@ type Peer struct { exch exchange.Interface bserv blockservice.BlockService - ctx context.Context - cancel context.CancelFunc - dhtClose func() error + bootCloser io.Closer } // NewPeer creates a new instance of the DefraDB server as a peer-to-peer node. func NewPeer( ctx context.Context, - rootstore datastore.Rootstore, blockstore datastore.Blockstore, bus *event.Bus, opts ...NodeOpt, ) (p *Peer, err error) { - if rootstore == nil || blockstore == nil { - return nil, ErrNilDB - } - - options := DefaultOptions() - for _, opt := range opts { - opt(options) - } - - connManager, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Second*20)) - if err != nil { - return nil, err - } - - var listenAddresses []multiaddr.Multiaddr - for _, addr := range options.ListenAddresses { - listenAddress, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return nil, err - } - listenAddresses = append(listenAddresses, listenAddress) - } - ctx, cancel := context.WithCancel(ctx) defer func() { if p == nil { cancel() + } else if err != nil { + p.Close() } }() - peerstore, err := pstoreds.NewPeerstore(ctx, rootstore, pstoreds.DefaultOpts()) - if err != nil { - return nil, err + if blockstore == nil { + return nil, ErrNilDB } - if options.PrivateKey == nil { - // generate an ephemeral private key - key, err := crypto.GenerateEd25519() + options := DefaultOptions() + for _, opt := range opts { + opt(options) + } + + peers := make([]peer.AddrInfo, len(options.BootstrapPeers)) + for i, p := range options.BootstrapPeers { + addr, err := peer.AddrInfoFromString(p) if err != nil { return nil, err } - options.PrivateKey = key + peers[i] = *addr } - // unmarshal the private key bytes - privateKey, err := libp2pCrypto.UnmarshalEd25519PrivateKey(options.PrivateKey) + h, ddht, err := setupHost(ctx, options) if err != nil { return nil, err } - var ddht *dualdht.DHT - - libp2pOpts := []libp2p.Option{ - libp2p.ConnectionManager(connManager), - libp2p.DefaultTransports, - libp2p.Identity(privateKey), - libp2p.ListenAddrs(listenAddresses...), - libp2p.Peerstore(peerstore), - libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { - // Delete this line and uncomment the next 6 lines once we remove batchable datastore support. - // var store ds.Batching - // // If `rootstore` doesn't implement `Batching`, `nil` will be passed - // // to newDHT which will cause the DHT to be stored in memory. - // if dsb, isBatching := rootstore.(ds.Batching); isBatching { - // store = dsb - // } - ddht, err = newDHT(ctx, h, rootstore) - return ddht, err - }), - } - if !options.EnableRelay { - libp2pOpts = append(libp2pOpts, libp2p.DisableRelay()) - } - - h, err := libp2p.New(libp2pOpts...) - if err != nil { - return nil, err - } log.InfoContext( ctx, "Created LibP2P host", @@ -173,9 +113,23 @@ func NewPeer( corelog.Any("Address", options.ListenAddresses), ) - var ps *pubsub.PubSub + bswapnet := network.NewFromIpfsHost(h, ddht) + bswap := bitswap.New(ctx, bswapnet, blockstore) + + p = &Peer{ + host: h, + dht: ddht, + blockstore: blockstore, + ctx: ctx, + cancel: cancel, + bus: bus, + p2pRPC: grpc.NewServer(options.GRPCServerOptions...), + bserv: blockservice.New(blockstore, bswap), + exch: bswap, + } + if options.EnablePubSub { - ps, err = pubsub.NewGossipSub( + p.ps, err = pubsub.NewGossipSub( ctx, h, pubsub.WithPeerExchange(true), @@ -184,119 +138,65 @@ func NewPeer( if err != nil { return nil, err } + p.updateSub, err = p.bus.Subscribe(event.UpdateName, event.P2PTopicName, event.ReplicatorName) + if err != nil { + return nil, err + } + log.Info("Starting internal broadcaster for pubsub network") + go p.handleMessageLoop() } + p.server, err = newServer(p, options.GRPCDialOptions...) if err != nil { return nil, err } - sub, err := h.EventBus().Subscribe(&libp2pEvent.EvtPeerConnectednessChanged{}) + p2plistener, err := gostream.Listen(h, corenet.Protocol) if err != nil { return nil, err } - // publish subscribed events to the event bus - go func() { - for { - select { - case <-ctx.Done(): - return - case val, isOpen := <-sub.Out(): - if !isOpen { - return - } - bus.Publish(event.NewMessage(event.PeerName, val)) - } - } - }() - - p = &Peer{ - host: h, - dht: ddht, - ps: ps, - blockstore: blockstore, - bus: bus, - p2pRPC: grpc.NewServer(options.GRPCServerOptions...), - ctx: ctx, - cancel: cancel, - } - p.server, err = newServer(p, options.GRPCDialOptions...) + p.bootCloser, err = bootstrap.Bootstrap(p.PeerID(), h, ddht, bootstrap.BootstrapConfigWithPeers(peers)) if err != nil { return nil, err } - p.setupBlockService() - - return p, nil -} - -// Start all the internal workers/goroutines/loops that manage the P2P state. -func (p *Peer) Start() error { - // reconnect to known peers - var wg sync.WaitGroup - for _, id := range p.host.Peerstore().PeersWithAddrs() { - if id == p.host.ID() { - continue - } - wg.Add(1) - go func(id peer.ID) { - defer wg.Done() - addr := p.host.Peerstore().PeerInfo(id) - err := p.host.Connect(p.ctx, addr) - if err != nil { - log.InfoContext( - p.ctx, - "Failure while reconnecting to a known peer", - corelog.Any("peer", id)) - } - }(id) - } - wg.Wait() - - p2plistener, err := gostream.Listen(p.host, corenet.Protocol) - if err != nil { - return err - } - - if p.ps != nil { - sub, err := p.bus.Subscribe(event.UpdateName, event.P2PTopicName, event.ReplicatorName) - if err != nil { - return err - } - p.updateSub = sub - log.InfoContext(p.ctx, "Starting internal broadcaster for pubsub network") - go p.handleMessageLoop() - } - - log.InfoContext( - p.ctx, - "Starting P2P node", - corelog.Any("P2P addresses", p.host.Addrs())) // register the P2P gRPC server go func() { pb.RegisterServiceServer(p.p2pRPC, p.server) if err := p.p2pRPC.Serve(p2plistener); err != nil && !errors.Is(err, grpc.ErrServerStopped) { - log.ErrorContextE(p.ctx, "Fatal P2P RPC server error", err) + log.ErrorE("Fatal P2P RPC server error", err) } }() - p.bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()})) + bus.Publish(event.NewMessage(event.PeerInfoName, event.PeerInfo{Info: p.PeerInfo()})) - return nil + return p, nil } // Close the peer node and all its internal workers/goroutines/loops. func (p *Peer) Close() { - // close topics - if err := p.server.removeAllPubsubTopics(); err != nil { - log.ErrorContextE(p.ctx, "Error closing pubsub topics", err) + defer p.cancel() + + if p.bootCloser != nil { + // close bootstrap service + if err := p.bootCloser.Close(); err != nil { + log.ErrorE("Error closing bootstrap", err) + } } - // stop gRPC server - for _, c := range p.server.conns { - if err := c.Close(); err != nil { - log.ErrorContextE(p.ctx, "Failed closing server RPC connections", err) + if p.server != nil { + // close topics + if err := p.server.removeAllPubsubTopics(); err != nil { + log.ErrorE("Error closing pubsub topics", err) + } + + // stop gRPC server + for _, c := range p.server.conns { + if err := c.Close(); err != nil { + log.ErrorE("Failed closing server RPC connections", err) + } } } @@ -305,24 +205,25 @@ func (p *Peer) Close() { } if err := p.bserv.Close(); err != nil { - log.ErrorContextE(p.ctx, "Error closing block service", err) + log.ErrorE("Error closing block service", err) } if err := p.host.Close(); err != nil { - log.ErrorContextE(p.ctx, "Error closing host", err) - } - - if p.dhtClose != nil { - err := p.dhtClose() - if err != nil { - log.ErrorContextE(p.ctx, "Failed to close DHT", err) - } + log.ErrorE("Error closing host", err) } - stopGRPCServer(p.ctx, p.p2pRPC) - - if p.cancel != nil { - p.cancel() + stopped := make(chan struct{}) + go func() { + p.p2pRPC.GracefulStop() + close(stopped) + }() + timer := time.NewTimer(10 * time.Second) + select { + case <-timer.C: + p.p2pRPC.Stop() + log.Info("Peer gRPC server was shutdown ungracefully") + case <-stopped: + timer.Stop() } } @@ -345,7 +246,7 @@ func (p *Peer) handleMessageLoop() { } if err != nil { - log.ErrorContextE(p.ctx, "Error while handling broadcast log", err) + log.ErrorE("Error while handling broadcast log", err) } case event.P2PTopic: @@ -369,9 +270,9 @@ func (p *Peer) RegisterNewDocument( schemaRoot string, ) error { // register topic - if err := p.server.addPubSubTopic(docID.String(), !p.server.hasPubSubTopic(schemaRoot)); err != nil { - log.ErrorContextE( - p.ctx, + err := p.server.addPubSubTopic(docID.String(), !p.server.hasPubSubTopic(schemaRoot)) + if err != nil { + log.ErrorE( "Failed to create new pubsub topic", err, corelog.String("DocID", docID.String()), @@ -392,7 +293,7 @@ func (p *Peer) RegisterNewDocument( }, } - return p.server.publishLog(p.ctx, schemaRoot, req) + return p.server.publishLog(ctx, schemaRoot, req) } func (p *Peer) handleDocCreateLog(evt event.Update) error { @@ -449,9 +350,9 @@ func (p *Peer) handleDocUpdateLog(evt event.Update) error { func (p *Peer) pushLogToReplicators(lg event.Update) { // let the exchange know we have this block // this should speed up the dag sync process - err := p.bserv.Exchange().NotifyNewBlocks(p.ctx, blocks.NewBlock(lg.Block)) + err := p.bserv.Exchange().NotifyNewBlocks(context.Background(), blocks.NewBlock(lg.Block)) if err != nil { - log.ErrorContextE(p.ctx, "Failed to notify new blocks", err) + log.ErrorE("Failed to notify new blocks", err) } // push to each peer (replicator) @@ -475,9 +376,8 @@ func (p *Peer) pushLogToReplicators(lg event.Update) { continue } go func(peerID peer.ID) { - if err := p.server.pushLog(p.ctx, lg, peerID); err != nil { - log.ErrorContextE( - p.ctx, + if err := p.server.pushLog(lg, peerID); err != nil { + log.ErrorE( "Failed pushing log", err, corelog.String("DocID", lg.DocID), @@ -489,66 +389,11 @@ func (p *Peer) pushLogToReplicators(lg event.Update) { } } -func (p *Peer) setupBlockService() { - bswapnet := network.NewFromIpfsHost(p.host, p.dht) - bswap := bitswap.New(p.ctx, bswapnet, p.blockstore) - p.bserv = blockservice.New(p.blockstore, bswap) - p.exch = bswap -} - -func stopGRPCServer(ctx context.Context, server *grpc.Server) { - stopped := make(chan struct{}) - go func() { - server.GracefulStop() - close(stopped) - }() - timer := time.NewTimer(10 * time.Second) - select { - case <-timer.C: - server.Stop() - log.InfoContext(ctx, "Peer gRPC server was shutdown ungracefully") - case <-stopped: - timer.Stop() - } -} - // Connect initiates a connection to the peer with the given address. func (p *Peer) Connect(ctx context.Context, addr peer.AddrInfo) error { return p.host.Connect(ctx, addr) } -// Bootstrap connects to the given peers. -func (p *Peer) Bootstrap(addrs []peer.AddrInfo) { - var connected uint64 - - var wg sync.WaitGroup - for _, pinfo := range addrs { - wg.Add(1) - go func(pinfo peer.AddrInfo) { - defer wg.Done() - err := p.host.Connect(p.ctx, pinfo) - if err != nil { - log.InfoContext(p.ctx, "Cannot connect to peer", corelog.Any("Error", err)) - return - } - log.InfoContext(p.ctx, "Connected", corelog.Any("PeerID", pinfo.ID)) - atomic.AddUint64(&connected, 1) - }(pinfo) - } - - wg.Wait() - - if nPeers := len(addrs); int(connected) < nPeers/2 { - log.InfoContext(p.ctx, fmt.Sprintf("Only connected to %d bootstrap peers out of %d", connected, nPeers)) - } - - err := p.dht.Bootstrap(p.ctx) - if err != nil { - log.ErrorContextE(p.ctx, "Problem bootstraping using DHT", err) - return - } -} - func (p *Peer) PeerID() peer.ID { return p.host.ID() } @@ -563,17 +408,3 @@ func (p *Peer) PeerInfo() peer.AddrInfo { Addrs: p.host.Network().ListenAddresses(), } } - -func newDHT(ctx context.Context, h host.Host, dsb ds.Batching) (*dualdht.DHT, error) { - dhtOpts := []dualdht.Option{ - dualdht.DHTOption(dht.NamespacedValidator("pk", record.PublicKeyValidator{})), - dualdht.DHTOption(dht.NamespacedValidator("ipns", ipns.Validator{KeyBook: h.Peerstore()})), - dualdht.DHTOption(dht.Concurrency(10)), - dualdht.DHTOption(dht.Mode(dht.ModeAuto)), - } - if dsb != nil { - dhtOpts = append(dhtOpts, dualdht.DHTOption(dht.Datastore(dsb))) - } - - return dualdht.New(ctx, h, dhtOpts...) -} diff --git a/net/peer_test.go b/net/peer_test.go index cb8c8eab44..5322d32f6e 100644 --- a/net/peer_test.go +++ b/net/peer_test.go @@ -13,10 +13,8 @@ package net import ( "context" "testing" - "time" "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" badger "github.com/sourcenetwork/badger/v4" rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc" @@ -31,7 +29,6 @@ import ( coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/core/crdt" "github.com/sourcenetwork/defradb/internal/db" - netutils "github.com/sourcenetwork/defradb/net/utils" ) func emptyBlock() []byte { @@ -75,7 +72,6 @@ func newTestPeer(ctx context.Context, t *testing.T) (client.DB, *Peer) { n, err := NewPeer( ctx, - db.Rootstore(), db.Blockstore(), db.Events(), WithListenAddresses(randomMultiaddr), @@ -91,7 +87,7 @@ func TestNewPeer_NoError(t *testing.T) { db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) defer db.Close() - p, err := NewPeer(ctx, db.Rootstore(), db.Blockstore(), db.Events()) + p, err := NewPeer(ctx, db.Blockstore(), db.Events()) require.NoError(t, err) p.Close() } @@ -102,16 +98,6 @@ func TestNewPeer_NoDB_NilDBError(t *testing.T) { require.ErrorIs(t, err, ErrNilDB) } -func TestStartAndClose_NoError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - err := p.Start() - require.NoError(t, err) -} - func TestStart_WithKnownPeer_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) @@ -126,75 +112,22 @@ func TestStart_WithKnownPeer_NoError(t *testing.T) { n1, err := NewPeer( ctx, - db1.Rootstore(), - db1.Blockstore(), - db1.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - defer n1.Close() - n2, err := NewPeer( - ctx, - db2.Rootstore(), - db2.Blockstore(), - db2.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - defer n2.Close() - - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) - - err = n2.Start() - require.NoError(t, err) -} - -func TestStart_WithOfflineKnownPeer_NoError(t *testing.T) { - ctx := context.Background() - store := memory.NewDatastore(ctx) - db1, err := db.NewDB(ctx, store, acp.NoACP, nil) - require.NoError(t, err) - defer db1.Close() - - store2 := memory.NewDatastore(ctx) - db2, err := db.NewDB(ctx, store2, acp.NoACP, nil) - require.NoError(t, err) - defer db2.Close() - - n1, err := NewPeer( - ctx, - db1.Rootstore(), db1.Blockstore(), db1.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) defer n1.Close() n2, err := NewPeer( ctx, - db2.Rootstore(), db2.Blockstore(), db2.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) defer n2.Close() - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) - n1.Close() - - // give time for n1 to close - time.Sleep(100 * time.Millisecond) - - err = n2.Start() + err = n2.Connect(ctx, n1.PeerInfo()) require.NoError(t, err) } @@ -317,10 +250,7 @@ func TestHandleDocCreateLog_WithExistingTopic_TopicExistsError(t *testing.T) { doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) require.NoError(t, err) - err = col.Create(ctx, doc) - require.NoError(t, err) - - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), doc.ID().String(), true) + _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bae-7fca96a2-5f01-5558-a81f-09b47587f26d", true) require.NoError(t, err) err = p.handleDocCreateLog(event.Update{ @@ -348,20 +278,13 @@ func TestHandleDocUpdateLog_NoError(t *testing.T) { doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) require.NoError(t, err) - err = col.Create(ctx, doc) - require.NoError(t, err) - - headCID, err := getHead(ctx, db, doc.ID()) - require.NoError(t, err) - - b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) + cid, err := createCID(doc) require.NoError(t, err) err = p.handleDocUpdateLog(event.Update{ DocID: doc.ID().String(), - Cid: headCID, + Cid: cid, SchemaRoot: col.SchemaRoot(), - Block: b, }) require.NoError(t, err) } @@ -396,23 +319,16 @@ func TestHandleDocUpdateLog_WithExistingDocIDTopic_TopicExistsError(t *testing.T doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) require.NoError(t, err) - err = col.Create(ctx, doc) - require.NoError(t, err) - - headCID, err := getHead(ctx, db, doc.ID()) - require.NoError(t, err) - - b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) + cid, err := createCID(doc) require.NoError(t, err) - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), doc.ID().String(), true) + _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bae-7fca96a2-5f01-5558-a81f-09b47587f26d", true) require.NoError(t, err) err = p.handleDocUpdateLog(event.Update{ DocID: doc.ID().String(), - Cid: headCID, + Cid: cid, SchemaRoot: col.SchemaRoot(), - Block: b, }) require.ErrorContains(t, err, "topic already exists") } @@ -435,23 +351,16 @@ func TestHandleDocUpdateLog_WithExistingSchemaTopic_TopicExistsError(t *testing. doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) require.NoError(t, err) - err = col.Create(ctx, doc) - require.NoError(t, err) - - headCID, err := getHead(ctx, db, doc.ID()) - require.NoError(t, err) - - b, err := db.Blockstore().AsIPLDStorage().Get(ctx, headCID.KeyString()) + cid, err := createCID(doc) require.NoError(t, err) - _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), col.SchemaRoot(), true) + _, err = rpc.NewTopic(ctx, p.ps, p.host.ID(), "bafkreia7ljiy5oief4dp5xsk7t7zlgfjzqh3537hw7rtttjzchybfxtn4u", true) require.NoError(t, err) err = p.handleDocUpdateLog(event.Update{ DocID: doc.ID().String(), - Cid: headCID, + Cid: cid, SchemaRoot: col.SchemaRoot(), - Block: b, }) require.ErrorContains(t, err, "topic already exists") } @@ -475,7 +384,6 @@ func TestNewPeer_WithEnableRelay_NoError(t *testing.T) { defer db.Close() n, err := NewPeer( context.Background(), - db.Rootstore(), db.Blockstore(), db.Events(), WithEnableRelay(true), @@ -484,23 +392,6 @@ func TestNewPeer_WithEnableRelay_NoError(t *testing.T) { n.Close() } -func TestNewPeer_WithDBClosed_NoError(t *testing.T) { - ctx := context.Background() - store := memory.NewDatastore(ctx) - - db, err := db.NewDB(ctx, store, acp.NoACP, nil) - require.NoError(t, err) - db.Close() - - _, err = NewPeer( - context.Background(), - db.Rootstore(), - db.Blockstore(), - db.Events(), - ) - require.ErrorContains(t, err, "datastore closed") -} - func TestNewPeer_NoPubSub_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) @@ -510,7 +401,6 @@ func TestNewPeer_NoPubSub_NoError(t *testing.T) { n, err := NewPeer( context.Background(), - db.Rootstore(), db.Blockstore(), db.Events(), WithEnablePubSub(false), @@ -529,7 +419,6 @@ func TestNewPeer_WithEnablePubSub_NoError(t *testing.T) { n, err := NewPeer( ctx, - db.Rootstore(), db.Blockstore(), db.Events(), WithEnablePubSub(true), @@ -549,7 +438,6 @@ func TestNodeClose_NoError(t *testing.T) { defer db.Close() n, err := NewPeer( context.Background(), - db.Rootstore(), db.Blockstore(), db.Events(), ) @@ -557,92 +445,25 @@ func TestNodeClose_NoError(t *testing.T) { n.Close() } -func TestNewPeer_BootstrapWithNoPeer_NoError(t *testing.T) { - ctx := context.Background() - store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, nil) - require.NoError(t, err) - defer db.Close() - - n1, err := NewPeer( - ctx, - db.Rootstore(), - db.Blockstore(), - db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - n1.Bootstrap([]peer.AddrInfo{}) - n1.Close() -} - -func TestNewPeer_BootstrapWithOnePeer_NoError(t *testing.T) { - ctx := context.Background() - store := memory.NewDatastore(ctx) - db, err := db.NewDB(ctx, store, acp.NoACP, nil) - require.NoError(t, err) - defer db.Close() - n1, err := NewPeer( - ctx, - db.Rootstore(), - db.Blockstore(), - db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - defer n1.Close() - n2, err := NewPeer( - ctx, - db.Rootstore(), - db.Blockstore(), - db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - defer n2.Close() - addrs, err := netutils.ParsePeers([]string{n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String()}) - if err != nil { - t.Fatal(err) - } - n2.Bootstrap(addrs) -} - -func TestNewPeer_BootstrapWithOneValidPeerAndManyInvalidPeers_NoError(t *testing.T) { +func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) db, err := db.NewDB(ctx, store, acp.NoACP, nil) require.NoError(t, err) defer db.Close() - n1, err := NewPeer( - ctx, - db.Rootstore(), - db.Blockstore(), - db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), - ) - require.NoError(t, err) - defer n1.Close() - n2, err := NewPeer( - ctx, - db.Rootstore(), + n, err := NewPeer( + context.Background(), db.Blockstore(), db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithListenAddresses("/ip4/127.0.0.1/tcp/0"), ) require.NoError(t, err) - defer n2.Close() - addrs, err := netutils.ParsePeers([]string{ - n1.host.Addrs()[0].String() + "/p2p/" + n1.PeerID().String(), - "/ip4/0.0.0.0/tcp/1234/p2p/" + "12D3KooWC8YY6Tx3uAeHsdBmoy7PJPwqXAHE4HkCZ5veankKWci6", - "/ip4/0.0.0.0/tcp/1235/p2p/" + "12D3KooWC8YY6Tx3uAeHsdBmoy7PJPwqXAHE4HkCZ5veankKWci5", - "/ip4/0.0.0.0/tcp/1236/p2p/" + "12D3KooWC8YY6Tx3uAeHsdBmoy7PJPwqXAHE4HkCZ5veankKWci4", - }) - require.NoError(t, err) - n2.Bootstrap(addrs) + require.Contains(t, n.ListenAddrs()[0].String(), "/tcp/") + n.Close() } -func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { +func TestPeer_WithBootstrapPeers_NoError(t *testing.T) { ctx := context.Background() store := memory.NewDatastore(ctx) db, err := db.NewDB(ctx, store, acp.NoACP, nil) @@ -651,12 +472,11 @@ func TestListenAddrs_WithListenAddresses_NoError(t *testing.T) { n, err := NewPeer( context.Background(), - db.Rootstore(), db.Blockstore(), db.Events(), - WithListenAddresses("/ip4/0.0.0.0/tcp/0"), + WithBootstrapPeers("/ip4/127.0.0.1/tcp/6666/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), ) require.NoError(t, err) - require.Contains(t, n.ListenAddrs()[0].String(), "/tcp/") + n.Close() } diff --git a/net/server.go b/net/server.go index 0e36eb7e3c..2f129d19cf 100644 --- a/net/server.go +++ b/net/server.go @@ -18,7 +18,6 @@ import ( "sync" cid "github.com/ipfs/go-cid" - "github.com/libp2p/go-libp2p/core/peer" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/sourcenetwork/corelog" @@ -46,7 +45,7 @@ type server struct { topics map[string]pubsubTopic // replicators is a map from collectionName => peerId - replicators map[string]map[peer.ID]struct{} + replicators map[string]map[libpeer.ID]struct{} mu sync.Mutex conns map[libpeer.ID]*grpc.ClientConn @@ -68,7 +67,7 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { peer: p, conns: make(map[libpeer.ID]*grpc.ClientConn), topics: make(map[string]pubsubTopic), - replicators: make(map[string]map[peer.ID]struct{}), + replicators: make(map[string]map[libpeer.ID]struct{}), } cred := insecure.NewCredentials() @@ -224,7 +223,7 @@ func (s *server) removePubSubTopic(topic string) error { return nil } - log.InfoContext(s.peer.ctx, "Removing pubsub topic", + log.Info("Removing pubsub topic", corelog.String("PeerID", s.peer.PeerID().String()), corelog.String("Topic", topic)) @@ -242,7 +241,7 @@ func (s *server) removeAllPubsubTopics() error { return nil } - log.InfoContext(s.peer.ctx, "Removing all pubsub topics", + log.Info("Removing all pubsub topics", corelog.String("PeerID", s.peer.PeerID().String())) s.mu.Lock() @@ -282,7 +281,8 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe return errors.Wrap("failed marshling pubsub message", err) } - if _, err := t.Publish(ctx, data, rpc.WithIgnoreResponse(true)); err != nil { + _, err = t.Publish(ctx, data, rpc.WithIgnoreResponse(true)) + if err != nil { return errors.Wrap(fmt.Sprintf("failed publishing to thread %s", topic), err) } return nil @@ -290,14 +290,14 @@ func (s *server) publishLog(ctx context.Context, topic string, req *pb.PushLogRe // pubSubMessageHandler handles incoming PushLog messages from the pubsub network. func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) ([]byte, error) { - log.InfoContext(s.peer.ctx, "Received new pubsub event", + log.Info("Received new pubsub event", corelog.String("PeerID", s.peer.PeerID().String()), corelog.Any("SenderId", from), corelog.String("Topic", topic)) req := new(pb.PushLogRequest) if err := proto.Unmarshal(msg, req); err != nil { - log.ErrorContextE(s.peer.ctx, "Failed to unmarshal pubsub message %s", err) + log.ErrorE("Failed to unmarshal pubsub message %s", err) return nil, err } ctx := grpcpeer.NewContext(s.peer.ctx, &grpcpeer.Peer{ @@ -311,7 +311,7 @@ func (s *server) pubSubMessageHandler(from libpeer.ID, topic string, msg []byte) // pubSubEventHandler logs events from the subscribed DocID topics. func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) { - log.InfoContext(s.peer.ctx, "Received new pubsub event", + log.Info("Received new pubsub event", corelog.String("PeerID", s.peer.PeerID().String()), corelog.Any("SenderId", from), corelog.String("Topic", topic), @@ -349,14 +349,14 @@ func (s *server) updatePubSubTopics(evt event.P2PTopic) { for _, topic := range evt.ToAdd { err := s.addPubSubTopic(topic, true) if err != nil { - log.ErrorContextE(s.peer.ctx, "Failed to add pubsub topic.", err) + log.ErrorE("Failed to add pubsub topic.", err) } } for _, topic := range evt.ToRemove { err := s.removePubSubTopic(topic) if err != nil { - log.ErrorContextE(s.peer.ctx, "Failed to remove pubsub topic.", err) + log.ErrorE("Failed to remove pubsub topic.", err) } } s.peer.bus.Publish(event.NewMessage(event.P2PTopicCompletedName, nil)) @@ -371,7 +371,7 @@ func (s *server) updateReplicators(evt event.Replicator) { s.peer.host.Peerstore().AddAddrs(evt.Info.ID, evt.Info.Addrs, peerstore.PermanentAddrTTL) // connect to the peer if err := s.peer.Connect(s.peer.ctx, evt.Info); err != nil { - log.ErrorContextE(s.peer.ctx, "Failed to connect to replicator peer", err) + log.ErrorE("Failed to connect to replicator peer", err) } } @@ -389,7 +389,7 @@ func (s *server) updateReplicators(evt event.Replicator) { } for schema := range evt.Schemas { if _, exists := s.replicators[schema]; !exists { - s.replicators[schema] = make(map[peer.ID]struct{}) + s.replicators[schema] = make(map[libpeer.ID]struct{}) } s.replicators[schema][evt.Info.ID] = struct{}{} } @@ -397,9 +397,8 @@ func (s *server) updateReplicators(evt event.Replicator) { if evt.Docs != nil { for update := range evt.Docs { - if err := s.pushLog(s.peer.ctx, update, evt.Info.ID); err != nil { - log.ErrorContextE( - s.peer.ctx, + if err := s.pushLog(update, evt.Info.ID); err != nil { + log.ErrorE( "Failed to replicate log", err, corelog.Any("CID", update.Cid), diff --git a/net/server_test.go b/net/server_test.go index 1ac178a2d1..0e23e3b019 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -16,8 +16,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore/query" - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/host" "github.com/stretchr/testify/require" grpcpeer "google.golang.org/grpc/peer" @@ -36,55 +34,6 @@ func TestNewServerSimple(t *testing.T) { require.NoError(t, err) } -var mockError = errors.New("mock error") - -type mockHost struct { - host.Host -} - -func (mH *mockHost) EventBus() event.Bus { - return &mockBus{} -} - -type mockBus struct { - event.Bus -} - -func (mB *mockBus) Emitter(eventType any, opts ...event.EmitterOpt) (event.Emitter, error) { - return nil, mockError -} - -func (mB *mockBus) Subscribe(eventType any, opts ...event.SubscriptionOpt) (event.Subscription, error) { - return nil, mockError -} - -func TestNewServerWithEmitterError(t *testing.T) { - ctx := context.Background() - db, p := newTestPeer(ctx, t) - defer db.Close() - defer p.Close() - - _, err := db.AddSchema(ctx, `type User { - name: String - age: Int - }`) - require.NoError(t, err) - - col, err := db.GetCollectionByName(ctx, "User") - require.NoError(t, err) - - doc, err := client.NewDocFromJSON([]byte(`{"name": "John", "age": 30}`), col.Definition()) - require.NoError(t, err) - - err = col.Create(ctx, doc) - require.NoError(t, err) - - p.host = &mockHost{p.host} - - _, err = newServer(p) - require.NoError(t, err) -} - func TestGetDocGraph(t *testing.T) { ctx := context.Background() db, p := newTestPeer(ctx, t) @@ -151,10 +100,8 @@ func TestPushLog(t *testing.T) { db, p := newTestPeer(ctx, t) defer db.Close() defer p.Close() - err := p.Start() - require.NoError(t, err) - _, err = db.AddSchema(ctx, `type User { + _, err := db.AddSchema(ctx, `type User { name: String age: Int }`) diff --git a/net/utils/util.go b/net/utils/util.go deleted file mode 100644 index 5e345cc12b..0000000000 --- a/net/utils/util.go +++ /dev/null @@ -1,53 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/* -Package utils provides utility functions for the defradb networking facilities. -*/ -package utils - -import ( - "fmt" - - "github.com/libp2p/go-libp2p/core/peer" - ma "github.com/multiformats/go-multiaddr" - - "github.com/sourcenetwork/defradb/errors" -) - -func ParsePeers(addrs []string) ([]peer.AddrInfo, error) { - maddrs := make([]ma.Multiaddr, len(addrs)) - for i, addr := range addrs { - var err error - maddrs[i], err = ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - } - return peer.AddrInfosFromP2pAddrs(maddrs...) -} - -func TCPAddrFromMultiAddr(maddr ma.Multiaddr) (string, error) { - var addr string - if maddr == nil { - return addr, errors.New("address can't be empty") - } - ip4, err := maddr.ValueForProtocol(ma.P_IP4) - if err != nil { - return addr, err - } - tcp, err := maddr.ValueForProtocol(ma.P_TCP) - if err != nil { - return addr, err - } - return fmt.Sprintf("%s:%s", ip4, tcp), nil -} diff --git a/node/node.go b/node/node.go index 6dad35f593..5eea424956 100644 --- a/node/node.go +++ b/node/node.go @@ -16,7 +16,6 @@ import ( "fmt" gohttp "net/http" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" "github.com/sourcenetwork/defradb/client" @@ -40,7 +39,6 @@ type Option any // Options contains start configuration values. type Options struct { - peers []peer.AddrInfo disableP2P bool disableAPI bool } @@ -67,13 +65,6 @@ func WithDisableAPI(disable bool) NodeOpt { } } -// WithPeers sets the bootstrap peers. -func WithPeers(peers ...peer.AddrInfo) NodeOpt { - return func(o *Options) { - o.peers = peers - } -} - // Node is a DefraDB instance with optional sub-systems. type Node struct { DB client.DB @@ -141,13 +132,10 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { var peer *net.Peer if !options.disableP2P { // setup net node - peer, err = net.NewPeer(ctx, db.Rootstore(), db.Blockstore(), db.Events(), netOpts...) + peer, err = net.NewPeer(ctx, db.Blockstore(), db.Events(), netOpts...) if err != nil { return nil, err } - if len(options.peers) > 0 { - peer.Bootstrap(options.peers) - } } var server *http.Server @@ -172,11 +160,6 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { // Start starts the node sub-systems. func (n *Node) Start(ctx context.Context) error { - if n.Peer != nil { - if err := n.Peer.Start(); err != nil { - return err - } - } if n.Server != nil { err := n.Server.SetListener() if err != nil { diff --git a/node/node_test.go b/node/node_test.go index d3bf4c5048..1aa1dac92a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -13,9 +13,7 @@ package node import ( "testing" - "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestWithDisableP2P(t *testing.T) { @@ -29,14 +27,3 @@ func TestWithDisableAPI(t *testing.T) { WithDisableAPI(true)(options) assert.Equal(t, true, options.disableAPI) } - -func TestWithPeers(t *testing.T) { - peer, err := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") - require.NoError(t, err) - - options := &Options{} - WithPeers(*peer)(options) - - require.Len(t, options.peers, 1) - assert.Equal(t, *peer, options.peers[0]) -} diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index 14e4df7cc4..8600448968 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -540,8 +540,8 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { - w.node.Peer.Bootstrap(addrs) +func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { + return w.node.Peer.Connect(ctx, addr) } func (w *Wrapper) Host() string { diff --git a/tests/clients/clients.go b/tests/clients/clients.go index f5d822ab39..2a67ed0812 100644 --- a/tests/clients/clients.go +++ b/tests/clients/clients.go @@ -11,6 +11,8 @@ package clients import ( + "context" + "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/defradb/client" @@ -20,5 +22,5 @@ import ( // required for testing. type Client interface { client.DB - Bootstrap([]peer.AddrInfo) + Connect(context.Context, peer.AddrInfo) error } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index 76e31d9cbd..5a813c9265 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -233,8 +233,8 @@ func (w *Wrapper) PrintDump(ctx context.Context) error { return w.node.DB.PrintDump(ctx) } -func (w *Wrapper) Bootstrap(addrs []peer.AddrInfo) { - w.node.Peer.Bootstrap(addrs) +func (w *Wrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { + return w.node.Peer.Connect(ctx, addr) } func (w *Wrapper) Host() string { diff --git a/tests/integration/client.go b/tests/integration/client.go index d9c9f74334..d37a9f22c0 100644 --- a/tests/integration/client.go +++ b/tests/integration/client.go @@ -11,6 +11,7 @@ package tests import ( + "context" "fmt" "os" "strconv" @@ -100,10 +101,11 @@ func newGoClientWrapper(n *node.Node) *goClientWrapper { } } -func (w *goClientWrapper) Bootstrap(addrs []peer.AddrInfo) { +func (w *goClientWrapper) Connect(ctx context.Context, addr peer.AddrInfo) error { if w.peer != nil { - w.peer.Bootstrap(addrs) + return w.peer.Connect(ctx, addr) } + return nil } func (w *goClientWrapper) Close() { diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index b1b79982cf..99c713bb79 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -16,7 +16,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/net" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -147,9 +146,12 @@ func connectPeers( sourceNode := s.nodes[cfg.SourceNodeID] targetNode := s.nodes[cfg.TargetNodeID] - addrs := []peer.AddrInfo{targetNode.PeerInfo()} - log.InfoContext(s.ctx, "Bootstrapping with peers", corelog.Any("Addresses", addrs)) - sourceNode.Bootstrap(addrs) + log.InfoContext(s.ctx, "Connect peers", + corelog.Any("Source", sourceNode.PeerInfo()), + corelog.Any("Target", targetNode.PeerInfo())) + + err := sourceNode.Connect(s.ctx, targetNode.PeerInfo()) + require.NoError(s.t, err) s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} @@ -287,6 +289,23 @@ func getAllP2PCollections( assert.Equal(s.t, expectedCollections, cols) } +// reconnectPeers makes sure that all peers are connected after a node restart action. +func reconnectPeers(s *state) { + for i, n := range s.nodeP2P { + for j := range n.connections { + sourceNode := s.nodes[i] + targetNode := s.nodes[j] + + log.InfoContext(s.ctx, "Connect peers", + corelog.Any("Source", sourceNode.PeerInfo()), + corelog.Any("Target", targetNode.PeerInfo())) + + err := sourceNode.Connect(s.ctx, targetNode.PeerInfo()) + require.NoError(s.t, err) + } + } +} + func RandomNetworkingConfig() ConfigureNode { return func() []net.NodeOpt { return []net.NodeOpt{ diff --git a/tests/integration/utils.go b/tests/integration/utils.go index f26ee4ed5b..2576f30762 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -709,15 +709,9 @@ func restartNodes( nodeOpts := s.nodeConfigs[i] nodeOpts = append(nodeOpts, net.WithListenAddresses(addresses...)) - p, err := net.NewPeer(s.ctx, node.DB.Rootstore(), node.DB.Blockstore(), node.DB.Events(), nodeOpts...) + node.Peer, err = net.NewPeer(s.ctx, node.DB.Blockstore(), node.DB.Events(), nodeOpts...) require.NoError(s.t, err) - if err := p.Start(); err != nil { - p.Close() - require.NoError(s.t, err) - } - node.Peer = p - c, err := setupClient(s, node) require.NoError(s.t, err) s.nodes[i] = c @@ -733,6 +727,7 @@ func restartNodes( // will reference the old (closed) database instances. refreshCollections(s) refreshIndexes(s) + reconnectPeers(s) } // refreshCollections refreshes all the collections of the given names, preserving order. @@ -783,20 +778,12 @@ func configureNode( nodeOpts := action() nodeOpts = append(nodeOpts, net.WithPrivateKey(privateKey)) - p, err := net.NewPeer(s.ctx, node.DB.Rootstore(), node.DB.Blockstore(), node.DB.Events(), nodeOpts...) + node.Peer, err = net.NewPeer(s.ctx, node.DB.Blockstore(), node.DB.Events(), nodeOpts...) require.NoError(s.t, err) - log.InfoContext(s.ctx, "Starting P2P node", corelog.Any("P2P address", p.PeerInfo())) - if err := p.Start(); err != nil { - p.Close() - require.NoError(s.t, err) - } - - s.nodeAddresses = append(s.nodeAddresses, p.PeerInfo()) + s.nodeAddresses = append(s.nodeAddresses, node.Peer.PeerInfo()) s.nodeConfigs = append(s.nodeConfigs, nodeOpts) - node.Peer = p - c, err := setupClient(s, node) require.NoError(s.t, err)