diff --git a/cli/server_dump.go b/cli/server_dump.go index 0ba638d268..c57299dafc 100644 --- a/cli/server_dump.go +++ b/cli/server_dump.go @@ -11,65 +11,40 @@ package cli import ( - "fmt" - "os" - "os/signal" - "github.com/spf13/cobra" "github.com/sourcenetwork/defradb/config" - ds "github.com/sourcenetwork/defradb/datastore" - badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/db" "github.com/sourcenetwork/defradb/errors" - "github.com/sourcenetwork/defradb/logging" + "github.com/sourcenetwork/defradb/node" ) func MakeServerDumpCmd(cfg *config.Config) *cobra.Command { - var datastore string - cmd := &cobra.Command{ Use: "server-dump", Short: "Dumps the state of the entire database", RunE: func(cmd *cobra.Command, _ []string) error { - log.FeedbackInfo(cmd.Context(), "Starting DefraDB process...") - - // setup signal handlers - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt) + log.FeedbackInfo(cmd.Context(), "Dumping DB state...") - var rootstore ds.RootStore - var err error - if datastore == badgerDatastoreName { - info, err := os.Stat(cfg.Datastore.Badger.Path) - exists := (err == nil && info.IsDir()) - if !exists { - return errors.New(fmt.Sprintf( - "badger store does not exist at %s. Try with an existing directory", - cfg.Datastore.Badger.Path, - )) - } - log.FeedbackInfo(cmd.Context(), "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path)) - rootstore, err = badgerds.NewDatastore(cfg.Datastore.Badger.Path, cfg.Datastore.Badger.Options) - if err != nil { - return errors.Wrap("could not open badger datastore", err) - } - } else { + if cfg.Datastore.Store == config.DatastoreMemory { return errors.New("server-side dump is only supported for the Badger datastore") } - + storeOpts := []node.StoreOpt{ + node.WithPath(cfg.Datastore.Badger.Path), + node.WithInMemory(cfg.Datastore.Store == config.DatastoreMemory), + } + rootstore, err := node.NewStore(storeOpts...) + if err != nil { + return err + } db, err := db.NewDB(cmd.Context(), rootstore) if err != nil { return errors.Wrap("failed to initialize database", err) } + defer db.Close() - log.FeedbackInfo(cmd.Context(), "Dumping DB state...") return db.PrintDump(cmd.Context()) }, } - cmd.Flags().StringVar( - &datastore, "store", cfg.Datastore.Store, - "Datastore to use. Options are badger, memory", - ) return cmd } diff --git a/cli/start.go b/cli/start.go index 0c344ea94c..f3e12fb8a7 100644 --- a/cli/start.go +++ b/cli/start.go @@ -13,30 +13,24 @@ package cli import ( "context" "fmt" - "net/http" "os" "os/signal" "path/filepath" "strings" "syscall" - badger "github.com/sourcenetwork/badger/v4" + "github.com/libp2p/go-libp2p/core/peer" "github.com/spf13/cobra" - "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/config" - ds "github.com/sourcenetwork/defradb/datastore" - badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/db" "github.com/sourcenetwork/defradb/errors" - httpapi "github.com/sourcenetwork/defradb/http" - "github.com/sourcenetwork/defradb/logging" + "github.com/sourcenetwork/defradb/http" "github.com/sourcenetwork/defradb/net" netutils "github.com/sourcenetwork/defradb/net/utils" + "github.com/sourcenetwork/defradb/node" ) -const badgerDatastoreName = "badger" - func MakeStartCommand(cfg *config.Config) *cobra.Command { var cmd = &cobra.Command{ Use: "start", @@ -53,12 +47,86 @@ func MakeStartCommand(cfg *config.Config) *cobra.Command { return nil }, RunE: func(cmd *cobra.Command, args []string) error { - di, err := start(cmd.Context(), cfg) + dbOpts := []db.Option{ + db.WithUpdateEvents(), + db.WithMaxRetries(cfg.Datastore.MaxTxnRetries), + } + + netOpts := []net.NodeOpt{ + net.WithListenAddresses(cfg.Net.P2PAddresses...), + net.WithEnablePubSub(cfg.Net.PubSubEnabled), + net.WithEnableRelay(cfg.Net.RelayEnabled), + } + + serverOpts := []http.ServerOpt{ + http.WithAddress(cfg.API.Address), + http.WithAllowedOrigins(cfg.API.AllowedOrigins...), + http.WithTLSCertPath(cfg.API.PubKeyPath), + http.WithTLSKeyPath(cfg.API.PrivKeyPath), + } + + storeOpts := []node.StoreOpt{ + node.WithPath(cfg.Datastore.Badger.Path), + node.WithInMemory(cfg.Datastore.Store == config.DatastoreMemory), + } + + var peers []peer.AddrInfo + if cfg.Net.Peers != "" { + addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ",")) + if err != nil { + return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err) + } + peers = addrs + } + + if cfg.Datastore.Store == "badger" { + // It would be ideal to not have the key path tied to the datastore. + // Running with memory store mode will always generate a random key. + // Adding support for an ephemeral mode and moving the key to the + // config would solve both of these issues. + key, err := loadOrGeneratePrivateKey(filepath.Join(cfg.Rootdir, "data", "key")) + if err != nil { + return err + } + netOpts = append(netOpts, net.WithPrivateKey(key)) + } + + opts := []node.NodeOpt{ + node.WithPeers(peers...), + node.WithStoreOpts(storeOpts...), + node.WithDatabaseOpts(dbOpts...), + node.WithNetOpts(netOpts...), + node.WithServerOpts(serverOpts...), + node.WithDisableP2P(cfg.Net.P2PDisabled), + } + + n, err := node.NewNode(cmd.Context(), opts...) if err != nil { return err } - return wait(cmd.Context(), di) + defer func() { + if err := n.Close(cmd.Context()); err != nil { + log.FeedbackErrorE(cmd.Context(), "Stopping DefraDB", err) + } + }() + + log.FeedbackInfo(cmd.Context(), "Starting DefraDB") + if err := n.Start(cmd.Context()); err != nil { + return err + } + + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + + select { + case <-cmd.Context().Done(): + log.FeedbackInfo(cmd.Context(), "Received context cancellation; shutting down...") + case <-signalCh: + log.FeedbackInfo(cmd.Context(), "Received interrupt; shutting down...") + } + + return nil }, } @@ -162,159 +230,3 @@ func MakeStartCommand(cfg *config.Config) *cobra.Command { } return cmd } - -type defraInstance struct { - node *net.Node - db client.DB - server *httpapi.Server -} - -func (di *defraInstance) close(ctx context.Context) { - if di.node != nil { - di.node.Close() - } else { - di.db.Close() - } - if err := di.server.Shutdown(ctx); err != nil { - log.FeedbackInfo( - ctx, - "The server could not be closed successfully", - logging.NewKV("Error", err.Error()), - ) - } -} - -func start(ctx context.Context, cfg *config.Config) (*defraInstance, error) { - log.FeedbackInfo(ctx, "Starting DefraDB service...") - - var rootstore ds.RootStore - - var err error - if cfg.Datastore.Store == badgerDatastoreName { - log.FeedbackInfo(ctx, "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path)) - rootstore, err = badgerds.NewDatastore( - cfg.Datastore.Badger.Path, - cfg.Datastore.Badger.Options, - ) - } else if cfg.Datastore.Store == "memory" { - log.FeedbackInfo(ctx, "Building new memory store") - opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} - rootstore, err = badgerds.NewDatastore("", &opts) - } - - if err != nil { - return nil, errors.Wrap("failed to open datastore", err) - } - - options := []db.Option{ - db.WithUpdateEvents(), - db.WithMaxRetries(cfg.Datastore.MaxTxnRetries), - } - - db, err := db.NewDB(ctx, rootstore, options...) - if err != nil { - return nil, errors.Wrap("failed to create database", err) - } - - // init the p2p node - var node *net.Node - if !cfg.Net.P2PDisabled { - nodeOpts := []net.NodeOpt{ - net.WithListenAddresses(cfg.Net.P2PAddresses...), - net.WithEnablePubSub(cfg.Net.PubSubEnabled), - net.WithEnableRelay(cfg.Net.RelayEnabled), - } - if cfg.Datastore.Store == badgerDatastoreName { - // It would be ideal to not have the key path tied to the datastore. - // Running with memory store mode will always generate a random key. - // Adding support for an ephemeral mode and moving the key to the - // config would solve both of these issues. - key, err := loadOrGeneratePrivateKey(filepath.Join(cfg.Rootdir, "data", "key")) - if err != nil { - return nil, err - } - nodeOpts = append(nodeOpts, net.WithPrivateKey(key)) - } - log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P addresses", cfg.Net.P2PAddresses)) - node, err = net.NewNode(ctx, db, nodeOpts...) - if err != nil { - db.Close() - return nil, errors.Wrap("failed to start P2P node", err) - } - - // parse peers and bootstrap - if len(cfg.Net.Peers) != 0 { - log.Debug(ctx, "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers)) - addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ",")) - if err != nil { - return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err) - } - log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs)) - node.Bootstrap(addrs) - } - - if err := node.Start(); err != nil { - node.Close() - return nil, errors.Wrap("failed to start P2P listeners", err) - } - } - - serverOpts := []httpapi.ServerOpt{ - httpapi.WithAddress(cfg.API.Address), - httpapi.WithAllowedOrigins(cfg.API.AllowedOrigins...), - httpapi.WithTLSCertPath(cfg.API.PubKeyPath), - httpapi.WithTLSKeyPath(cfg.API.PrivKeyPath), - } - - var handler *httpapi.Handler - if node != nil { - handler, err = httpapi.NewHandler(node) - } else { - handler, err = httpapi.NewHandler(db) - } - if err != nil { - return nil, errors.Wrap("failed to create http handler", err) - } - server, err := httpapi.NewServer(handler, serverOpts...) - if err != nil { - return nil, errors.Wrap("failed to create http server", err) - } - - // run the server in a separate goroutine - go func() { - log.FeedbackInfo(ctx, fmt.Sprintf("Providing HTTP API at %s.", cfg.API.AddressToURL())) - if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err) - if node != nil { - node.Close() - } else { - db.Close() - } - os.Exit(1) - } - }() - - return &defraInstance{ - node: node, - db: db, - server: server, - }, nil -} - -// wait waits for an interrupt signal to close the program. -func wait(ctx context.Context, di *defraInstance) error { - // setup signal handlers - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) - - select { - case <-ctx.Done(): - log.FeedbackInfo(ctx, "Received context cancellation; closing database...") - di.close(ctx) - return ctx.Err() - case <-signalCh: - log.FeedbackInfo(ctx, "Received interrupt; closing database...") - di.close(ctx) - return ctx.Err() - } -} diff --git a/cli/utils.go b/cli/utils.go index 8c1a40dc1f..b8b102c0c8 100644 --- a/cli/utils.go +++ b/cli/utils.go @@ -14,6 +14,7 @@ import ( "context" "encoding/json" "os" + "path/filepath" "github.com/libp2p/go-libp2p/core/crypto" "github.com/spf13/cobra" @@ -138,6 +139,10 @@ func generatePrivateKey(path string) (crypto.PrivKey, error) { if err != nil { return nil, err } + err = os.MkdirAll(filepath.Dir(path), 0755) + if err != nil { + return nil, err + } return key, os.WriteFile(path, data, 0644) } diff --git a/config/config.go b/config/config.go index 23fd2d43fe..52c93c8372 100644 --- a/config/config.go +++ b/config/config.go @@ -72,6 +72,7 @@ const ( logLevelInfo = "info" logLevelError = "error" logLevelFatal = "fatal" + DatastoreMemory = "memory" ) // Config is DefraDB's main configuration struct, embedding component-specific config structs. diff --git a/node/node.go b/node/node.go new file mode 100644 index 0000000000..f13fd0b1c2 --- /dev/null +++ b/node/node.go @@ -0,0 +1,183 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/db" + "github.com/sourcenetwork/defradb/http" + "github.com/sourcenetwork/defradb/logging" + "github.com/sourcenetwork/defradb/net" +) + +var log = logging.MustNewLogger("node") + +// Options contains start configuration values. +type Options struct { + storeOpts []StoreOpt + dbOpts []db.Option + netOpts []net.NodeOpt + serverOpts []http.ServerOpt + peers []peer.AddrInfo + disableP2P bool + disableAPI bool +} + +// DefaultOptions returns options with default settings. +func DefaultOptions() *Options { + return &Options{} +} + +// NodeOpt is a function for setting configuration values. +type NodeOpt func(*Options) + +// WithStoreOpts sets the store options. +func WithStoreOpts(opts ...StoreOpt) NodeOpt { + return func(o *Options) { + o.storeOpts = opts + } +} + +// WithDatabaseOpts sets the database options. +func WithDatabaseOpts(opts ...db.Option) NodeOpt { + return func(o *Options) { + o.dbOpts = opts + } +} + +// WithNetOpts sets the net / p2p options. +func WithNetOpts(opts ...net.NodeOpt) NodeOpt { + return func(o *Options) { + o.netOpts = opts + } +} + +// WithServerOpts sets the api server options. +func WithServerOpts(opts ...http.ServerOpt) NodeOpt { + return func(o *Options) { + o.serverOpts = opts + } +} + +// WithDisableP2P sets the disable p2p flag. +func WithDisableP2P(disable bool) NodeOpt { + return func(o *Options) { + o.disableP2P = disable + } +} + +// WithDisableAPI sets the disable api flag. +func WithDisableAPI(disable bool) NodeOpt { + return func(o *Options) { + o.disableAPI = disable + } +} + +// WithPeers sets the bootstrap peers. +func WithPeers(peers ...peer.AddrInfo) NodeOpt { + return func(o *Options) { + o.peers = peers + } +} + +// Node is a DefraDB instance with optional sub-systems. +type Node struct { + DB client.DB + Node *net.Node + Server *http.Server +} + +// NewNode returns a new node instance configured with the given options. +func NewNode(ctx context.Context, opts ...NodeOpt) (*Node, error) { + options := DefaultOptions() + for _, opt := range opts { + opt(options) + } + rootstore, err := NewStore(options.storeOpts...) + if err != nil { + return nil, err + } + db, err := db.NewDB(ctx, rootstore, options.dbOpts...) + if err != nil { + return nil, err + } + + var node *net.Node + if !options.disableP2P { + // setup net node + node, err = net.NewNode(ctx, db, options.netOpts...) + if err != nil { + return nil, err + } + if len(options.peers) > 0 { + node.Bootstrap(options.peers) + } + } + + var server *http.Server + if !options.disableAPI { + // setup http server + var handler *http.Handler + if node != nil { + handler, err = http.NewHandler(node) + } else { + handler, err = http.NewHandler(db) + } + if err != nil { + return nil, err + } + server, err = http.NewServer(handler, options.serverOpts...) + if err != nil { + return nil, err + } + } + + return &Node{ + DB: db, + Node: node, + Server: server, + }, nil +} + +// Start starts the node sub-systems. +func (n *Node) Start(ctx context.Context) error { + if n.Node != nil { + if err := n.Node.Start(); err != nil { + return err + } + } + if n.Server != nil { + go func() { + if err := n.Server.ListenAndServe(); err != nil { + log.FeedbackErrorE(ctx, "HTTP server stopped", err) + } + }() + } + return nil +} + +// Close stops the node sub-systems. +func (n *Node) Close(ctx context.Context) error { + var err error + if n.Server != nil { + err = n.Server.Shutdown(ctx) + } + if n.Node != nil { + n.Node.Close() + } else { + n.DB.Close() + } + return err +} diff --git a/node/node_test.go b/node/node_test.go new file mode 100644 index 0000000000..3f3c7c854f --- /dev/null +++ b/node/node_test.go @@ -0,0 +1,101 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/sourcenetwork/defradb/db" + "github.com/sourcenetwork/defradb/http" + "github.com/sourcenetwork/defradb/net" +) + +func TestWithStoreOpts(t *testing.T) { + storeOpts := []StoreOpt{WithPath("test")} + + options := &Options{} + WithStoreOpts(storeOpts...)(options) + assert.Equal(t, storeOpts, options.storeOpts) +} + +func TestWithDatabaseOpts(t *testing.T) { + dbOpts := []db.Option{db.WithMaxRetries(10)} + + options := &Options{} + WithDatabaseOpts(dbOpts...)(options) + assert.Equal(t, dbOpts, options.dbOpts) +} + +func TestWithNetOpts(t *testing.T) { + netOpts := []net.NodeOpt{net.WithEnablePubSub(true)} + + options := &Options{} + WithNetOpts(netOpts...)(options) + assert.Equal(t, netOpts, options.netOpts) +} + +func TestWithServerOpts(t *testing.T) { + serverOpts := []http.ServerOpt{http.WithAddress("127.0.0.1:8080")} + + options := &Options{} + WithServerOpts(serverOpts...)(options) + assert.Equal(t, serverOpts, options.serverOpts) +} + +func TestWithDisableP2P(t *testing.T) { + options := &Options{} + WithDisableP2P(true)(options) + assert.Equal(t, true, options.disableP2P) +} + +func TestWithDisableAPI(t *testing.T) { + options := &Options{} + WithDisableAPI(true)(options) + assert.Equal(t, true, options.disableAPI) +} + +func TestWithPeers(t *testing.T) { + peer, err := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N") + require.NoError(t, err) + + options := &Options{} + WithPeers(*peer)(options) + + require.Len(t, options.peers, 1) + assert.Equal(t, *peer, options.peers[0]) +} + +func TestNodeStart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := []NodeOpt{ + WithStoreOpts(WithPath(t.TempDir())), + WithDatabaseOpts(db.WithUpdateEvents()), + } + + node, err := NewNode(ctx, opts...) + require.NoError(t, err) + + err = node.Start(ctx) + require.NoError(t, err) + + <-time.After(5 * time.Second) + + err = node.Close(ctx) + require.NoError(t, err) +} diff --git a/node/store.go b/node/store.go new file mode 100644 index 0000000000..6d05954662 --- /dev/null +++ b/node/store.go @@ -0,0 +1,78 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/datastore/badger/v4" +) + +// StoreOptions contains store configuration values. +type StoreOptions struct { + path string + inMemory bool + valueLogFileSize int64 + encryptionKey []byte +} + +// DefaultStoreOptions returns new options with default values. +func DefaultStoreOptions() *StoreOptions { + return &StoreOptions{ + inMemory: false, + valueLogFileSize: 1 << 30, + } +} + +// StoreOpt is a function for setting configuration values. +type StoreOpt func(*StoreOptions) + +// WithInMemory sets the in memory flag. +func WithInMemory(inMemory bool) StoreOpt { + return func(o *StoreOptions) { + o.inMemory = inMemory + } +} + +// WithPath sets the datastore path. +func WithPath(path string) StoreOpt { + return func(o *StoreOptions) { + o.path = path + } +} + +// WithValueLogFileSize sets the badger value log file size. +func WithValueLogFileSize(size int64) StoreOpt { + return func(o *StoreOptions) { + o.valueLogFileSize = size + } +} + +// WithEncryptionKey sets the badger encryption key. +func WithEncryptionKey(encryptionKey []byte) StoreOpt { + return func(o *StoreOptions) { + o.encryptionKey = encryptionKey + } +} + +// NewStore returns a new store with the given options. +func NewStore(opts ...StoreOpt) (datastore.RootStore, error) { + options := DefaultStoreOptions() + for _, opt := range opts { + opt(options) + } + + badgerOpts := badger.DefaultOptions + badgerOpts.InMemory = options.inMemory + badgerOpts.ValueLogFileSize = options.valueLogFileSize + badgerOpts.EncryptionKey = options.encryptionKey + + return badger.NewDatastore(options.path, &badgerOpts) +} diff --git a/node/store_test.go b/node/store_test.go new file mode 100644 index 0000000000..69ce98e952 --- /dev/null +++ b/node/store_test.go @@ -0,0 +1,47 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package node + +import ( + "crypto/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWithInMemory(t *testing.T) { + options := &StoreOptions{} + WithInMemory(true)(options) + assert.Equal(t, true, options.inMemory) +} + +func TestWithPath(t *testing.T) { + options := &StoreOptions{} + WithPath("tmp")(options) + assert.Equal(t, "tmp", options.path) +} + +func TestWithValueLogFileSize(t *testing.T) { + options := &StoreOptions{} + WithValueLogFileSize(int64(5 << 30))(options) + assert.Equal(t, int64(5<<30), options.valueLogFileSize) +} + +func TestWithEncryptionKey(t *testing.T) { + encryptionKey := make([]byte, 32) + _, err := rand.Read(encryptionKey) + require.NoError(t, err) + + options := &StoreOptions{} + WithEncryptionKey(encryptionKey)(options) + assert.Equal(t, encryptionKey, options.encryptionKey) +}