Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream- and consumer balance commands #1194

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/balancer"
"github.com/nats-io/nats.go"

"github.com/nats-io/jsm.go"
Expand Down Expand Up @@ -297,6 +298,22 @@ func configureConsumerCommand(app commandHost) {
conClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
conClusterDown.Arg("consumer", "Consumer to act on").StringVar(&c.consumer)
conClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force)
conClusterBalance := conCluster.Command("balance", "Balance consumer leaders").Action(c.balanceAction)
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
conClusterBalance.Arg("stream", "Stream to act on").StringVar(&c.stream)
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
conClusterBalance.Flag("pull", "Balance only pull based consumers").UnNegatableBoolVar(&c.fPull)
conClusterBalance.Flag("push", "Balance only push based consumers").UnNegatableBoolVar(&c.fPush)
conClusterBalance.Flag("bound", "Balance push-bound or pull consumers with waiting pulls").UnNegatableBoolVar(&c.fBound)
conClusterBalance.Flag("waiting", "Balance consumers with fewer waiting pulls").IntVar(&c.fWaiting)
conClusterBalance.Flag("ack-pending", "Balance consumers with fewer pending acks").IntVar(&c.fAckPending)
conClusterBalance.Flag("pending", "Balance consumers with fewer unprocessed messages").Uint64Var(&c.fPending)
conClusterBalance.Flag("idle", "Balance consumers with no new deliveries for a period").DurationVar(&c.fIdle)
conClusterBalance.Flag("created", "Balance consumers created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated)
conClusterBalance.Flag("replicas", "Balance consumers with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas)
conClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader)
conClusterBalance.Flag("pinned", "Balance Pinned Client priority group consumers that are fully pinned").UnNegatableBoolVar(&c.fPinned)
conClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
conClusterBalance.Flag("expression", "Balance matching consumers using an expression language").StringVar(&c.fExpression)

}

func init() {
Expand Down Expand Up @@ -573,6 +590,85 @@ func (c *consumerCmd) graphAction(_ *fisk.ParseContext) error {
}
}

func (c *consumerCmd) balanceAction(_ *fisk.ParseContext) error {
var err error
var stream *jsm.Stream

c.connectAndSetup(true, false)

c.stream, stream, err = selectStream(c.mgr, c.stream, c.force, c.showAll)
if err != nil {
return err
}

if stream == nil {
return fmt.Errorf("no stream selected")
}

var opts []jsm.ConsumerQueryOpt
if c.fPush {
opts = append(opts, jsm.ConsumerQueryIsPush())
}
if c.fPull {
opts = append(opts, jsm.ConsumerQueryIsPull())
}
if c.fBound {
opts = append(opts, jsm.ConsumerQueryIsBound())
}
if c.fWaiting > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerWaiting(c.fWaiting))
}
if c.fAckPending > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerAckPending(c.fAckPending))
}
if c.fPending > 0 {
opts = append(opts, jsm.ConsumerQueryWithFewerPending(c.fPending))
}
if c.fIdle > 0 {
opts = append(opts, jsm.ConsumerQueryWithDeliverySince(c.fIdle))
}
if c.fCreated > 0 {
opts = append(opts, jsm.ConsumerQueryOlderThan(c.fCreated))
}
if c.fReplicas > 0 {
opts = append(opts, jsm.ConsumerQueryReplicas(c.fReplicas))
}
if c.fInvert {
opts = append(opts, jsm.ConsumerQueryInvert())
}
if c.fExpression != "" {
opts = append(opts, jsm.ConsumerQueryExpression(c.fExpression))
}
if c.fLeader != "" {
opts = append(opts, jsm.ConsumerQueryLeaderServer(c.fLeader))
}
if c.fPinned {
opts = append(opts, jsm.ConsumerQueryIsPinned())
}

consumers, err := stream.QueryConsumers(opts...)
if err != nil {
return err
}

if len(consumers) > 0 {
balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel))
if err != nil {
return err
}

balanced, err := balancer.BalanceConsumers(consumers)
if err != nil {
return fmt.Errorf("failed to balance consumers on %s - %s", c.stream, err)
}
fmt.Printf("Balanced %d consumers on %s\n", balanced, c.stream)

} else {
fmt.Printf("No consumers on %s\n", c.stream)
}
return nil
}

func (c *consumerCmd) leaderStandDownAction(_ *fisk.ParseContext) error {
c.connectAndSetup(true, true)

Expand Down
89 changes: 88 additions & 1 deletion cli/stream_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/jedib0t/go-pretty/v6/progress"
"io"
"math"
"os"
Expand All @@ -31,6 +30,8 @@ import (
"syscall"
"time"

"github.com/jedib0t/go-pretty/v6/progress"

"github.com/nats-io/natscli/internal/asciigraph"
iu "github.com/nats-io/natscli/internal/util"
terminal "golang.org/x/term"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
"github.com/nats-io/jsm.go/balancer"
"github.com/nats-io/nats.go"
"github.com/nats-io/natscli/columns"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -403,6 +405,20 @@ Finding streams with certain subjects configured:
strClusterDown := strCluster.Command("step-down", "Force a new leader election by standing down the current leader").Alias("stepdown").Alias("sd").Alias("elect").Alias("down").Alias("d").Action(c.leaderStandDown)
strClusterDown.Arg("stream", "Stream to act on").StringVar(&c.stream)
strClusterDown.Flag("force", "Force leader step down ignoring current leader").Short('f').UnNegatableBoolVar(&c.force)
strClusterBalance := strCluster.Command("balance", "Balance stream leaders").Action(c.balanceAction)
strClusterBalance.Flag("server-name", "Balance streams present on a regular expression matched server").StringVar(&c.fServer)
strClusterBalance.Flag("cluster", "Balance streams present on a regular expression matched cluster").StringVar(&c.fCluster)
strClusterBalance.Flag("empty", "Balance streams with no messages").UnNegatableBoolVar(&c.fEmpty)
strClusterBalance.Flag("idle", "Balance streams with no new messages or consumer deliveries for a period").PlaceHolder("DURATION").DurationVar(&c.fIdle)
strClusterBalance.Flag("created", "Balance streams created longer ago than duration").PlaceHolder("DURATION").DurationVar(&c.fCreated)
strClusterBalance.Flag("consumers", "Balance streams with fewer consumers than threshold").PlaceHolder("THRESHOLD").Default("-1").IntVar(&c.fConsumers)
strClusterBalance.Flag("subject", "Filters Streams by those with interest matching a subject or wildcard and balances them").StringVar(&c.filterSubject)
strClusterBalance.Flag("replicas", "Balance streams with fewer or equal replicas than the value").PlaceHolder("REPLICAS").UintVar(&c.fReplicas)
strClusterBalance.Flag("sourced", "Balance that sources data from other streams").IsSetByUser(&c.fSourcedSet).UnNegatableBoolVar(&c.fSourced)
strClusterBalance.Flag("mirrored", "Balance that mirrors data from other streams").IsSetByUser(&c.fMirroredSet).UnNegatableBoolVar(&c.fMirrored)
strClusterBalance.Flag("leader", "Balance only clustered streams with a specific leader").PlaceHolder("SERVER").StringVar(&c.fLeader)
strClusterBalance.Flag("invert", "Invert the check - before becomes after, with becomes without").BoolVar(&c.fInvert)
strClusterBalance.Flag("expression", "Balance matching streams using an expression language").StringVar(&c.fExpression)

strClusterRemovePeer := strCluster.Command("peer-remove", "Removes a peer from the Stream cluster").Alias("pr").Action(c.removePeer)
strClusterRemovePeer.Arg("stream", "The stream to act on").StringVar(&c.stream)
Expand Down Expand Up @@ -829,6 +845,77 @@ func (c *streamCmd) loadStream(stream string) (*jsm.Stream, error) {
return c.mgr.LoadStream(stream)
}

func (c *streamCmd) balanceAction(_ *fisk.ParseContext) error {
var err error

c.nc, c.mgr, err = prepareHelper("", natsOpts()...)
if err != nil {
return fmt.Errorf("setup failed: %v", err)
}

var opts []jsm.StreamQueryOpt
if c.fServer != "" {
opts = append(opts, jsm.StreamQueryServerName(c.fServer))
}
if c.fCluster != "" {
opts = append(opts, jsm.StreamQueryClusterName(c.fCluster))
}
if c.fEmpty {
opts = append(opts, jsm.StreamQueryWithoutMessages())
}
if c.fIdle > 0 {
opts = append(opts, jsm.StreamQueryIdleLongerThan(c.fIdle))
}
if c.fCreated > 0 {
opts = append(opts, jsm.StreamQueryOlderThan(c.fCreated))
}
if c.fConsumers >= 0 {
opts = append(opts, jsm.StreamQueryFewerConsumersThan(uint(c.fConsumers)))
}
if c.fInvert {
opts = append(opts, jsm.StreamQueryInvert())
}
if c.filterSubject != "" {
opts = append(opts, jsm.StreamQuerySubjectWildcard(c.filterSubject))
}
if c.fSourcedSet {
opts = append(opts, jsm.StreamQueryIsSourced())
}
if c.fMirroredSet {
opts = append(opts, jsm.StreamQueryIsMirror())
}
if c.fReplicas > 0 {
opts = append(opts, jsm.StreamQueryReplicas(c.fReplicas))
}
if c.fExpression != "" {
opts = append(opts, jsm.StreamQueryExpression(c.fExpression))
}
if c.fLeader != "" {
opts = append(opts, jsm.StreamQueryLeaderServer(c.fLeader))
}

found, err := c.mgr.QueryStreams(opts...)
if err != nil {
return err
}

if len(found) > 0 {
balancer, err := balancer.New(c.mgr.NatsConn(), api.NewDefaultLogger(api.InfoLevel))
if err != nil {
return err
}

balanced, err := balancer.BalanceStreams(found)
if err != nil {
return fmt.Errorf("failed to balance streams - %s", err)
}
fmt.Printf("Balanced %d streams.\n", balanced)

}

return nil
}

func (c *streamCmd) leaderStandDown(_ *fisk.ParseContext) error {
c.connectAndAskStream()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/klauspost/compress v1.17.11
github.com/mattn/go-isatty v0.0.20
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877
github.com/nats-io/jwt/v2 v2.7.2
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db
github.com/nats-io/nats.go v1.37.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b h1:LNt3vx1htGxJ+jGw4nZ0xe+zKabImvOBf0XgujSPURc=
github.com/nats-io/jsm.go v0.1.1-0.20241128091442-da16cd219f9b/go.mod h1:JzvMg3G7JtwRnl+ZenOH6NVI73Kuc4kIM1YDRaI4xZs=
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877 h1:Q9u7cz8jPvNZOvX9Z5TAk0G2To0iZOnNeUL4mVER7P8=
github.com/nats-io/jsm.go v0.1.1-0.20241211112912-e379cb1c3877/go.mod h1:5Wh2yAEPwB+sIAwS4MShJDw+ToZZmayZeqqK1zh+0xQ=
github.com/nats-io/jwt/v2 v2.7.2 h1:SCRjfDLJ2q8naXp8YlGJJS5/yj3wGSODFYVi4nnwVMw=
github.com/nats-io/jwt/v2 v2.7.2/go.mod h1:kB6QUmqHG6Wdrzj0KP2L+OX4xiTPBeV+NHVstFaATXU=
github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20241127165413-cfaad68e19db h1:XSsKLcdTjNwRhhiPS2G193zgh7yCHjT5IRzRyTuI/Y0=
Expand Down
Loading