Skip to content

Commit

Permalink
[FEAT] simple flag to enable jetstream tier
Browse files Browse the repository at this point in the history
Fix #631
  • Loading branch information
aricart committed Mar 5, 2024
1 parent 5c51c37 commit 68f9d8b
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 21 deletions.
120 changes: 99 additions & 21 deletions cmd/editaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func createEditAccount() *cobra.Command {

cmd.Flags().StringVarP(&params.AccountContextParams.Name, "name", "n", "", "account to edit")
cmd.Flags().BoolVarP(&params.disableJetStream, "js-disable", "", false, "disables all JetStream limits in the account by deleting any limits")
cmd.Flags().IntVarP(&params.enableJetStream, "js-enable", "", -1, "enables JetStream for the specified tier")

params.signingKeys.BindFlags("sk", "", nkeys.PrefixByteAccount, cmd)
params.TimeParams.BindFlags(cmd)
Expand Down Expand Up @@ -168,6 +169,7 @@ type EditAccountParams struct {
signingKeys SigningKeysParams
rmSigningKeys []string
disableJetStream bool
enableJetStream int
traceContextSubject string
traceContextSampling NumberParams
}
Expand All @@ -180,14 +182,17 @@ func (p *EditAccountParams) SetDefaults(ctx ActionCtx) error {
p.SignerParams.SetDefaults(nkeys.PrefixByteOperator, true, ctx)

hasDeleteTier := ctx.AnySet("rm-js-tier")
p.hasJSSetParams = ctx.AnySet("js-tier", "js-mem-storage",
p.hasJSSetParams = ctx.AnySet("js-tier",
"js-mem-storage",
"js-disk-storage",
"js-streams",
"js-consumer",
"js-max-mem-stream",
"js-max-disk-stream",
"js-max-bytes-required",
"js-max-ack-pending")
"js-max-ack-pending",
"js-enable",
)

if hasDeleteTier && p.hasJSSetParams {
return fmt.Errorf("rm-js-tier is exclusive of all other js options")
Expand All @@ -197,6 +202,27 @@ func (p *EditAccountParams) SetDefaults(ctx ActionCtx) error {
return fmt.Errorf("js-disable is exclusive of all other js options")
}

hasEnableTier := ctx.AnySet("js-enable")
p.hasJSSetParams = ctx.AnySet("js-tier",
"js-mem-storage",
"js-disk-storage",
"js-streams",
"js-consumer",
"js-max-mem-stream",
"js-max-disk-stream",
"js-max-bytes-required",
"js-max-ack-pending",
"rm-js-tier",
)

if hasEnableTier && p.hasJSSetParams {
return fmt.Errorf("js-enable is exclusive of all other js options")
}

if p.disableJetStream && (p.hasJSSetParams || hasEnableTier) {
return fmt.Errorf("js-enable is exclusive of all other js options")
}

if !InteractiveFlag && ctx.NothingToDo(
"start", "expiry", "tag", "rm-tag", "conns", "leaf-conns", "exports", "imports", "subscriptions",
"payload", "data", "wildcard-exports", "sk", "rm-sk", "description", "info-url", "response-ttl", "allow-pub-response",
Expand All @@ -213,6 +239,7 @@ func (p *EditAccountParams) SetDefaults(ctx ActionCtx) error {
"js-max-bytes-required",
"js-max-ack-pending",
"js-disable",
"js-enable",
"trace-context-subject",
"trace-context-sampling",
) {
Expand Down Expand Up @@ -604,6 +631,7 @@ func (p *EditAccountParams) checkSystemAccount(ctx ActionCtx) error {
if p.MaxAckPending > 0 {
mustUnset = append(mustUnset, "--js-max-ack-pending")
}

if len(mustUnset) == 0 {
// no user specified values set - so set to zero/false
// reset all the flags to zero
Expand Down Expand Up @@ -682,6 +710,20 @@ func (p *EditAccountParams) Validate(ctx ActionCtx) error {
}
}

if p.enableJetStream > -1 {
tier, err := p.getTierLimits(p.enableJetStream)
if err != nil {
return err
}
if jsLimitsSet(tier) {
label := "global"
if p.enableJetStream > 0 {
label = fmt.Sprintf("R%d", p.enableJetStream)
}
return fmt.Errorf("jetstream tier %s is already enabled", label)
}
}

return nil
}

Expand All @@ -694,8 +736,8 @@ func (p *EditAccountParams) applyLimits(ctx ActionCtx, r *store.Report) error {
}
params := &p.JetStreamLimitParams

// on delete we don't honor any of the JS options
if p.DeleteTier != -1 {
// on delete or enable we don't honor any of the JS options
if p.DeleteTier != -1 || p.enableJetStream != -1 {
params.MemMaxStreamBytes = 0
params.DiskMaxStreamBytes = 0
params.MaxAckPending = 0
Expand All @@ -706,27 +748,30 @@ func (p *EditAccountParams) applyLimits(ctx ActionCtx, r *store.Report) error {
params.MaxBytesRequired = false
}

switch p.DeleteTier {
case -1:
break
case 0:
// values are zeroed by the params which are zeroed above
p.claim.Limits.JetStreamLimits = jwt.JetStreamLimits{}
r.AddOK("deleted global limit")
default:
if p.claim.Limits.JetStreamTieredLimits != nil {
label := fmt.Sprintf("R%d", p.DeleteTier)
_, ok := p.claim.Limits.JetStreamTieredLimits[label]
if ok {
delete(p.claim.Limits.JetStreamTieredLimits, label)
r.AddOK("deleted tier limit %s", label)
if p.DeleteTier != -1 {
switch p.DeleteTier {
case -1:
break
case 0:
// values are zeroed by the params which are zeroed above
p.claim.Limits.JetStreamLimits = jwt.JetStreamLimits{}
r.AddOK("deleted global limit")
default:
if p.claim.Limits.JetStreamTieredLimits != nil {
label := fmt.Sprintf("R%d", p.DeleteTier)
_, ok := p.claim.Limits.JetStreamTieredLimits[label]
if ok {
delete(p.claim.Limits.JetStreamTieredLimits, label)
r.AddOK("deleted tier limit %s", label)
} else {
return fmt.Errorf("account doesn't have tier %s limit", label)
}
} else {
return fmt.Errorf("account doesn't have tier %s limit", label)
return errors.New("account doesn't have tier limits")
}
} else {
return errors.New("account doesn't have tier limits")
}
}

if p.DeleteTier != -1 {
return nil
}
Expand All @@ -735,6 +780,39 @@ func (p *EditAccountParams) applyLimits(ctx ActionCtx, r *store.Report) error {
return p.doDisableJetStream(r)
}

if p.enableJetStream != -1 {
switch p.enableJetStream {
case -1:
break
case 0:
// values are zeroed by the params which are zeroed above
p.claim.Limits.JetStreamLimits = jwt.JetStreamLimits{
DiskStorage: -1,
MemoryStorage: -1,
}
r.AddOK("enabled global limit")
default:
label := fmt.Sprintf("R%d", p.enableJetStream)
_, ok := p.claim.Limits.JetStreamTieredLimits[label]
if ok {
return fmt.Errorf("tier limit %s is already enabled", label)
} else {
if p.claim.Limits.JetStreamTieredLimits == nil {
p.claim.Limits.JetStreamTieredLimits = make(map[string]jwt.JetStreamLimits)
}
p.claim.Limits.JetStreamTieredLimits[label] = jwt.JetStreamLimits{
DiskStorage: -1,
MemoryStorage: -1,
}
r.AddOK("enabled tier limit %s", label)
}
}
}

if p.enableJetStream != -1 {
return nil
}

label := p.tierLabel()

limits.Streams = params.Streams.Int64()
Expand Down
48 changes: 48 additions & 0 deletions cmd/editaccount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,51 @@ func Test_TracingSubject(t *testing.T) {
require.NoError(t, err)
require.Nil(t, ac.Trace)
}

func Test_EnableTier(t *testing.T) {
ts := NewTestStore(t, "O")
defer ts.Done(t)
ts.AddAccount(t, "A")

ac, err := ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.Equal(t, ac.Limits.JetStreamLimits, jwt.JetStreamLimits{})

_, _, err = ExecuteCmd(createEditAccount(), "A", "--js-enable", "0")
require.NoError(t, err)

ac, err = ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.Equal(t, ac.Limits.JetStreamLimits, jwt.JetStreamLimits{DiskStorage: -1, MemoryStorage: -1})
}

func Test_EnableTierDoesntClobber(t *testing.T) {
ts := NewTestStore(t, "O")
defer ts.Done(t)
ts.AddAccount(t, "A")

ac, err := ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.Equal(t, ac.Limits.JetStreamLimits, jwt.JetStreamLimits{})

_, _, err = ExecuteCmd(createEditAccount(), "A", "--js-enable", "0")
require.NoError(t, err)

_, _, err = ExecuteCmd(createEditAccount(), "A", "--js-enable", "0")
require.Error(t, err)
require.Equal(t, "jetstream tier global is already enabled", err.Error())
}

func Test_EnableTierNoOtherFlag(t *testing.T) {
ts := NewTestStore(t, "O")
defer ts.Done(t)
ts.AddAccount(t, "A")

ac, err := ts.Store.ReadAccountClaim("A")
require.NoError(t, err)
require.Equal(t, ac.Limits.JetStreamLimits, jwt.JetStreamLimits{})

_, _, err = ExecuteCmd(createEditAccount(), "A", "--js-enable", "0", "--rm-js-tier", "0")
require.Error(t, err)
require.Equal(t, "rm-js-tier is exclusive of all other js options", err.Error())
}

0 comments on commit 68f9d8b

Please sign in to comment.