Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(#1145) Support interactive edit for consumers #1151

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 145 additions & 76 deletions cli/consumer_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"math/rand"
"os"
"os/exec"
"os/signal"
"regexp"
"sort"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/nats-io/natscli/columns"
iu "github.com/nats-io/natscli/internal/util"
terminal "golang.org/x/term"
"gopkg.in/yaml.v3"

"github.com/AlecAivazis/survey/v2"
"github.com/choria-io/fisk"
Expand Down Expand Up @@ -116,6 +118,7 @@ type consumerCmd struct {
fInvert bool
fExpression string
fLeader string
interactive bool
}

func configureConsumerCommand(app commandHost) {
Expand Down Expand Up @@ -190,6 +193,7 @@ func configureConsumerCommand(app commandHost) {
edit.Arg("consumer", "Consumer name").StringVar(&c.consumer)
edit.Flag("config", "JSON file to read configuration from").ExistingFileVar(&c.inputFile)
edit.Flag("force", "Force removal without prompting").Short('f').UnNegatableBoolVar(&c.force)
edit.Flag("interactive", "Edit the configuring using your editor").Short('i').BoolVar(&c.interactive)
edit.Flag("dry-run", "Only shows differences, do not edit the stream").UnNegatableBoolVar(&c.dryRun)
addCreateFlags(edit, true)

Expand Down Expand Up @@ -556,108 +560,173 @@ func (c *consumerCmd) leaderStandDownAction(_ *fisk.ParseContext) error {
return nil
}

func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
c.connectAndSetup(true, true)
var err error
func (c *consumerCmd) interactiveEdit(cfg api.ConsumerConfig) (*api.ConsumerConfig, error) {
editor := os.Getenv("EDITOR")
if editor == "" {
return &api.ConsumerConfig{}, fmt.Errorf("set EDITOR environment variable to your chosen editor")
}

if c.selectedConsumer == nil {
c.selectedConsumer, err = c.mgr.LoadConsumer(c.stream, c.consumer)
fisk.FatalIfError(err, "could not load Consumer")
cj, err := decoratedYamlMarshal(cfg)
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}

if !c.selectedConsumer.IsDurable() {
return fmt.Errorf("only durable consumers can be edited")
tfile, err := os.CreateTemp("", "*.yaml")
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}
defer os.Remove(tfile.Name())

// lazy deep copy
t := c.selectedConsumer.Configuration()
tj, err := json.Marshal(t)
_, err = fmt.Fprint(tfile, string(cj))
if err != nil {
return err
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}
var ncfg *api.ConsumerConfig

tfile.Close()

cmd := exec.Command(editor, tfile.Name())
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err = cmd.Run()
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not create temporary file: %s", err)
}

nb, err := os.ReadFile(tfile.Name())
if err != nil {
return &api.ConsumerConfig{}, err
}

ncfg := api.ConsumerConfig{}
err = yaml.Unmarshal(nb, &ncfg)
if err != nil {
return &api.ConsumerConfig{}, err
}

// some yaml quirks
if len(ncfg.BackOff) == 0 {
ncfg.BackOff = nil
}

return &ncfg, nil
}

func (c *consumerCmd) copyAndEditConsumer(cfg api.ConsumerConfig) (*api.ConsumerConfig, error) {
var err error

if c.inputFile != "" {
ncfg, err = c.loadConfigFile(c.inputFile)
if err != nil {
return err
}
} else {
err = json.Unmarshal(tj, &ncfg)
if err != nil {
return err
}
return c.loadConfigFile(c.inputFile)
}

if c.description != "" {
ncfg.Description = c.description
}
if c.description != "" {
cfg.Description = c.description
}

if c.inactiveThreshold != 0 {
ncfg.InactiveThreshold = c.inactiveThreshold
}
if c.inactiveThreshold != 0 {
cfg.InactiveThreshold = c.inactiveThreshold
}

if c.maxDeliver != 0 {
ncfg.MaxDeliver = c.maxDeliver
}
if c.maxDeliver != 0 {
cfg.MaxDeliver = c.maxDeliver
}

if c.maxAckPending != -1 {
ncfg.MaxAckPending = c.maxAckPending
}
if c.maxAckPending != -1 {
cfg.MaxAckPending = c.maxAckPending
}

if c.ackWait != -1*time.Second {
ncfg.AckWait = c.ackWait
}
if c.ackWait != -1*time.Second {
cfg.AckWait = c.ackWait
}

if c.maxWaiting != 0 {
ncfg.MaxWaiting = c.maxWaiting
}
if c.maxWaiting != 0 {
cfg.MaxWaiting = c.maxWaiting
}

if c.samplePct != -1 {
ncfg.SampleFrequency = c.sampleFreqFromInt(c.samplePct)
}
if c.samplePct != -1 {
cfg.SampleFrequency = c.sampleFreqFromInt(c.samplePct)
}

if c.maxPullBatch > 0 {
ncfg.MaxRequestBatch = c.maxPullBatch
}
if c.maxPullBatch > 0 {
cfg.MaxRequestBatch = c.maxPullBatch
}

if c.maxPullExpire > 0 {
ncfg.MaxRequestExpires = c.maxPullExpire
}
if c.maxPullExpire > 0 {
cfg.MaxRequestExpires = c.maxPullExpire
}

if c.maxPullBytes > 0 {
ncfg.MaxRequestMaxBytes = c.maxPullBytes
}
if c.maxPullBytes > 0 {
cfg.MaxRequestMaxBytes = c.maxPullBytes
}

if c.backoffMode != "" {
ncfg.BackOff, err = c.backoffPolicy()
if err != nil {
return fmt.Errorf("could not determine backoff policy: %v", err)
}
if c.backoffMode != "" {
cfg.BackOff, err = c.backoffPolicy()
if err != nil {
return &api.ConsumerConfig{}, fmt.Errorf("could not determine backoff policy: %v", err)
}
}

if c.delivery != "" {
ncfg.DeliverSubject = c.delivery
}
if c.delivery != "" {
cfg.DeliverSubject = c.delivery
}

if c.hdrsOnlySet {
ncfg.HeadersOnly = c.hdrsOnly
}
if c.hdrsOnlySet {
cfg.HeadersOnly = c.hdrsOnly
}

if len(c.filterSubjects) == 1 {
ncfg.FilterSubject = c.filterSubjects[0]
ncfg.FilterSubjects = nil
} else if len(c.filterSubjects) > 1 {
ncfg.FilterSubjects = c.filterSubjects
ncfg.FilterSubject = ""
}
if len(c.filterSubjects) == 1 {
cfg.FilterSubject = c.filterSubjects[0]
cfg.FilterSubjects = nil
} else if len(c.filterSubjects) > 1 {
cfg.FilterSubjects = c.filterSubjects
cfg.FilterSubject = ""
}

if c.replicas > 0 {
ncfg.Replicas = c.replicas
}
if c.replicas > 0 {
cfg.Replicas = c.replicas
}

if c.metadataIsSet {
ncfg.Metadata = c.metadata
}
if c.metadataIsSet {
cfg.Metadata = c.metadata
}

return &cfg, nil
}
func (c *consumerCmd) editAction(pc *fisk.ParseContext) error {
c.connectAndSetup(true, true)
var err error

if c.selectedConsumer == nil {
c.selectedConsumer, err = c.mgr.LoadConsumer(c.stream, c.consumer)
fisk.FatalIfError(err, "could not load Consumer")
}

if !c.selectedConsumer.IsDurable() {
return fmt.Errorf("only durable consumers can be edited")
}

// lazy deep copy
t := c.selectedConsumer.Configuration()
t.Metadata = iu.RemoveReservedMetadata(t.Metadata)

tj, err := json.Marshal(t)
if err != nil {
return err
}

var ncfg *api.ConsumerConfig
err = json.Unmarshal(tj, &ncfg)
if err != nil {
return err
}

if c.interactive {
ncfg, err = c.interactiveEdit(t)
fisk.FatalIfError(err, "could not create new configuration for Consumer %s", c.selectedConsumer.Name())
} else {
ncfg, err = c.copyAndEditConsumer(t)
fisk.FatalIfError(err, "could not create new configuration for Consumer %s", c.selectedConsumer.Name())
}

if len(ncfg.BackOff) > 0 && ncfg.AckWait != t.AckWait {
Expand Down