Skip to content

Commit

Permalink
Merge pull request #1144 from ploubser/issue_1130
Browse files Browse the repository at this point in the history
(#1130) Add --jetstream flag to publish
  • Loading branch information
ripienaar authored Sep 9, 2024
2 parents f49ac82 + 6cc5c42 commit 3ad589d
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions cli/pub_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/choria-io/fisk"
"github.com/gosuri/uiprogress"
"github.com/nats-io/jsm.go"
"github.com/nats-io/nats.go"
terminal "golang.org/x/term"
)
Expand All @@ -39,6 +40,7 @@ type pubCmd struct {
replyTimeout time.Duration
forceStdin bool
translate string
jetstream bool
}

func configurePubCommand(app commandHost) {
Expand Down Expand Up @@ -74,6 +76,7 @@ Available template functions are:
pub.Flag("count", "Publish multiple messages").Default("1").IntVar(&c.cnt)
pub.Flag("sleep", "When publishing multiple messages, sleep between publishes").DurationVar(&c.sleep)
pub.Flag("force-stdin", "Force reading from stdin").UnNegatableBoolVar(&c.forceStdin)
pub.Flag("jetstream", "Publish messages to jetstream").Short('J').UnNegatableBoolVar(&c.jetstream)

requestHelp := `Body and Header values of the messages may use Go templates to
create unique messages.
Expand Down Expand Up @@ -229,6 +232,53 @@ func (c *pubCmd) doReq(nc *nats.Conn, progress *uiprogress.Bar) error {
return nil
}

func (c *pubCmd) doJetstream(nc *nats.Conn, progress *uiprogress.Bar) error {
for i := 1; i <= c.cnt; i++ {
start := time.Now()
body, err := pubReplyBodyTemplate(c.body, "", i)
if err != nil {
log.Printf("Could not parse body template: %s", err)
}

msg, err := c.prepareMsg(body, i)
if err != nil {
return err
}

msg.Subject = c.subject

resp, err := nc.RequestMsg(msg, opts().Timeout)
if err != nil {
return err
}

ack, err := jsm.ParsePubAck(resp)
if err != nil {
return err
}

if opts().Trace {
fmt.Printf("<<< %+v\n", string(resp.Data))
}

if progress != nil {
progress.Incr()
}

fmt.Printf(">>> Stream: %v Sequence: %s Duplicate: %t Domain: %q\n", ack.Stream, f(ack.Sequence), ack.Duplicate, ack.Domain)

// If applicable, account for the wait duration in a publish sleep.
if c.cnt > 1 && c.sleep > 0 {
st := c.sleep - time.Since(start)
if st > 0 {
time.Sleep(st)
}
}
}

return nil
}

func (c *pubCmd) publish(_ *fisk.ParseContext) error {
nc, err := newNatsConn("", natsOpts()...)
if err != nil {
Expand Down Expand Up @@ -263,6 +313,10 @@ func (c *pubCmd) publish(_ *fisk.ParseContext) error {
defer func() { uiprogress.Stop(); fmt.Println() }()
}

if c.jetstream {
return c.doJetstream(nc, progress)
}

if c.req || c.replyCount >= 1 {
return c.doReq(nc, progress)
}
Expand Down

0 comments on commit 3ad589d

Please sign in to comment.