diff --git a/cli/pub_command.go b/cli/pub_command.go index 10e368ed..22141919 100644 --- a/cli/pub_command.go +++ b/cli/pub_command.go @@ -22,6 +22,7 @@ import ( "github.com/choria-io/fisk" "github.com/gosuri/uiprogress" + "github.com/nats-io/jsm.go" "github.com/nats-io/nats.go" terminal "golang.org/x/term" ) @@ -39,6 +40,7 @@ type pubCmd struct { replyTimeout time.Duration forceStdin bool translate string + jetstream bool } func configurePubCommand(app commandHost) { @@ -74,6 +76,7 @@ Available template functions are: pub.Flag("count", "Publish multiple messages").Default("1").IntVar(&c.cnt) pub.Flag("sleep", "When publishing multiple messages, sleep between publishes").DurationVar(&c.sleep) pub.Flag("force-stdin", "Force reading from stdin").UnNegatableBoolVar(&c.forceStdin) + pub.Flag("jetstream", "Publish messages to jetstream").Short('J').UnNegatableBoolVar(&c.jetstream) requestHelp := `Body and Header values of the messages may use Go templates to create unique messages. @@ -229,6 +232,53 @@ func (c *pubCmd) doReq(nc *nats.Conn, progress *uiprogress.Bar) error { return nil } +func (c *pubCmd) doJetstream(nc *nats.Conn, progress *uiprogress.Bar) error { + for i := 1; i <= c.cnt; i++ { + start := time.Now() + body, err := pubReplyBodyTemplate(c.body, "", i) + if err != nil { + log.Printf("Could not parse body template: %s", err) + } + + msg, err := c.prepareMsg(body, i) + if err != nil { + return err + } + + msg.Subject = c.subject + + resp, err := nc.RequestMsg(msg, opts().Timeout) + if err != nil { + return err + } + + ack, err := jsm.ParsePubAck(resp) + if err != nil { + return err + } + + if opts().Trace { + fmt.Printf("<<< %+v\n", string(resp.Data)) + } + + if progress != nil { + progress.Incr() + } + + fmt.Printf(">>> Stream: %v Sequence: %s Duplicate: %t Domain: %q\n", ack.Stream, f(ack.Sequence), ack.Duplicate, ack.Domain) + + // If applicable, account for the wait duration in a publish sleep. + if c.cnt > 1 && c.sleep > 0 { + st := c.sleep - time.Since(start) + if st > 0 { + time.Sleep(st) + } + } + } + + return nil +} + func (c *pubCmd) publish(_ *fisk.ParseContext) error { nc, err := newNatsConn("", natsOpts()...) if err != nil { @@ -263,6 +313,10 @@ func (c *pubCmd) publish(_ *fisk.ParseContext) error { defer func() { uiprogress.Stop(); fmt.Println() }() } + if c.jetstream { + return c.doJetstream(nc, progress) + } + if c.req || c.replyCount >= 1 { return c.doReq(nc, progress) }