Skip to content

Commit

Permalink
nats: change default delivery policy (#132)
Browse files Browse the repository at this point in the history
Previously the default was to set the Delivery Policy to all, however if
connecting to a consumer which already existed with a different delivery
policy, nats returns an error for a mismatch.

This change removes the default when no delivery policy is defined, to
not specify any delivery policy.

Additionally, I've added the remaining delivery policy options.

Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm authored Aug 8, 2023
1 parent 1fb9cfe commit c9e198f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
16 changes: 13 additions & 3 deletions events/nats_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NATSConfig struct {

SubscriberDeliveryPolicy string
SubscriberStartSequence uint64
SubscriberStartTime time.Time

logger *zap.SugaredLogger
connectOptions []nats.Option
Expand All @@ -66,7 +67,7 @@ func (c NATSConfig) Validate() error {
}

switch c.SubscriberDeliveryPolicy {
case "", "all", "start-sequence":
case "", "all", "last", "last-per-subject", "new", "start-sequence", "start-time":
default:
err = multierr.Append(err, ErrNATSInvalidDeliveryPolicy)
}
Expand Down Expand Up @@ -101,10 +102,18 @@ func (c NATSConfig) WithDefaults() NATSConfig {
}

switch c.SubscriberDeliveryPolicy {
case "all":
c.subscribeOptions = append(c.subscribeOptions, nats.DeliverAll())
case "last":
c.subscribeOptions = append(c.subscribeOptions, nats.DeliverLast())
case "last-per-subject":
c.subscribeOptions = append(c.subscribeOptions, nats.DeliverLastPerSubject())
case "new":
c.subscribeOptions = append(c.subscribeOptions, nats.DeliverNew())
case "start-sequence":
c.subscribeOptions = append(c.subscribeOptions, nats.StartSequence(c.SubscriberStartSequence))
default:
c.subscribeOptions = append(c.subscribeOptions, nats.DeliverAll())
case "start-time":
c.subscribeOptions = append(c.subscribeOptions, nats.StartTime(c.SubscriberStartTime))
}

if c.ShutdownTimeout == 0 {
Expand Down Expand Up @@ -187,6 +196,7 @@ func MustViperFlagsForNATS(v *viper.Viper, flags *pflag.FlagSet, appName string)
v.MustBindEnv("events.nats.subscriberNoManualAck")
v.MustBindEnv("events.nats.subscriberDeliveryPolicy")
v.MustBindEnv("events.nats.subscriberStartSequence")
v.MustBindEnv("events.nats.subscriberStartTime")

v.SetDefault("events.nats.connectTimeout", defaultTimeout)
v.SetDefault("events.nats.source", appName)
Expand Down
2 changes: 1 addition & 1 deletion events/nats_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var (
ErrNATSInvalidAuthConfiguration = errors.New("invalid nats confinguration, both token and creds file are specified")

// ErrNATSInvalidDeliveryPolicy is returned when an incorrect delivery policy is provided.
ErrNATSInvalidDeliveryPolicy = errors.New("invalid delivery policy")
ErrNATSInvalidDeliveryPolicy = errors.New("invalid delivery policy, expected all|last|last-per-subject|new|start-sequence|start-time")

// ErrNATSMessageNoReplySubject is returned when calling ReplyAuthRelationshipRequest when the request has no reply subject defined.
ErrNATSMessageNoReplySubject = errors.New("unable to reply to auth relationship request, no reply subject specified")
Expand Down

0 comments on commit c9e198f

Please sign in to comment.