Skip to content
This repository has been archived by the owner on Dec 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #150 from odacremolbap/task/make-redis-tracking-id…
Browse files Browse the repository at this point in the history
…-opt-in

Redis: make redis tracking id opt-in
  • Loading branch information
Pablo Mercado authored May 3, 2023
2 parents 1b85530 + 66d8b2f commit d0dc100
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ redis.tls-skip-verify | REDIS_TLS_SKIP_VERIFY | false | TLS skippi
redis.tls-ca-certificate | REDIS_TLS_CA_CERTIFICATE | | TLS CA certificate used to connect to Redis.
redis.tls-certificate | REDIS_TLS_CERTIFICATE | | TLS certificate used to authenticate with Redis.
redis.tls-key | REDIS_TLS_KEY | | TLS key used to authenticate with Redis.
redis.tracking-id-enabled | REDIS_TRACKING_ID_ENABLED | false | Adds the Redis ID for the event as `triggermeshbackendid` CloudEvents attribute.
redis.stream | REDIS_STREAM | triggermesh | Stream name that stores the broker's CloudEvents.
redis.group | REDIS_GROUP | default | Redis stream consumer group name.
redis.stream-max-len | REDIS_STREAM_MAX_LEN | 1000 | Limit the number of items in a stream by trimming it. Set to 0 for unlimited.
Expand Down
3 changes: 2 additions & 1 deletion pkg/backend/impl/redis/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type RedisArgs struct {
// Instance at the Redis stream consumer group. Copied from the InstanceName at the global args.
Instance string `kong:"-"`

StreamMaxLen int `help:"Limit the number of items in a stream by trimming it. Set to 0 for unlimited." env:"STREAM_MAX_LEN" default:"1000"`
StreamMaxLen int `help:"Limit the number of items in a stream by trimming it. Set to 0 for unlimited." env:"STREAM_MAX_LEN" default:"1000"`
TrackingIDEnabled bool `help:"Enables adding Redis ID as a CloudEvent attribute." env:"TRACKING_ID_ENABLED" default:"false"`
}

func (ra *RedisArgs) Validate() error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/backend/impl/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ func (s *redis) Subscribe(name string, ccb backend.ConsumerDispatcher) error {
name: name,
group: group,

trackingEnabled: s.args.TrackingIDEnabled,

// caller's callback for dispatching events from Redis.
ccbDispatch: ccb,

Expand Down
20 changes: 11 additions & 9 deletions pkg/backend/impl/redis/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type subscription struct {
name string
group string

trackingEnabled bool

// caller's callback for dispatching events from Redis.
ccbDispatch backend.ConsumerDispatcher

Expand Down Expand Up @@ -132,20 +134,20 @@ func (s *subscription) start() {
continue
}

if err = ce.Context.SetExtension(BackendIDAttribute, msg.ID); err != nil {
s.logger.Errorw(fmt.Sprintf("could not set %s attributes for the Redis message %s. Tracking will not be possible.", BackendIDAttribute, msg.ID),
zap.Error(err))
if s.trackingEnabled {
if err = ce.Context.SetExtension(BackendIDAttribute, msg.ID); err != nil {
s.logger.Errorw(fmt.Sprintf("could not set %s attributes for the Redis message %s. Tracking will not be possible.", BackendIDAttribute, msg.ID),
zap.Error(err))
}
}

go func() {
go func(msgID string) {
s.ccbDispatch(ce)
id := ce.Extensions()[BackendIDAttribute].(string)

if err := s.ack(id); err != nil {
s.logger.Errorw(fmt.Sprintf("could not ACK the Redis message %s containing CloudEvent %s", id, ce.Context.GetID()),
if err := s.ack(msgID); err != nil {
s.logger.Errorw(fmt.Sprintf("could not ACK the Redis message %s containing CloudEvent %s", msgID, ce.Context.GetID()),
zap.Error(err))
}
}()
}(msg.ID)

// If we are processing pending messages the ACK might take a
// while to be sent. We need to set the message ID so that the
Expand Down

0 comments on commit d0dc100

Please sign in to comment.