From 5f6fc7fae4ad97da9b007e1a6b45a04f06ca28fe Mon Sep 17 00:00:00 2001 From: Pieter Loubser Date: Fri, 6 Dec 2024 12:50:12 +0000 Subject: [PATCH] Add stream- and consumer balance commands --- cli/consumer_command.go | 96 +++++++++++++++++++++++++++++++++++++++++ cli/stream_command.go | 89 +++++++++++++++++++++++++++++++++++++- go.mod | 2 +- go.sum | 4 +- 4 files changed, 187 insertions(+), 4 deletions(-) diff --git a/cli/consumer_command.go b/cli/consumer_command.go index b768dc63..07b8dc2b 100644 --- a/cli/consumer_command.go +++ b/cli/consumer_command.go @@ -40,6 +40,7 @@ import ( "github.com/dustin/go-humanize" "github.com/google/go-cmp/cmp" "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/balancer" "github.com/nats-io/nats.go" "github.com/nats-io/jsm.go" @@ -297,6 +298,22 @@ func configureConsumerCommand(app commandHost) { conClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream) conClusterDown.Arg("consumer", "Consumer to act on").StringVar(&c.consumer) conClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force) + conClusterBalance := conCluster.Command("balance", "Balance consumer leaders").Action(c.balanceAction) + conClusterBalance.Arg("stream", "Stream to act on").StringVar(&c.stream) + conClusterBalance.Flag("pull", "Balance only pull based consumers").UnNegatableBoolVar(&c.fPull) + conClusterBalance.Flag("push", "Balance only push based consumers").UnNegatableBoolVar(&c.fPush) + conClusterBalance.Flag("bound", "Balance push-bound or pull consumers with waiting pulls").UnNegatableBoolVar(&c.fBound) + conClusterBalance.Flag("waiting", "Balance consumers with fewer waiting pulls").IntVar(&c.fWaiting) + conClusterBalance.Flag("ack-pending", "Balance consumers with fewer pending acks").IntVar(&c.fAckPending) + conClusterBalance.Flag("pending", "Balance consumers with fewer unprocessed messages").Uint64Var(&c.fPending) + conClusterBalance.Flag("idle", "Balance consumers with no new deliveries for a period").DurationVar(&c.fIdle) + conClusterBalance.Flag("created", "Balance consumers created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated) + conClusterBalance.Flag("replicas", "Balance consumers with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas) + conClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader) + conClusterBalance.Flag("pinned", "Balance Pinned Client priority group consumers that are fully pinned").UnNegatableBoolVar(&c.fPinned) + conClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert) + conClusterBalance.Flag("expression", "Balance matching consumers using an expression language").StringVar(&c.fExpression) + } func init() { @@ -573,6 +590,85 @@ func (c *consumerCmd) graphAction(_ *fisk.ParseContext) error { } } +func (c *consumerCmd) balanceAction(_ *fisk.ParseContext) error { + var err error + var stream *jsm.Stream + + c.connectAndSetup(true, false) + + c.stream, stream, err = selectStream(c.mgr, c.stream, c.force, c.showAll) + if err != nil { + return err + } + + if stream == nil { + return fmt.Errorf("no stream selected") + } + + var opts []jsm.ConsumerQueryOpt + if c.fPush { + opts = append(opts, jsm.ConsumerQueryIsPush()) + } + if c.fPull { + opts = append(opts, jsm.ConsumerQueryIsPull()) + } + if c.fBound { + opts = append(opts, jsm.ConsumerQueryIsBound()) + } + if c.fWaiting > 0 { + opts = append(opts, jsm.ConsumerQueryWithFewerWaiting(c.fWaiting)) + } + if c.fAckPending > 0 { + opts = append(opts, jsm.ConsumerQueryWithFewerAckPending(c.fAckPending)) + } + if c.fPending > 0 { + opts = append(opts, jsm.ConsumerQueryWithFewerPending(c.fPending)) + } + if c.fIdle > 0 { + opts = append(opts, jsm.ConsumerQueryWithDeliverySince(c.fIdle)) + } + if c.fCreated > 0 { + opts = append(opts, jsm.ConsumerQueryOlderThan(c.fCreated)) + } + if c.fReplicas > 0 { + opts = append(opts, jsm.ConsumerQueryReplicas(c.fReplicas)) + } + if c.fInvert { + opts = append(opts, jsm.ConsumerQueryInvert()) + } + if c.fExpression != "" { + opts = append(opts, jsm.ConsumerQueryExpression(c.fExpression)) + } + if c.fLeader != "" { + opts = append(opts, jsm.ConsumerQueryLeaderServer(c.fLeader)) + } + if c.fPinned { + opts = append(opts, jsm.ConsumerQueryIsPinned()) + } + + consumers, err := stream.QueryConsumers(opts...) + if err != nil { + return err + } + + if len(consumers) > 0 { + balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel)) + if err != nil { + return err + } + + balanced, err := balancer.BalanceConsumers(consumers) + if err != nil { + return fmt.Errorf("failed to balance consumers on %s - %s", c.stream, err) + } + fmt.Printf("Balanced %d consumers on %s\n", balanced, c.stream) + + } else { + fmt.Printf("No consumers on %s\n", c.stream) + } + return nil +} + func (c *consumerCmd) leaderStandDownAction(_ *fisk.ParseContext) error { c.connectAndSetup(true, true) diff --git a/cli/stream_command.go b/cli/stream_command.go index 2b142fd2..78a94797 100644 --- a/cli/stream_command.go +++ b/cli/stream_command.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/jedib0t/go-pretty/v6/progress" "io" "math" "os" @@ -31,6 +30,8 @@ import ( "syscall" "time" + "github.com/jedib0t/go-pretty/v6/progress" + "github.com/nats-io/natscli/internal/asciigraph" iu "github.com/nats-io/natscli/internal/util" terminal "golang.org/x/term" @@ -42,6 +43,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api" + "github.com/nats-io/jsm.go/balancer" "github.com/nats-io/nats.go" "github.com/nats-io/natscli/columns" "gopkg.in/yaml.v3" @@ -403,6 +405,20 @@ Finding streams with certain subjects configured: strClusterDown := strCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("stepdown").Alias("sd").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown) strClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream) strClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force) + strClusterBalance := strCluster.Command("balance", "Balance stream leaders").Action(c.balanceAction) + strClusterBalance.Flag("server-name", "Balance streams present on a regular expression matched server").StringVar(&c.fServer) + strClusterBalance.Flag("cluster", "Balance streams present on a regular expression matched cluster").StringVar(&c.fCluster) + strClusterBalance.Flag("empty", "Balance streams with no messages").UnNegatableBoolVar(&c.fEmpty) + strClusterBalance.Flag("idle", "Balance streams with no new messages or consumer deliveries for a period").PlaceHolder("DURATION").DurationVar(&c.fIdle) + strClusterBalance.Flag("created", "Balance streams created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated) + strClusterBalance.Flag("consumers", "Balance streams with fewer consumers than threshold").PlaceHolder("THRESHOLD").Default("-1").IntVar(&c.fConsumers) + strClusterBalance.Flag("subject", "Filters Streams by those with interest matching a subject or wildcard and balances them").StringVar(&c.filterSubject) + strClusterBalance.Flag("replicas", "Balance streams with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas) + strClusterBalance.Flag("sourced", "Balance that sources data from other streams").IsSetByUser(&c.fSourcedSet).UnNegatableBoolVar(&c.fSourced) + strClusterBalance.Flag("mirrored", "Balance that mirrors data from other streams").IsSetByUser(&c.fMirroredSet).UnNegatableBoolVar(&c.fMirrored) + strClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader) + strClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert) + strClusterBalance.Flag("expression", "Balance matching streams using an expression language").StringVar(&c.fExpression) strClusterRemovePeer := strCluster.Command("peer-remove", "Removes a peer from the Stream cluster").Alias("pr").Action(c.removePeer) strClusterRemovePeer.Arg("stream", "The stream to act on").StringVar(&c.stream) @@ -829,6 +845,77 @@ func (c *streamCmd) loadStream(stream string) (*jsm.Stream, error) { return c.mgr.LoadStream(stream) } +func (c *streamCmd) balanceAction(_ *fisk.ParseContext) error { + var err error + + c.nc, c.mgr, err = prepareHelper("", natsOpts()...) + if err != nil { + return fmt.Errorf("setup failed: %v", err) + } + + var opts []jsm.StreamQueryOpt + if c.fServer != "" { + opts = append(opts, jsm.StreamQueryServerName(c.fServer)) + } + if c.fCluster != "" { + opts = append(opts, jsm.StreamQueryClusterName(c.fCluster)) + } + if c.fEmpty { + opts = append(opts, jsm.StreamQueryWithoutMessages()) + } + if c.fIdle > 0 { + opts = append(opts, jsm.StreamQueryIdleLongerThan(c.fIdle)) + } + if c.fCreated > 0 { + opts = append(opts, jsm.StreamQueryOlderThan(c.fCreated)) + } + if c.fConsumers >= 0 { + opts = append(opts, jsm.StreamQueryFewerConsumersThan(uint(c.fConsumers))) + } + if c.fInvert { + opts = append(opts, jsm.StreamQueryInvert()) + } + if c.filterSubject != "" { + opts = append(opts, jsm.StreamQuerySubjectWildcard(c.filterSubject)) + } + if c.fSourcedSet { + opts = append(opts, jsm.StreamQueryIsSourced()) + } + if c.fMirroredSet { + opts = append(opts, jsm.StreamQueryIsMirror()) + } + if c.fReplicas > 0 { + opts = append(opts, jsm.StreamQueryReplicas(c.fReplicas)) + } + if c.fExpression != "" { + opts = append(opts, jsm.StreamQueryExpression(c.fExpression)) + } + if c.fLeader != "" { + opts = append(opts, jsm.StreamQueryLeaderServer(c.fLeader)) + } + + found, err := c.mgr.QueryStreams(opts...) + if err != nil { + return err + } + + if len(found) > 0 { + balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel)) + if err != nil { + return err + } + + balanced, err := balancer.BalanceStreams(found) + if err != nil { + return fmt.Errorf("failed to balance streams - %s", err) + } + fmt.Printf("Balanced %d streams.\n", balanced) + + } + + return nil +} + func (c *streamCmd) leaderStandDown(_ *fisk.ParseContext) error { c.connectAndAskStream() diff --git a/go.mod b/go.mod index aed3571e..8e277c0b 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/klauspost/compress v1.17.11 github.com/mattn/go-isatty v0.0.20 - github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b + github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877 github.com/nats-io/jwt/v2 v2.7.2 github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db github.com/nats-io/nats.go v1.37.0 diff --git a/go.sum b/go.sum index 658021ef..7f6284c5 100644 --- a/go.sum +++ b/go.sum @@ -106,8 +106,8 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= -github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b h1:LNt3vx1htGxJ+jGw4nZ0xe+zKabImvOBf0XgujSPURc= -github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b/go.mod h1:JzvMg3G7JtwRnl+ZenOH6NVI73Kuc4kIM1YDRaI4xZs= +github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877 h1:Q9u7cz8jPvNZOvX9Z5TAk0G2To0iZOnNeUL4mVER7P8= +github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877/go.mod h1:5Wh2yAEPwB+sIAwS4MShJDw+ToZZmayZeqqK1zh+0xQ= github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw= github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU= github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db h1:XSsKLcdTjNwRhhiPS2G193zgh7yCHjT5IRzRyTuI/Y0=