Skip to content

Commit

Permalink
Merge branch 'develop' into nasdf/refactor/config
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Feb 12, 2024
2 parents 42296e4 + 86610d7 commit 19359da
Show file tree
Hide file tree
Showing 13 changed files with 1,394 additions and 1,153 deletions.
55 changes: 15 additions & 40 deletions cli/server_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,66 +11,41 @@
package cli

import (
"fmt"
"os"
"os/signal"

"github.com/spf13/cobra"
"github.com/spf13/viper"

ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/node"
)

func MakeServerDumpCmd() *cobra.Command {
var datastore string

func MakeServerDumpCmd(cfg *viper.Viper) *cobra.Command {
cmd := &cobra.Command{
Use: "server-dump",
Short: "Dumps the state of the entire database",
RunE: func(cmd *cobra.Command, _ []string) error {
cfg := mustGetConfigContext(cmd)
log.FeedbackInfo(cmd.Context(), "Starting DefraDB process...")

// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
log.FeedbackInfo(cmd.Context(), "Dumping DB state...")

var rootstore ds.RootStore
var err error
if datastore == badgerDatastoreName {
badgerPath := cfg.GetString("datastore.badger.path")
info, err := os.Stat(badgerPath)
exists := (err == nil && info.IsDir())
if !exists {
return errors.New(fmt.Sprintf(
"badger store does not exist at %s. Try with an existing directory",
badgerPath,
))
}
log.FeedbackInfo(cmd.Context(), "Opening badger store", logging.NewKV("Path", badgerPath))
rootstore, err = badgerds.NewDatastore(badgerPath, &badgerds.DefaultOptions)
if err != nil {
return errors.Wrap("could not open badger datastore", err)
}
} else {
datastore := cfg.GetString("datastore.store")
if datastore == "memory" {
return errors.New("server-side dump is only supported for the Badger datastore")
}

storeOpts := []node.StoreOpt{
node.WithPath(cfg.GetString("datastore.badger.path")),
node.WithInMemory(datastore == "memory"),
}
rootstore, err := node.NewStore(storeOpts...)
if err != nil {
return err
}
db, err := db.NewDB(cmd.Context(), rootstore)
if err != nil {
return errors.Wrap("failed to initialize database", err)
}
defer db.Close()

log.FeedbackInfo(cmd.Context(), "Dumping DB state...")
return db.PrintDump(cmd.Context())
},
}
cmd.Flags().StringVar(
&datastore, "store", "badger",
"Datastore to use. Options are badger, memory",
)
return cmd
}
239 changes: 76 additions & 163 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,26 @@
package cli

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

badger "github.com/sourcenetwork/badger/v4"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spf13/cobra"
"github.com/spf13/viper"

"github.com/sourcenetwork/defradb/client"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
httpapi "github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/logging"
"github.com/sourcenetwork/defradb/http"
"github.com/sourcenetwork/defradb/net"
netutils "github.com/sourcenetwork/defradb/net/utils"
"github.com/sourcenetwork/defradb/node"
)

const badgerDatastoreName = "badger"

func MakeStartCommand() *cobra.Command {
func MakeStartCommand(cfg *viper.Viper) *cobra.Command {
var cmd = &cobra.Command{
Use: "start",
Short: "Start a DefraDB node",
Expand All @@ -47,172 +40,92 @@ func MakeStartCommand() *cobra.Command {
return setConfigContext(cmd)
},
RunE: func(cmd *cobra.Command, args []string) error {
cfg := mustGetConfigContext(cmd)

di, err := start(cmd.Context(), cfg)
if err != nil {
return err
dbOpts := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
}

return wait(cmd.Context(), di)
},
}

return cmd
}

type defraInstance struct {
node *net.Node
db client.DB
server *httpapi.Server
}

func (di *defraInstance) close(ctx context.Context) {
if di.node != nil {
di.node.Close()
} else {
di.db.Close()
}
if err := di.server.Shutdown(ctx); err != nil {
log.FeedbackInfo(
ctx,
"The server could not be closed successfully",
logging.NewKV("Error", err.Error()),
)
}
}
netOpts := []net.NodeOpt{
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubSubEnabled")),
net.WithEnableRelay(cfg.GetBool("net.relayEnabled")),
}

func start(ctx context.Context, cfg *viper.Viper) (*defraInstance, error) {
log.FeedbackInfo(ctx, "Starting DefraDB service...")

var rootstore ds.RootStore

var err error
if cfg.Datastore.Store == badgerDatastoreName {
log.FeedbackInfo(ctx, "Opening badger store", logging.NewKV("Path", cfg.Datastore.Badger.Path))
rootstore, err = badgerds.NewDatastore(
cfg.Datastore.Badger.Path,
cfg.Datastore.Badger.Options,
)
} else if cfg.Datastore.Store == "memory" {
log.FeedbackInfo(ctx, "Building new memory store")
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err = badgerds.NewDatastore("", &opts)
}
serverOpts := []http.ServerOpt{
http.WithAddress(cfg.GetString("api.address")),
http.WithAllowedOrigins(cfg.GetStringSlice("api.allowed-origins")...),
http.WithTLSCertPath(cfg.GetString("api.pubKeyPath")),
http.WithTLSKeyPath(cfg.GetString("api.privKeyPath")),
}

if err != nil {
return nil, errors.Wrap("failed to open datastore", err)
}
storeOpts := []node.StoreOpt{
node.WithPath(cfg.GetString("datastore.badger.path")),
node.WithInMemory(cfg.GetString("datastore.store") == "memory"),
}

options := []db.Option{
db.WithUpdateEvents(),
db.WithMaxRetries(cfg.Datastore.MaxTxnRetries),
}
var peers []peer.AddrInfo
if val := cfg.GetString("net.peers"); val != "" {
addrs, err := netutils.ParsePeers(strings.Split(val, ","))
if err != nil {
return errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %s", val), err)
}
peers = addrs
}

db, err := db.NewDB(ctx, rootstore, options...)
if err != nil {
return nil, errors.Wrap("failed to create database", err)
}
if cfg.GetString("datastore.store") == "badger" {
// It would be ideal to not have the key path tied to the datastore.
// Running with memory store mode will always generate a random key.
// Adding support for an ephemeral mode and moving the key to the
// config would solve both of these issues.
rootdir, err := cmd.PersistentFlags().GetString("rootdir")
if err != nil {
return err
}
key, err := loadOrGeneratePrivateKey(filepath.Join(rootdir, "data", "key"))
if err != nil {
return err
}
netOpts = append(netOpts, net.WithPrivateKey(key))
}

// init the p2p node
var node *net.Node
if !cfg.GetBool("net.p2pdisabled") {
nodeOpts := []net.NodeOpt{
net.WithListenAddresses(cfg.GetStringSlice("net.p2paddresses")...),
net.WithEnablePubSub(cfg.GetBool("net.pubsubenabled")),
net.WithEnableRelay(cfg.GetBool("net.relayenabled")),
}
if cfg.GetString("datastore.store") == badgerDatastoreName {
// It would be ideal to not have the key path tied to the datastore.
// Running with memory store mode will always generate a random key.
// Adding support for an ephemeral mode and moving the key to the
// config would solve both of these issues.
key, err := loadOrGeneratePrivateKey(filepath.Join(cfg.Rootdir, "data", "key"))
if err != nil {
return nil, err
opts := []node.NodeOpt{
node.WithPeers(peers...),
node.WithStoreOpts(storeOpts...),
node.WithDatabaseOpts(dbOpts...),
node.WithNetOpts(netOpts...),
node.WithServerOpts(serverOpts...),
node.WithDisableP2P(cfg.GetBool("net.p2pDisabled")),
}
nodeOpts = append(nodeOpts, net.WithPrivateKey(key))
}
log.FeedbackInfo(ctx, "Starting P2P node", logging.NewKV("P2P addresses", cfg.Net.P2PAddresses))
node, err = net.NewNode(ctx, db, nodeOpts...)
if err != nil {
db.Close()
return nil, errors.Wrap("failed to start P2P node", err)
}

// parse peers and bootstrap
if len(cfg.Net.Peers) != 0 {
log.Debug(ctx, "Parsing bootstrap peers", logging.NewKV("Peers", cfg.Net.Peers))
addrs, err := netutils.ParsePeers(strings.Split(cfg.Net.Peers, ","))

n, err := node.NewNode(cmd.Context(), opts...)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("failed to parse bootstrap peers %v", cfg.Net.Peers), err)
return err
}
log.Debug(ctx, "Bootstrapping with peers", logging.NewKV("Addresses", addrs))
node.Bootstrap(addrs)
}

if err := node.Start(); err != nil {
node.Close()
return nil, errors.Wrap("failed to start P2P listeners", err)
}
}

serverOpts := []httpapi.ServerOpt{
httpapi.WithAddress(cfg.API.Address),
httpapi.WithAllowedOrigins(cfg.API.AllowedOrigins...),
httpapi.WithTLSCertPath(cfg.API.PubKeyPath),
httpapi.WithTLSKeyPath(cfg.API.PrivKeyPath),
}
defer func() {
if err := n.Close(cmd.Context()); err != nil {
log.FeedbackErrorE(cmd.Context(), "Stopping DefraDB", err)
}
}()

var handler *httpapi.Handler
if node != nil {
handler, err = httpapi.NewHandler(node)
} else {
handler, err = httpapi.NewHandler(db)
}
if err != nil {
return nil, errors.Wrap("failed to create http handler", err)
}
server, err := httpapi.NewServer(handler, serverOpts...)
if err != nil {
return nil, errors.Wrap("failed to create http server", err)
}
log.FeedbackInfo(cmd.Context(), "Starting DefraDB")
if err := n.Start(cmd.Context()); err != nil {
return err
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

// run the server in a separate goroutine
go func() {
log.FeedbackInfo(ctx, fmt.Sprintf("Providing HTTP API at %s.", cfg.API.AddressToURL()))
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.FeedbackErrorE(ctx, "Failed to run the HTTP server", err)
if node != nil {
node.Close()
} else {
db.Close()
select {
case <-cmd.Context().Done():
log.FeedbackInfo(cmd.Context(), "Received context cancellation; shutting down...")
case <-signalCh:
log.FeedbackInfo(cmd.Context(), "Received interrupt; shutting down...")
}
os.Exit(1)
}
}()

return &defraInstance{
node: node,
db: db,
server: server,
}, nil
}

// wait waits for an interrupt signal to close the program.
func wait(ctx context.Context, di *defraInstance) error {
// setup signal handlers
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)

select {
case <-ctx.Done():
log.FeedbackInfo(ctx, "Received context cancellation; closing database...")
di.close(ctx)
return ctx.Err()
case <-signalCh:
log.FeedbackInfo(ctx, "Received interrupt; closing database...")
di.close(ctx)
return ctx.Err()
return nil
},
}

return cmd
}
5 changes: 5 additions & 0 deletions cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"encoding/json"
"os"
"path/filepath"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -215,6 +216,10 @@ func generatePrivateKey(path string) (crypto.PrivKey, error) {
if err != nil {
return nil, err
}
err = os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
return nil, err
}
return key, os.WriteFile(path, data, 0644)
}

Expand Down
Loading

0 comments on commit 19359da

Please sign in to comment.