diff --git a/config/config.go b/config/config.go index bd3a3f52..3090d884 100644 --- a/config/config.go +++ b/config/config.go @@ -54,14 +54,15 @@ type MongoDB struct { } type MongoDBOptions struct { - BatchSize int32 `config:"MONGODB_OPTION_BATCH_SIZE"` - FullDocument bool `config:"MONGODB_OPTION_FULL_DOCUMENT"` - MaxAwaitTime time.Duration `config:"MONGODB_OPTION_MAX_AWAIT_TIME"` - ResumeAfter string `config:"MONGODB_OPTION_RESUME_AFTER"` - StartAtOperationTimeI uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_I"` - StartAtOperationTimeT uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_T"` - WatchRetryDelay time.Duration `config:"MONGODB_OPTION_WATCH_RETRY_DELAY"` - WatchMaxRetries int32 `config:"MONGODB_OPTION_WATCH_MAX_RETRIES"` + BatchSize int32 `config:"MONGODB_OPTION_BATCH_SIZE"` + FullDocument bool `config:"MONGODB_OPTION_FULL_DOCUMENT"` + IgnoreUpdateDescription bool `config:"MONGODB_OPTION_IGNORE_UPDATE_DESCRIPTION"` + MaxAwaitTime time.Duration `config:"MONGODB_OPTION_MAX_AWAIT_TIME"` + ResumeAfter string `config:"MONGODB_OPTION_RESUME_AFTER"` + StartAtOperationTimeI uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_I"` + StartAtOperationTimeT uint32 `config:"MONGODB_OPTION_START_AT_OPERATION_TIME_T"` + WatchRetryDelay time.Duration `config:"MONGODB_OPTION_WATCH_RETRY_DELAY"` + WatchMaxRetries int32 `config:"MONGODB_OPTION_WATCH_MAX_RETRIES"` } // Kafka is the configuration provider for Kafka diff --git a/internal/mongo/watch_producer.go b/internal/mongo/watch_producer.go index 5267c81a..5dd7b5e8 100644 --- a/internal/mongo/watch_producer.go +++ b/internal/mongo/watch_producer.go @@ -48,7 +48,7 @@ func (w *WatchProducer) GetProducer(o ...WatchOption) ChangeEventProducer { w.logger.Info("Context canceled") cursor.Close(ctx) return - case startAfter := <-w.sendEvents(ctx, cursor, events): + case startAfter := <-w.sendEvents(ctx, cursor, events, config.ignoreUpdateDescription): w.logger.Info("Mongo client : Retry to watch collection", logger.String("collection", w.collection.Name()), logger.Any("start_after", startAfter)) cursor.Close(ctx) if config.maxRetries == 0 { @@ -103,7 +103,7 @@ func (w *WatchProducer) watch(ctx context.Context, pipeline bson.A, config *Watc return } -func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent) <-chan bson.Raw { +func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, events chan *ChangeEvent, ignoreUpdateDescription bool) <-chan bson.Raw { resumeToken := make(chan bson.Raw, 1) go func() { @@ -122,6 +122,9 @@ func (w *WatchProducer) sendEvents(ctx context.Context, cursor StreamCursor, eve w.logger.Error("Mongo client: Unable to decode change event value from cursor", logger.Error("error", err)) continue } + if ignoreUpdateDescription { + event.Updates = nil + } events <- event } resumeToken <- cursor.ResumeToken() @@ -141,13 +144,14 @@ func NewWatchProducer(adapter CollectionAdapter, logger logger.LoggerInterface, type WatchOption func(*WatchConfig) type WatchConfig struct { - batchSize int32 - fullDocumentEnabled bool - maxAwaitTime time.Duration - resumeAfter bson.M - startAtOperationTime *primitive.Timestamp - maxRetries int32 - retryDelay time.Duration + batchSize int32 + fullDocumentEnabled bool + ignoreUpdateDescription bool + maxAwaitTime time.Duration + resumeAfter bson.M + startAtOperationTime *primitive.Timestamp + maxRetries int32 + retryDelay time.Duration } func (o *WatchConfig) apply(options ...WatchOption) { @@ -158,13 +162,14 @@ func (o *WatchConfig) apply(options ...WatchOption) { func NewWatchConfig(o ...WatchOption) *WatchConfig { watchOptions := &WatchConfig{ - batchSize: 0, - fullDocumentEnabled: false, - maxAwaitTime: 0, - resumeAfter: bson.M{}, - startAtOperationTime: nil, - maxRetries: 3, - retryDelay: 250 * time.Millisecond, + batchSize: 0, + fullDocumentEnabled: false, + ignoreUpdateDescription: false, + maxAwaitTime: 0, + resumeAfter: bson.M{}, + startAtOperationTime: nil, + maxRetries: 3, + retryDelay: 250 * time.Millisecond, } watchOptions.apply(o...) return watchOptions @@ -206,6 +211,12 @@ func WithResumeAfter(resumeAfter []byte) WatchOption { } } +func WithIgnoreUpdateDescription(ignore bool) WatchOption { + return func(w *WatchConfig) { + w.ignoreUpdateDescription = ignore + } +} + // WithStartAtOperationTime allows to specify the timestamp for the change stream to only // return changes that occurred at or after the given timestamp. func WithStartAtOperationTime(startAtOperationTime primitive.Timestamp) WatchOption { diff --git a/internal/service/mongo.go b/internal/service/mongo.go index 6a63b216..9417ad9b 100644 --- a/internal/service/mongo.go +++ b/internal/service/mongo.go @@ -66,6 +66,7 @@ func (container *Container) getWatchOptions() []mongo.WatchOption { }), mongo.WithMaxRetries(configOptions.WatchMaxRetries), mongo.WithRetryDelay(configOptions.WatchRetryDelay), + mongo.WithIgnoreUpdateDescription(configOptions.IgnoreUpdateDescription), } }