Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve how buckets are managed in the store #196

Merged
merged 1 commit into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions cmd/internal/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func execPreRun(_ *context.Context, _ *cobra.Command, _ []string) {
runner.RegisterRunner(parallel.NewRunner())
}

//nolint:funlen
// TODO: refactor this function to simplify the logic
//
//nolint:funlen,gocognit
func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, args []string) {
logger := ctx.Logger
if err := verb.Validate(); err != nil {
Expand Down Expand Up @@ -111,9 +113,14 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
if err != nil {
logger.FatalErr(err)
}
if err = store.SetProcessBucketID(ref.String(), false); err != nil {
s, err := store.NewStore()
if err != nil {
logger.FatalErr(err)
}
if _, err = s.CreateAndSetBucket(ref.String()); err != nil {
logger.FatalErr(err)
}
_ = s.Close()
if envMap == nil {
envMap = make(map[string]string)
}
Expand Down Expand Up @@ -143,7 +150,7 @@ func execFunc(ctx *context.Context, cmd *cobra.Command, verb executable.Verb, ar
logger.Errorf("failed clearing process store\n%v", err)
}
if processStore != nil {
if err = processStore.DeleteBucket(); err != nil {
if err = processStore.DeleteBucket(store.EnvironmentBucket()); err != nil {
logger.Errorf("failed clearing process store\n%v", err)
}
_ = processStore.Close()
Expand Down
6 changes: 6 additions & 0 deletions cmd/internal/flags/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,9 @@ var SetSoundNotificationFlag = &Metadata{
Usage: "Update completion sound notification setting",
Default: false,
}

var StoreFullFlag = &Metadata{
Name: "full",
Usage: "Force clear all stored data",
Default: false,
}
32 changes: 24 additions & 8 deletions cmd/internal/store.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package internal

import (
"fmt"
"strings"

"github.com/jahvon/tuikit/views"
"github.com/spf13/cobra"

"github.com/jahvon/flow/cmd/internal/flags"
"github.com/jahvon/flow/internal/context"
"github.com/jahvon/flow/internal/io"
"github.com/jahvon/flow/internal/services/store"
Expand All @@ -25,7 +27,7 @@ func RegisterStoreCmd(ctx *context.Context, rootCmd *cobra.Command) {

func registerStoreSetCmd(ctx *context.Context, rootCmd *cobra.Command) {
subCmd := &cobra.Command{
Use: "set",
Use: "set KEY [VALUE]",
Short: "Set a key-value pair in the data store.",
Long: dataStoreDescription + "This will overwrite any existing value for the key.",
Args: cobra.MinimumNArgs(1),
Expand Down Expand Up @@ -61,15 +63,15 @@ func storeSetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
case len(args) == 2:
value = args[1]
default:
ctx.Logger.Warnx("merging multiple arguments into a single value", "count", len(args))
ctx.Logger.PlainTextWarn(fmt.Sprintf("merging multiple (%d) arguments into a single value", len(args)-1))
value = strings.Join(args[1:], " ")
}

s, err := store.NewStore()
if err != nil {
ctx.Logger.FatalErr(err)
}
if err = s.CreateBucket(); err != nil {
if err = s.CreateBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
defer func() {
Expand All @@ -80,12 +82,12 @@ func storeSetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
if err = s.Set(key, value); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.Infof("Key %q set in the store", key)
ctx.Logger.PlainTextInfo(fmt.Sprintf("Key %q set in the store", key))
}

func registerStoreGetCmd(ctx *context.Context, rootCmd *cobra.Command) {
subCmd := &cobra.Command{
Use: "get",
Use: "get KEY",
Aliases: []string{"view"},
Short: "Get a value from the data store.",
Long: dataStoreDescription + "This will retrieve the value for the given key.",
Expand All @@ -104,7 +106,7 @@ func storeGetFunc(ctx *context.Context, _ *cobra.Command, args []string) {
if err != nil {
ctx.Logger.FatalErr(err)
}
if err = s.CreateBucket(); err != nil {
if _, err = s.CreateAndSetBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
defer func() {
Expand All @@ -130,15 +132,29 @@ func registerStoreClearCmd(ctx *context.Context, rootCmd *cobra.Command) {
storeClearFunc(ctx, cmd, args)
},
}
RegisterFlag(ctx, subCmd, *flags.StoreFullFlag)
rootCmd.AddCommand(subCmd)
}

func storeClearFunc(ctx *context.Context, _ *cobra.Command, _ []string) {
func storeClearFunc(ctx *context.Context, cmd *cobra.Command, _ []string) {
full := flags.ValueFor[bool](ctx, cmd, *flags.StoreFullFlag, false)
if full {
if err := store.DestroyStore(); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.PlainTextSuccess("Data store cleared")
return
}
s, err := store.NewStore()
if err != nil {
ctx.Logger.FatalErr(err)
}
if err := s.DeleteBucket(); err != nil {
defer func() {
if err := s.Close(); err != nil {
ctx.Logger.Error(err, "cleanup failure")
}
}()
if err := s.DeleteBucket(store.EnvironmentBucket()); err != nil {
ctx.Logger.FatalErr(err)
}
ctx.Logger.PlainTextSuccess("Data store cleared")
Expand Down
1 change: 1 addition & 0 deletions docs/cli/flow_store_clear.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ flow store clear [flags]
### Options

```
--full Force clear all stored data
-h, --help help for clear
```

Expand Down
2 changes: 1 addition & 1 deletion docs/cli/flow_store_get.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The data store is a key-value store that can be used to persist data across exec
This will retrieve the value for the given key.

```
flow store get [flags]
flow store get KEY [flags]
```

### Options
Expand Down
2 changes: 1 addition & 1 deletion docs/cli/flow_store_set.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ The data store is a key-value store that can be used to persist data across exec
This will overwrite any existing value for the key.

```
flow store set [flags]
flow store set KEY [VALUE] [flags]
```

### Options
Expand Down
22 changes: 22 additions & 0 deletions examples/wip.flow
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# yaml-language-server: $schema=https://flowexec.io/schemas/flowfile_schema.json
visibility: internal
namespace: wip
description: Work-in-progress executables. These may include configurations that are not yet released.
executables:
- verb: run
name: stateful
serial:
params:
- envKey: VAL1
text: value
- envKey: FLOW
text: ../.bin/flow
execs:
- cmd: |
$FLOW store set wip-test1 hello
echo "$VAL1" | $FLOW store set wip-test2
$FLOW store set wip-test3 to be merged
- cmd: |
echo "wip-test1: $($FLOW store get wip-test1)"
echo "wip-test2: $($FLOW store get wip-test2)"
echo "wip-test3: $($FLOW store get wip-test3)"
169 changes: 169 additions & 0 deletions internal/services/store/mocks/mock_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading