Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Node config #2296

Merged
merged 5 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 12 additions & 37 deletions cli/server_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,65 +11,40 @@
package cli

import (
"fmt"
"os"
"os/signal"

"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/config"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/node"
)

func MakeServerDumpCmd(cfg *config.Config) *cobra.Command {
var datastore string

cmd := &cobra.Command{
Use: "server-dump",
Short: "Dumps the state of the entire database",
RunE: func(cmd *cobra.Command, _ []string) error {
log.FeedbackInfo(cmd.Context(), "Starting DefraDB process...")

// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
log.FeedbackInfo(cmd.Context(), "Dumping DB state...")

var rootstore ds.RootStore
var err error
if datastore == badgerDatastoreName {
info, err := os.Stat(cfg.Datastore.Badger.Path)
exists := (err == nil && info.IsDir())
if !exists {
return errors.New(fmt.Sprintf(
"badger store does not exist at %s. Try with an existing directory",
cfg.Datastore.Badger.Path,
))
}
log.FeedbackInfo(cmd.Context(), "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path))
rootstore, err = badgerds.NewDatastore(cfg.Datastore.Badger.Path, cfg.Datastore.Badger.Options)
if err != nil {
return errors.Wrap("could not open badger datastore", err)
}
} else {
if cfg.Datastore.Store == config.DatastoreMemory {
return errors.New("server-side dump is only supported for the Badger datastore")
}

storeOpts := []node.StoreOpt{
node.WithPath(cfg.Datastore.Badger.Path),
node.WithInMemory(cfg.Datastore.Store == config.DatastoreMemory),
}
rootstore, err := node.NewStore(storeOpts...)
if err != nil {
return err
}
db, err := db.NewDB(cmd.Context(), rootstore)
if err != nil {
return errors.Wrap("failed to initialize database", err)
}
defer db.Close()

log.FeedbackInfo(cmd.Context(), "Dumping DB state...")
return db.PrintDump(cmd.Context())
},
}
cmd.Flags().StringVar(
&datastore, "store", cfg.Datastore.Store,
"Datastore to use. Options are badger, memory",
)
return cmd
}
246 changes: 79 additions & 167 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,24 @@ package cli
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

badger "github.com/sourcenetwork/badger/v4"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/config"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
httpapi "github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/net"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"
)

const badgerDatastoreName = "badger"

func MakeStartCommand(cfg *config.Config) *cobra.Command {
var cmd = &cobra.Command{
Use: "start",
Expand All @@ -53,12 +47,86 @@ func MakeStartCommand(cfg *config.Config) *cobra.Command {
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
di, err := start(cmd.Context(), cfg)
dbOpts := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
}

netOpts := []net.NodeOpt{
net.WithListenAddresses(cfg.Net.P2PAddresses...),
net.WithEnablePubSub(cfg.Net.PubSubEnabled),
net.WithEnableRelay(cfg.Net.RelayEnabled),
}

serverOpts := []http.ServerOpt{
http.WithAddress(cfg.API.Address),
http.WithAllowedOrigins(cfg.API.AllowedOrigins...),
http.WithTLSCertPath(cfg.API.PubKeyPath),
http.WithTLSKeyPath(cfg.API.PrivKeyPath),
}

storeOpts := []node.StoreOpt{
node.WithPath(cfg.Datastore.Badger.Path),
node.WithInMemory(cfg.Datastore.Store == config.DatastoreMemory),
}

var peers []peer.AddrInfo
if cfg.Net.Peers != "" {
addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ","))
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
}
peers = addrs
}

if cfg.Datastore.Store == "badger" {
// It would be ideal to not have the key path tied to the datastore.
// Running with memory store mode will always generate a random key.
// Adding support for an ephemeral mode and moving the key to the
// config would solve both of these issues.
key, err := loadOrGeneratePrivateKey(filepath.Join(cfg.Rootdir, "data", "key"))
if err != nil {
return err
}
netOpts = append(netOpts, net.WithPrivateKey(key))
}

opts := []node.NodeOpt{
node.WithPeers(peers...),
node.WithStoreOpts(storeOpts...),
node.WithDatabaseOpts(dbOpts...),
node.WithNetOpts(netOpts...),
node.WithServerOpts(serverOpts...),
node.WithDisableP2P(cfg.Net.P2PDisabled),
}

n, err := node.NewNode(cmd.Context(), opts...)
if err != nil {
return err
}

return wait(cmd.Context(), di)
defer func() {
if err := n.Close(cmd.Context()); err != nil {
log.FeedbackErrorE(cmd.Context(), "Stopping DefraDB", err)
}
}()

log.FeedbackInfo(cmd.Context(), "Starting DefraDB")
if err := n.Start(cmd.Context()); err != nil {
return err
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

select {
case <-cmd.Context().Done():
log.FeedbackInfo(cmd.Context(), "Received context cancellation; shutting down...")
case <-signalCh:
log.FeedbackInfo(cmd.Context(), "Received interrupt; shutting down...")
}

return nil
},
}

Expand Down Expand Up @@ -162,159 +230,3 @@ func MakeStartCommand(cfg *config.Config) *cobra.Command {
}
return cmd
}

type defraInstance struct {
node *net.Node
db client.DB
server *httpapi.Server
}

func (di *defraInstance) close(ctx context.Context) {
if di.node != nil {
di.node.Close()
} else {
di.db.Close()
}
if err := di.server.Shutdown(ctx); err != nil {
log.FeedbackInfo(
ctx,
"The server could not be closed successfully",
logging.NewKV("Error", err.Error()),
)
}
}

func start(ctx context.Context, cfg *config.Config) (*defraInstance, error) {
log.FeedbackInfo(ctx, "Starting DefraDB service...")

var rootstore ds.RootStore

var err error
if cfg.Datastore.Store == badgerDatastoreName {
log.FeedbackInfo(ctx, "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path))
rootstore, err = badgerds.NewDatastore(
cfg.Datastore.Badger.Path,
cfg.Datastore.Badger.Options,
)
} else if cfg.Datastore.Store == "memory" {
log.FeedbackInfo(ctx, "Building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
}

if err != nil {
return nil, errors.Wrap("failed to open datastore", err)
}

options := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
}

db, err := db.NewDB(ctx, rootstore, options...)
if err != nil {
return nil, errors.Wrap("failed to create database", err)
}

// init the p2p node
var node *net.Node
if !cfg.Net.P2PDisabled {
nodeOpts := []net.NodeOpt{
net.WithListenAddresses(cfg.Net.P2PAddresses...),
net.WithEnablePubSub(cfg.Net.PubSubEnabled),
net.WithEnableRelay(cfg.Net.RelayEnabled),
}
if cfg.Datastore.Store == badgerDatastoreName {
// It would be ideal to not have the key path tied to the datastore.
// Running with memory store mode will always generate a random key.
// Adding support for an ephemeral mode and moving the key to the
// config would solve both of these issues.
key, err := loadOrGeneratePrivateKey(filepath.Join(cfg.Rootdir, "data", "key"))
if err != nil {
return nil, err
}
nodeOpts = append(nodeOpts, net.WithPrivateKey(key))
}
log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P addresses", cfg.Net.P2PAddresses))
node, err = net.NewNode(ctx, db, nodeOpts...)
if err != nil {
db.Close()
return nil, errors.Wrap("failed to start P2P node", err)
}

// parse peers and bootstrap
if len(cfg.Net.Peers) != 0 {
log.Debug(ctx, "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers))
addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ","))
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
node.Bootstrap(addrs)
}

if err := node.Start(); err != nil {
node.Close()
return nil, errors.Wrap("failed to start P2P listeners", err)
}
}

serverOpts := []httpapi.ServerOpt{
httpapi.WithAddress(cfg.API.Address),
httpapi.WithAllowedOrigins(cfg.API.AllowedOrigins...),
httpapi.WithTLSCertPath(cfg.API.PubKeyPath),
httpapi.WithTLSKeyPath(cfg.API.PrivKeyPath),
}

var handler *httpapi.Handler
if node != nil {
handler, err = httpapi.NewHandler(node)
} else {
handler, err = httpapi.NewHandler(db)
}
if err != nil {
return nil, errors.Wrap("failed to create http handler", err)
}
server, err := httpapi.NewServer(handler, serverOpts...)
if err != nil {
return nil, errors.Wrap("failed to create http server", err)
}

// run the server in a separate goroutine
go func() {
log.FeedbackInfo(ctx, fmt.Sprintf("Providing HTTP API at %s.", cfg.API.AddressToURL()))
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err)
if node != nil {
node.Close()
} else {
db.Close()
}
os.Exit(1)
}
}()

return &defraInstance{
node: node,
db: db,
server: server,
}, nil
}

// wait waits for an interrupt signal to close the program.
func wait(ctx context.Context, di *defraInstance) error {
// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

select {
case <-ctx.Done():
log.FeedbackInfo(ctx, "Received context cancellation; closing database...")
di.close(ctx)
return ctx.Err()
case <-signalCh:
log.FeedbackInfo(ctx, "Received interrupt; closing database...")
di.close(ctx)
return ctx.Err()
}
}
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
logLevelInfo = "info"
logLevelError = "error"
logLevelFatal = "fatal"
DatastoreMemory = "memory"
)

// Config is DefraDB's main configuration struct, embedding component-specific config structs.
Expand Down
Loading
Loading