Skip to content

Commit

Permalink
Merge pull request #964 from ripienaar/kv_compress
Browse files Browse the repository at this point in the history
Support KV compression, also use nats.go key lister
  • Loading branch information
ripienaar authored Jan 12, 2024
2 parents 4d826f1 + aa1f341 commit b4fd0e9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 23 deletions.
45 changes: 23 additions & 22 deletions cli/kv_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type kvCommand struct {
mirror string
mirrorDomain string
sources []string
compression bool
}

func configureKVCommand(app commandHost) {
Expand All @@ -82,6 +83,7 @@ for an indefinite period or a per-bucket configured TTL.
add.Flag("max-bucket-size", "Maximum size for the bucket").PlaceHolder("BYTES").StringVar(&c.maxBucketSizeString)
add.Flag("description", "A description for the bucket").StringVar(&c.description)
add.Flag("storage", "Storage backend to use (file, memory)").EnumVar(&c.storage, "file", "f", "memory", "m")
add.Flag("compress", "Compress the bucket data").BoolVar(&c.compression)
add.Flag("tags", "Place the bucket on servers that has specific tags").StringsVar(&c.placementTags)
add.Flag("cluster", "Place the bucket on a specific cluster").StringVar(&c.placementCluster)
add.Flag("republish-source", "Republish messages to --republish-destination").PlaceHolder("SRC").StringVar(&c.repubSource)
Expand Down Expand Up @@ -207,32 +209,36 @@ func (c *kvCommand) lsBucketKeys() error {
return fmt.Errorf("unable to load bucket: %s", err)
}

keys, err := kv.Keys()
lister, err := kv.ListKeys()
if err != nil {
if err == nats.ErrNoKeysFound {
fmt.Println("No keys found in bucket")
return nil
}

return fmt.Errorf("unable to fetch keys in bucket: %s", err)
return err
}

var found bool
if c.lsVerbose {
if err := c.displayKeyInfo(kv, keys); err != nil {
found, err = c.displayKeyInfo(kv, lister)
if err != nil {
return fmt.Errorf("unable to display key info: %s", err)
}
} else {
for _, v := range keys {
for v := range lister.Keys() {
found = true
fmt.Println(v)
}
}
if !found {
fmt.Println("No keys found in bucket")
return nil
}

return nil
}

func (c *kvCommand) displayKeyInfo(kv nats.KeyValue, keys []string) error {
func (c *kvCommand) displayKeyInfo(kv nats.KeyValue, keys nats.KeyLister) (bool, error) {
var found bool

if kv == nil {
return errors.New("key value cannot be nil")
return found, errors.New("key value cannot be nil")
}

table := newTableWriter(fmt.Sprintf("Contents for bucket '%s'", c.bucket))
Expand All @@ -243,10 +249,11 @@ func (c *kvCommand) displayKeyInfo(kv nats.KeyValue, keys []string) error {
table.AddHeaders("Key", "Created", "Delta", "Revision")
}

for _, keyName := range keys {
for keyName := range keys.Keys() {
found = true
kve, err := kv.Get(keyName)
if err != nil {
return fmt.Errorf("unable to fetch key %s: %s", keyName, err)
return found, fmt.Errorf("unable to fetch key %s: %s", keyName, err)
}

row := []interface{}{
Expand All @@ -265,7 +272,7 @@ func (c *kvCommand) displayKeyInfo(kv nats.KeyValue, keys []string) error {

fmt.Println(table.Render())

return nil
return found, nil
}

func (c *kvCommand) lsBuckets() error {
Expand Down Expand Up @@ -449,6 +456,7 @@ func (c *kvCommand) addAction(_ *fisk.ParseContext) error {
Storage: storage,
Replicas: int(c.replicas),
Placement: placement,
Compression: c.compression,
}

if c.repubDest != "" {
Expand Down Expand Up @@ -747,6 +755,7 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error {
cols.AddRow("Bucket Name", status.Bucket())
cols.AddRow("History Kept", status.History())
cols.AddRow("Values Stored", status.Values())
cols.AddRow("Compressed", status.IsCompressed())
cols.AddRow("Backing Store Kind", status.BackingStore())

if nfo != nil {
Expand Down Expand Up @@ -813,14 +822,6 @@ func (c *kvCommand) showStatus(store nats.KeyValue) error {
cols.AddSectionTitle("Cluster Information")
renderNatsGoClusterInfo(cols, nfo)
}

if !nfo.Config.AllowRollup || nfo.Config.Discard != nats.DiscardNew {
cols.Println()
cols.Println("Warning the bucket if not compatible with the latest")
cols.Println("configuration format and needs a configuration upgrade.")
cols.Println()
cols.Println("Please run: nats kv upgrade ", status.Bucket())
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/nats-io/jsm.go v0.1.1-0.20240111112330-8508bd502b64
github.com/nats-io/jwt/v2 v2.5.3
github.com/nats-io/nats-server/v2 v2.10.9
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.32.0
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
github.com/prometheus/client_golang v1.18.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T1
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nsc/v2 v2.8.6-0.20231117160437-b49262c20be8 h1:MqR1Fp9vHrcKcs3BMRmEnHaF7m+fcWiKJEWT6cInMNM=
Expand Down

0 comments on commit b4fd0e9

Please sign in to comment.