Skip to content

Commit

Permalink
Add option to ignore update descriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ldechoux committed Nov 21, 2023
1 parent 29aa55c commit 85e2790
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 24 deletions.
17 changes: 9 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ type MongoDB struct {
}

type MongoDBOptions struct {
BatchSize int32 `config:"MONGODB_OPTION_BATCH_SIZE"`
FullDocument bool `config:"MONGODB_OPTION_FULL_DOCUMENT"`
MaxAwaitTime time.Duration `config:"MONGODB_OPTION_MAX_AWAIT_TIME"`
ResumeAfter string `config:"MONGODB_OPTION_RESUME_AFTER"`
StartAtOperationTimeI uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_I"`
StartAtOperationTimeT uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_T"`
WatchRetryDelay time.Duration `config:"MONGODB_OPTION_WATCH_RETRY_DELAY"`
WatchMaxRetries int32 `config:"MONGODB_OPTION_WATCH_MAX_RETRIES"`
BatchSize int32 `config:"MONGODB_OPTION_BATCH_SIZE"`
FullDocument bool `config:"MONGODB_OPTION_FULL_DOCUMENT"`
IgnoreUpdateDescription bool `config:"MONGODB_OPTION_IGNORE_UPDATE_DESCRIPTION"`
MaxAwaitTime time.Duration `config:"MONGODB_OPTION_MAX_AWAIT_TIME"`
ResumeAfter string `config:"MONGODB_OPTION_RESUME_AFTER"`
StartAtOperationTimeI uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_I"`
StartAtOperationTimeT uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_T"`
WatchRetryDelay time.Duration `config:"MONGODB_OPTION_WATCH_RETRY_DELAY"`
WatchMaxRetries int32 `config:"MONGODB_OPTION_WATCH_MAX_RETRIES"`
}

// Kafka is the configuration provider for Kafka
Expand Down
43 changes: 27 additions & 16 deletions internal/mongo/watch_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (w *WatchProducer) GetProducer(o ...WatchOption) ChangeEventProducer {
w.logger.Info("Context canceled")
cursor.Close(ctx)
return
case startAfter := <-w.sendEvents(ctx, cursor, events):
case startAfter := <-w.sendEvents(ctx, cursor, events, config.ignoreUpdateDescription):
w.logger.Info("Mongo client : Retry to watch collection", logger.String("collection", w.collection.Name()), logger.Any("start_after", startAfter))
cursor.Close(ctx)
if config.maxRetries == 0 {
Expand Down Expand Up @@ -103,7 +103,7 @@ func (w *WatchProducer) watch(ctx context.Context, pipeline bson.A, config *Watc
return
}

func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent) <-chan bson.Raw {
func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent, ignoreUpdateDescription bool) <-chan bson.Raw {
resumeToken := make(chan bson.Raw, 1)

go func() {
Expand All @@ -122,6 +122,9 @@ func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, eve
w.logger.Error("Mongo client: Unable to decode change event value from cursor", logger.Error("error", err))
continue
}
if ignoreUpdateDescription {
event.Updates = nil
}
events <- event
}
resumeToken <- cursor.ResumeToken()
Expand All @@ -141,13 +144,14 @@ func NewWatchProducer(adapter CollectionAdapter, logger logger.LoggerInterface,
type WatchOption func(*WatchConfig)

type WatchConfig struct {
batchSize int32
fullDocumentEnabled bool
maxAwaitTime time.Duration
resumeAfter bson.M
startAtOperationTime *primitive.Timestamp
maxRetries int32
retryDelay time.Duration
batchSize int32
fullDocumentEnabled bool
ignoreUpdateDescription bool
maxAwaitTime time.Duration
resumeAfter bson.M
startAtOperationTime *primitive.Timestamp
maxRetries int32
retryDelay time.Duration
}

func (o *WatchConfig) apply(options ...WatchOption) {
Expand All @@ -158,13 +162,14 @@ func (o *WatchConfig) apply(options ...WatchOption) {

func NewWatchConfig(o ...WatchOption) *WatchConfig {
watchOptions := &WatchConfig{
batchSize: 0,
fullDocumentEnabled: false,
maxAwaitTime: 0,
resumeAfter: bson.M{},
startAtOperationTime: nil,
maxRetries: 3,
retryDelay: 250 * time.Millisecond,
batchSize: 0,
fullDocumentEnabled: false,
ignoreUpdateDescription: false,
maxAwaitTime: 0,
resumeAfter: bson.M{},
startAtOperationTime: nil,
maxRetries: 3,
retryDelay: 250 * time.Millisecond,
}
watchOptions.apply(o...)
return watchOptions
Expand Down Expand Up @@ -206,6 +211,12 @@ func WithResumeAfter(resumeAfter []byte) WatchOption {
}
}

func WithIgnoreUpdateDescription(ignore bool) WatchOption {
return func(w *WatchConfig) {
w.ignoreUpdateDescription = ignore
}
}

// WithStartAtOperationTime allows to specify the timestamp for the change stream to only
// return changes that occurred at or after the given timestamp.
func WithStartAtOperationTime(startAtOperationTime primitive.Timestamp) WatchOption {
Expand Down
1 change: 1 addition & 0 deletions internal/service/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (container *Container) getWatchOptions() []mongo.WatchOption {
}),
mongo.WithMaxRetries(configOptions.WatchMaxRetries),
mongo.WithRetryDelay(configOptions.WatchRetryDelay),
mongo.WithIgnoreUpdateDescription(configOptions.IgnoreUpdateDescription),
}
}

Expand Down

0 comments on commit 85e2790

Please sign in to comment.