Skip to content

Commit

Permalink
Add stream- and consumer balance commands
Browse files Browse the repository at this point in the history
  • Loading branch information
ploubser committed Dec 11, 2024
1 parent 234eda9 commit 5f6fc7f
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 4 deletions.
96 changes: 96 additions & 0 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/balancer"
"github.com/nats-io/nats.go"

"github.com/nats-io/jsm.go"
Expand Down Expand Up @@ -297,6 +298,22 @@ func configureConsumerCommand(app commandHost) {
conClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
conClusterDown.Arg("consumer", "Consumer to act on").StringVar(&c.consumer)
conClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force)
conClusterBalance := conCluster.Command("balance", "Balance consumer leaders").Action(c.balanceAction)
conClusterBalance.Arg("stream", "Stream to act on").StringVar(&c.stream)
conClusterBalance.Flag("pull", "Balance only pull based consumers").UnNegatableBoolVar(&c.fPull)
conClusterBalance.Flag("push", "Balance only push based consumers").UnNegatableBoolVar(&c.fPush)
conClusterBalance.Flag("bound", "Balance push-bound or pull consumers with waiting pulls").UnNegatableBoolVar(&c.fBound)
conClusterBalance.Flag("waiting", "Balance consumers with fewer waiting pulls").IntVar(&c.fWaiting)
conClusterBalance.Flag("ack-pending", "Balance consumers with fewer pending acks").IntVar(&c.fAckPending)
conClusterBalance.Flag("pending", "Balance consumers with fewer unprocessed messages").Uint64Var(&c.fPending)
conClusterBalance.Flag("idle", "Balance consumers with no new deliveries for a period").DurationVar(&c.fIdle)
conClusterBalance.Flag("created", "Balance consumers created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated)
conClusterBalance.Flag("replicas", "Balance consumers with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas)
conClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader)
conClusterBalance.Flag("pinned", "Balance Pinned Client priority group consumers that are fully pinned").UnNegatableBoolVar(&c.fPinned)
conClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
conClusterBalance.Flag("expression", "Balance matching consumers using an expression language").StringVar(&c.fExpression)

}

func init() {
Expand Down Expand Up @@ -573,6 +590,85 @@ func (c *consumerCmd) graphAction(_ *fisk.ParseContext) error {
}
}

func (c *consumerCmd) balanceAction(_ *fisk.ParseContext) error {
var err error
var stream *jsm.Stream

c.connectAndSetup(true, false)

c.stream, stream, err = selectStream(c.mgr, c.stream, c.force, c.showAll)
if err != nil {
return err
}

if stream == nil {
return fmt.Errorf("no stream selected")
}

var opts []jsm.ConsumerQueryOpt
if c.fPush {
opts = append(opts, jsm.ConsumerQueryIsPush())
}
if c.fPull {
opts = append(opts, jsm.ConsumerQueryIsPull())
}
if c.fBound {
opts = append(opts, jsm.ConsumerQueryIsBound())
}
if c.fWaiting > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerWaiting(c.fWaiting))
}
if c.fAckPending > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerAckPending(c.fAckPending))
}
if c.fPending > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerPending(c.fPending))
}
if c.fIdle > 0 {
opts = append(opts, jsm.ConsumerQueryWithDeliverySince(c.fIdle))
}
if c.fCreated > 0 {
opts = append(opts, jsm.ConsumerQueryOlderThan(c.fCreated))
}
if c.fReplicas > 0 {
opts = append(opts, jsm.ConsumerQueryReplicas(c.fReplicas))
}
if c.fInvert {
opts = append(opts, jsm.ConsumerQueryInvert())
}
if c.fExpression != "" {
opts = append(opts, jsm.ConsumerQueryExpression(c.fExpression))
}
if c.fLeader != "" {
opts = append(opts, jsm.ConsumerQueryLeaderServer(c.fLeader))
}
if c.fPinned {
opts = append(opts, jsm.ConsumerQueryIsPinned())
}

consumers, err := stream.QueryConsumers(opts...)
if err != nil {
return err
}

if len(consumers) > 0 {
balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel))
if err != nil {
return err
}

balanced, err := balancer.BalanceConsumers(consumers)
if err != nil {
return fmt.Errorf("failed to balance consumers on %s - %s", c.stream, err)
}
fmt.Printf("Balanced %d consumers on %s\n", balanced, c.stream)

} else {
fmt.Printf("No consumers on %s\n", c.stream)
}
return nil
}

func (c *consumerCmd) leaderStandDownAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

Expand Down
89 changes: 88 additions & 1 deletion cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/jedib0t/go-pretty/v6/progress"
"io"
"math"
"os"
Expand All @@ -31,6 +30,8 @@ import (
"syscall"
"time"

"github.com/jedib0t/go-pretty/v6/progress"

"github.com/nats-io/natscli/internal/asciigraph"
iu "github.com/nats-io/natscli/internal/util"
terminal "golang.org/x/term"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/balancer"
"github.com/nats-io/nats.go"
"github.com/nats-io/natscli/columns"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -403,6 +405,20 @@ Finding streams with certain subjects configured:
strClusterDown := strCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("stepdown").Alias("sd").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown)
strClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
strClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force)
strClusterBalance := strCluster.Command("balance", "Balance stream leaders").Action(c.balanceAction)
strClusterBalance.Flag("server-name", "Balance streams present on a regular expression matched server").StringVar(&c.fServer)
strClusterBalance.Flag("cluster", "Balance streams present on a regular expression matched cluster").StringVar(&c.fCluster)
strClusterBalance.Flag("empty", "Balance streams with no messages").UnNegatableBoolVar(&c.fEmpty)
strClusterBalance.Flag("idle", "Balance streams with no new messages or consumer deliveries for a period").PlaceHolder("DURATION").DurationVar(&c.fIdle)
strClusterBalance.Flag("created", "Balance streams created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated)
strClusterBalance.Flag("consumers", "Balance streams with fewer consumers than threshold").PlaceHolder("THRESHOLD").Default("-1").IntVar(&c.fConsumers)
strClusterBalance.Flag("subject", "Filters Streams by those with interest matching a subject or wildcard and balances them").StringVar(&c.filterSubject)
strClusterBalance.Flag("replicas", "Balance streams with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas)
strClusterBalance.Flag("sourced", "Balance that sources data from other streams").IsSetByUser(&c.fSourcedSet).UnNegatableBoolVar(&c.fSourced)
strClusterBalance.Flag("mirrored", "Balance that mirrors data from other streams").IsSetByUser(&c.fMirroredSet).UnNegatableBoolVar(&c.fMirrored)
strClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader)
strClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
strClusterBalance.Flag("expression", "Balance matching streams using an expression language").StringVar(&c.fExpression)

strClusterRemovePeer := strCluster.Command("peer-remove", "Removes a peer from the Stream cluster").Alias("pr").Action(c.removePeer)
strClusterRemovePeer.Arg("stream", "The stream to act on").StringVar(&c.stream)
Expand Down Expand Up @@ -829,6 +845,77 @@ func (c *streamCmd) loadStream(stream string) (*jsm.Stream, error) {
return c.mgr.LoadStream(stream)
}

func (c *streamCmd) balanceAction(_ *fisk.ParseContext) error {
var err error

c.nc, c.mgr, err = prepareHelper("", natsOpts()...)
if err != nil {
return fmt.Errorf("setup failed: %v", err)
}

var opts []jsm.StreamQueryOpt
if c.fServer != "" {
opts = append(opts, jsm.StreamQueryServerName(c.fServer))
}
if c.fCluster != "" {
opts = append(opts, jsm.StreamQueryClusterName(c.fCluster))
}
if c.fEmpty {
opts = append(opts, jsm.StreamQueryWithoutMessages())
}
if c.fIdle > 0 {
opts = append(opts, jsm.StreamQueryIdleLongerThan(c.fIdle))
}
if c.fCreated > 0 {
opts = append(opts, jsm.StreamQueryOlderThan(c.fCreated))
}
if c.fConsumers >= 0 {
opts = append(opts, jsm.StreamQueryFewerConsumersThan(uint(c.fConsumers)))
}
if c.fInvert {
opts = append(opts, jsm.StreamQueryInvert())
}
if c.filterSubject != "" {
opts = append(opts, jsm.StreamQuerySubjectWildcard(c.filterSubject))
}
if c.fSourcedSet {
opts = append(opts, jsm.StreamQueryIsSourced())
}
if c.fMirroredSet {
opts = append(opts, jsm.StreamQueryIsMirror())
}
if c.fReplicas > 0 {
opts = append(opts, jsm.StreamQueryReplicas(c.fReplicas))
}
if c.fExpression != "" {
opts = append(opts, jsm.StreamQueryExpression(c.fExpression))
}
if c.fLeader != "" {
opts = append(opts, jsm.StreamQueryLeaderServer(c.fLeader))
}

found, err := c.mgr.QueryStreams(opts...)
if err != nil {
return err
}

if len(found) > 0 {
balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel))
if err != nil {
return err
}

balanced, err := balancer.BalanceStreams(found)
if err != nil {
return fmt.Errorf("failed to balance streams - %s", err)
}
fmt.Printf("Balanced %d streams.\n", balanced)

}

return nil
}

func (c *streamCmd) leaderStandDown(_ *fisk.ParseContext) error {
c.connectAndAskStream()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.11
github.com/mattn/go-isatty v0.0.20
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877
github.com/nats-io/jwt/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db
github.com/nats-io/nats.go v1.37.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b h1:LNt3vx1htGxJ+jGw4nZ0xe+zKabImvOBf0XgujSPURc=
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b/go.mod h1:JzvMg3G7JtwRnl+ZenOH6NVI73Kuc4kIM1YDRaI4xZs=
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877 h1:Q9u7cz8jPvNZOvX9Z5TAk0G2To0iZOnNeUL4mVER7P8=
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877/go.mod h1:5Wh2yAEPwB+sIAwS4MShJDw+ToZZmayZeqqK1zh+0xQ=
github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw=
github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db h1:XSsKLcdTjNwRhhiPS2G193zgh7yCHjT5IRzRyTuI/Y0=
Expand Down

0 comments on commit 5f6fc7f

Please sign in to comment.