Skip to content

Commit

Permalink
fix: improve how buckets are managed in the store (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
jahvon authored Nov 17, 2024
1 parent 4ab1e10 commit 1a71adf
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 79 deletions.
13 changes: 10 additions & 3 deletions cmd/internal/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func execPreRun(_ *context.Context, _ *cobra.Command, _ []string) {
runner.RegisterRunner(parallel.NewRunner())
}

//nolint:funlen
// TODO: refactor this function to simplify the logic
//
//nolint:funlen,gocognit
func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, args []string) {
logger := ctx.Logger
if err := verb.Validate(); err != nil {
Expand Down Expand Up @@ -111,9 +113,14 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
if err != nil {
logger.FatalErr(err)
}
if err = store.SetProcessBucketID(ref.String(), false); err != nil {
s, err := store.NewStore()
if err != nil {
logger.FatalErr(err)
}
if _, err = s.CreateAndSetBucket(ref.String()); err != nil {
logger.FatalErr(err)
}
_ = s.Close()
if envMap == nil {
envMap = make(map[string]string)
}
Expand Down Expand Up @@ -143,7 +150,7 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
logger.Errorf("failed clearing process store\n%v", err)
}
if processStore != nil {
if err = processStore.DeleteBucket(); err != nil {
if err = processStore.DeleteBucket(store.EnvironmentBucket()); err != nil {
logger.Errorf("failed clearing process store\n%v", err)
}
_ = processStore.Close()
Expand Down
6 changes: 6 additions & 0 deletions cmd/internal/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,9 @@ var SetSoundNotificationFlag = &Metadata{
Usage: "Update completion sound notification setting",
Default: false,
}

var StoreFullFlag = &Metadata{
Name: "full",
Usage: "Force clear all stored data",
Default: false,
}
32 changes: 24 additions & 8 deletions cmd/internal/store.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package internal

import (
"fmt"
"strings"

"github.com/jahvon/tuikit/views"
"github.com/spf13/cobra"

"github.com/jahvon/flow/cmd/internal/flags"
"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/io"
"github.com/jahvon/flow/internal/services/store"
Expand All @@ -25,7 +27,7 @@ func RegisterStoreCmd(ctx *context.Context, rootCmd *cobra.Command) {

func registerStoreSetCmd(ctx *context.Context, rootCmd *cobra.Command) {
subCmd := &cobra.Command{
Use: "set",
Use: "set KEY [VALUE]",
Short: "Set a key-value pair in the data store.",
Long: dataStoreDescription + "This will overwrite any existing value for the key.",
Args: cobra.MinimumNArgs(1),
Expand Down Expand Up @@ -61,15 +63,15 @@ func storeSetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
case len(args) == 2:
value = args[1]
default:
ctx.Logger.Warnx("merging multiple arguments into a single value", "count", len(args))
ctx.Logger.PlainTextWarn(fmt.Sprintf("merging multiple (%d) arguments into a single value", len(args)-1))
value = strings.Join(args[1:], " ")
}

s, err := store.NewStore()
if err != nil {
ctx.Logger.FatalErr(err)
}
if err = s.CreateBucket(); err != nil {
if err = s.CreateBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
defer func() {
Expand All @@ -80,12 +82,12 @@ func storeSetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
if err = s.Set(key, value); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.Infof("Key %q set in the store", key)
ctx.Logger.PlainTextInfo(fmt.Sprintf("Key %q set in the store", key))
}

func registerStoreGetCmd(ctx *context.Context, rootCmd *cobra.Command) {
subCmd := &cobra.Command{
Use: "get",
Use: "get KEY",
Aliases: []string{"view"},
Short: "Get a value from the data store.",
Long: dataStoreDescription + "This will retrieve the value for the given key.",
Expand All @@ -104,7 +106,7 @@ func storeGetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
if err != nil {
ctx.Logger.FatalErr(err)
}
if err = s.CreateBucket(); err != nil {
if _, err = s.CreateAndSetBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
defer func() {
Expand All @@ -130,15 +132,29 @@ func registerStoreClearCmd(ctx *context.Context, rootCmd *cobra.Command) {
storeClearFunc(ctx, cmd, args)
},
}
RegisterFlag(ctx, subCmd, *flags.StoreFullFlag)
rootCmd.AddCommand(subCmd)
}

func storeClearFunc(ctx *context.Context, _ *cobra.Command, _ []string) {
func storeClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
full := flags.ValueFor[bool](ctx, cmd, *flags.StoreFullFlag, false)
if full {
if err := store.DestroyStore(); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.PlainTextSuccess("Data store cleared")
return
}
s, err := store.NewStore()
if err != nil {
ctx.Logger.FatalErr(err)
}
if err := s.DeleteBucket(); err != nil {
defer func() {
if err := s.Close(); err != nil {
ctx.Logger.Error(err, "cleanup failure")
}
}()
if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.PlainTextSuccess("Data store cleared")
Expand Down
1 change: 1 addition & 0 deletions docs/cli/flow_store_clear.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ flow store clear [flags]
### Options

```
--full Force clear all stored data
-h, --help help for clear
```

Expand Down
2 changes: 1 addition & 1 deletion docs/cli/flow_store_get.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The data store is a key-value store that can be used to persist data across exec
This will retrieve the value for the given key.

```
flow store get [flags]
flow store get KEY [flags]
```

### Options
Expand Down
2 changes: 1 addition & 1 deletion docs/cli/flow_store_set.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The data store is a key-value store that can be used to persist data across exec
This will overwrite any existing value for the key.

```
flow store set [flags]
flow store set KEY [VALUE] [flags]
```

### Options
Expand Down
22 changes: 22 additions & 0 deletions examples/wip.flow
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# yaml-language-server: $schema=https://flowexec.io/schemas/flowfile_schema.json
visibility: internal
namespace: wip
description: Work-in-progress executables. These may include configurations that are not yet released.
executables:
- verb: run
name: stateful
serial:
params:
- envKey: VAL1
text: value
- envKey: FLOW
text: ../.bin/flow
execs:
- cmd: |
$FLOW store set wip-test1 hello
echo "$VAL1" | $FLOW store set wip-test2
$FLOW store set wip-test3 to be merged
- cmd: |
echo "wip-test1: $($FLOW store get wip-test1)"
echo "wip-test2: $($FLOW store get wip-test2)"
echo "wip-test3: $($FLOW store get wip-test3)"
169 changes: 169 additions & 0 deletions internal/services/store/mocks/mock_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1a71adf

Please sign in to comment.