Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Decouple net config #2258

Merged
merged 7 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@
var node *net.Node
if !cfg.Net.P2PDisabled {
nodeOpts := []net.NodeOpt{
net.WithConfig(cfg),
net.WithListenAddress(cfg.Net.P2PAddress),
net.WithEnablePubSub(cfg.Net.PubSubEnabled),
net.WithEnableRelay(cfg.Net.RelayEnabled),

Check warning on line 225 in cli/start.go

View check run for this annotation

Codecov / codecov/patch

cli/start.go#L223-L225

Added lines #L223 - L225 were not covered by tests
}
if cfg.Datastore.Store == badgerDatastoreName {
// It would be ideal to not have the key path tied to the datastore.
Expand Down
92 changes: 17 additions & 75 deletions net/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,113 +13,55 @@
package net

import (
"time"

cconnmgr "github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
ma "github.com/multiformats/go-multiaddr"
"google.golang.org/grpc"

"github.com/sourcenetwork/defradb/config"
)

// Options is the node options.
type Options struct {
ListenAddrs []ma.Multiaddr
ListenAddress string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: We need to be able to listen on multiple addresses. For example, we may want to listen on 0.0.0.0 to expose a port externally but alse listen on 127.0.0.1 so that we can have more than one node locally talking to each other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since one of the addresses will always be localhost could we just add an EnableLocalHost option here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an option. I'm wondering if there may also be a situation where someone would want to listen on multiple ports 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the config to support multiple addresses and set the default to localhost like we discussed.

PrivateKey crypto.PrivKey
EnablePubSub bool
EnableRelay bool
GRPCServerOptions []grpc.ServerOption
GRPCDialOptions []grpc.DialOption
ConnManager cconnmgr.ConnManager
}

type NodeOpt func(*Options) error

// NewMergedOptions obtains Options by applying given NodeOpts.
func NewMergedOptions(opts ...NodeOpt) (*Options, error) {
var options Options
for _, opt := range opts {
if opt == nil {
continue
}
if err := opt(&options); err != nil {
return nil, err
}
// DefaultOptions returns the default net options.
func DefaultOptions() *Options {
return &Options{
ListenAddress: "/ip4/0.0.0.0/tcp/9171",
EnablePubSub: true,
EnableRelay: false,
}
return &options, nil
}

// NewConnManager gives a new ConnManager.
func NewConnManager(low int, high int, grace time.Duration) (cconnmgr.ConnManager, error) {
c, err := connmgr.NewConnManager(low, high, connmgr.WithGracePeriod(grace))
if err != nil {
return nil, err
}
return c, nil
}

// WithConfig provides the Node-specific configuration, from the top-level Net config.
func WithConfig(cfg *config.Config) NodeOpt {
return func(opt *Options) error {
var err error
err = WithListenP2PAddrStrings(cfg.Net.P2PAddress)(opt)
if err != nil {
return err
}
opt.EnableRelay = cfg.Net.RelayEnabled
opt.EnablePubSub = cfg.Net.PubSubEnabled
opt.ConnManager, err = NewConnManager(100, 400, time.Second*20)
if err != nil {
return err
}
return nil
}
}
type NodeOpt func(*Options)

// WithPrivateKey sets the p2p host private key.
func WithPrivateKey(priv crypto.PrivKey) NodeOpt {
return func(opt *Options) error {
return func(opt *Options) {
opt.PrivateKey = priv
return nil
}
}

// WithPubSub enables the pubsub feature.
func WithPubSub(enable bool) NodeOpt {
return func(opt *Options) error {
// WithEnablePubSub enables the pubsub feature.
func WithEnablePubSub(enable bool) NodeOpt {
return func(opt *Options) {
opt.EnablePubSub = enable
return nil
}
}

// WithEnableRelay enables the relay feature.
func WithEnableRelay(enable bool) NodeOpt {
return func(opt *Options) error {
return func(opt *Options) {
opt.EnableRelay = enable
return nil
}
}

// ListenP2PAddrStrings sets the address to listen on given as strings.
func WithListenP2PAddrStrings(addrs ...string) NodeOpt {
return func(opt *Options) error {
for _, addrstr := range addrs {
a, err := ma.NewMultiaddr(addrstr)
if err != nil {
return err
}
opt.ListenAddrs = append(opt.ListenAddrs, a)
}
return nil
}
}

// ListenAddrs sets the address to listen on given as MultiAddr(s).
func WithListenAddrs(addrs ...ma.Multiaddr) NodeOpt {
return func(opt *Options) error {
opt.ListenAddrs = addrs
return nil
// WithListenAddress sets the address to listen on given as a multiaddress string.
func WithListenAddress(address string) NodeOpt {
return func(opt *Options) {
opt.ListenAddress = address
}
}
94 changes: 12 additions & 82 deletions net/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,94 +12,24 @@ package net

import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/config"
"github.com/stretchr/testify/assert"
)

func TestNewMergedOptionsSimple(t *testing.T) {
opt, err := NewMergedOptions()
require.NoError(t, err)
require.NotNil(t, opt)
}

func TestNewMergedOptionsWithNilOption(t *testing.T) {
opt, err := NewMergedOptions(nil)
require.NoError(t, err)
require.NotNil(t, opt)
}

func TestNewConnManagerSimple(t *testing.T) {
conMngr, err := NewConnManager(1, 10, time.Second)
require.NoError(t, err)
err = conMngr.Close()
require.NoError(t, err)
}

func TestNewConnManagerWithError(t *testing.T) {
_, err := NewConnManager(1, 10, -time.Second)
require.Contains(t, err.Error(), "grace period must be non-negative")
}

func TestWithConfigWithP2PAddressError(t *testing.T) {
cfg := config.Config{
Net: &config.NetConfig{
P2PAddress: "/willerror/0.0.0.0/tcp/9999",
},
}
err := WithConfig(&cfg)(&Options{})
require.Contains(t, err.Error(), "failed to parse multiaddr")
}

func TestWithPrivateKey(t *testing.T) {
key, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
require.NoError(t, err)

opt, err := NewMergedOptions(WithPrivateKey(key))
require.NoError(t, err)
require.NotNil(t, opt)
require.Equal(t, key, opt.PrivateKey)
}

func TestWithPubSub(t *testing.T) {
opt, err := NewMergedOptions(WithPubSub(true))
require.NoError(t, err)
require.NotNil(t, opt)
require.True(t, opt.EnablePubSub)
func TestWithListenAddress(t *testing.T) {
opts := &Options{}
WithListenAddress("/ip4/127.0.0.1/tcp/6666")(opts)
assert.Equal(t, "/ip4/127.0.0.1/tcp/6666", opts.ListenAddress)
}

func TestWithEnableRelay(t *testing.T) {
opt, err := NewMergedOptions(WithEnableRelay(true))
require.NoError(t, err)
require.NotNil(t, opt)
require.True(t, opt.EnableRelay)
opts := &Options{}
WithEnableRelay(true)(opts)
assert.Equal(t, true, opts.EnableRelay)
}

func TestWithListenP2PAddrStringsWithError(t *testing.T) {
addr := "/willerror/0.0.0.0/tcp/9999"
_, err := NewMergedOptions(WithListenP2PAddrStrings(addr))
require.Contains(t, err.Error(), "failed to parse multiaddr")
}

func TestWithListenP2PAddrStrings(t *testing.T) {
addr := "/ip4/0.0.0.0/tcp/9999"
opt, err := NewMergedOptions(WithListenP2PAddrStrings(addr))
require.NoError(t, err)
require.NotNil(t, opt)
require.Equal(t, addr, opt.ListenAddrs[0].String())
}

func TestWithListenAddrs(t *testing.T) {
addr := "/ip4/0.0.0.0/tcp/9999"
a, err := ma.NewMultiaddr(addr)
require.NoError(t, err)

opt, err := NewMergedOptions(WithListenAddrs(a))
require.NoError(t, err)
require.NotNil(t, opt)
require.Equal(t, addr, opt.ListenAddrs[0].String())
func TestWithEnablePubSub(t *testing.T) {
opts := &Options{}
WithEnablePubSub(true)(opts)
assert.Equal(t, true, opts.EnablePubSub)
}
12 changes: 6 additions & 6 deletions net/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
n1, err := NewNode(
ctx,
db1,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewNode(
ctx,
db2,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
Expand All @@ -54,14 +54,14 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
n1, err := NewNode(
ctx,
db1,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewNode(
ctx,
db2,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
Expand All @@ -84,14 +84,14 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
n1, err := NewNode(
ctx,
db1,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n1.Close()
n2, err := NewNode(
ctx,
db2,
WithListenP2PAddrStrings("/ip4/0.0.0.0/tcp/0"),
WithListenAddress("/ip4/0.0.0.0/tcp/0"),
)
assert.NoError(t, err)
defer n2.Close()
Expand Down
17 changes: 13 additions & 4 deletions net/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
// @TODO: https://github.com/sourcenetwork/defradb/issues/1902
//nolint:staticcheck
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/logging"
Expand Down Expand Up @@ -78,7 +79,15 @@
db client.DB,
opts ...NodeOpt,
) (*Node, error) {
options, err := NewMergedOptions(opts...)
options := DefaultOptions()
for _, opt := range opts {
opt(options)
}
connManager, err := connmgr.NewConnManager(100, 400, connmgr.WithGracePeriod(time.Second*20))
if err != nil {
return nil, err
}

Check warning on line 89 in net/node.go

View check run for this annotation

Codecov / codecov/patch

net/node.go#L88-L89

Added lines #L88 - L89 were not covered by tests
listenAddress, err := multiaddr.NewMultiaddr(options.ListenAddress)
if err != nil {
return nil, err
}
Expand All @@ -103,10 +112,10 @@
var ddht *dualdht.DHT

libp2pOpts := []libp2p.Option{
libp2p.ConnectionManager(options.ConnManager),
libp2p.ConnectionManager(connManager),
libp2p.DefaultTransports,
libp2p.Identity(options.PrivateKey),
libp2p.ListenAddrs(options.ListenAddrs...),
libp2p.ListenAddrs(listenAddress),
libp2p.Peerstore(peerstore),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
// Delete this line and uncomment the next 6 lines once we remove batchable datastore support.
Expand All @@ -133,7 +142,7 @@
ctx,
"Created LibP2P host",
logging.NewKV("PeerId", h.ID()),
logging.NewKV("Address", options.ListenAddrs),
logging.NewKV("Address", options.ListenAddress),
)

var ps *pubsub.PubSub
Expand Down
Loading
Loading