From f6758eace3d08d611f9fee06b3204c67097e1c82 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Fri, 22 Nov 2024 09:43:43 -0800 Subject: [PATCH] [exporter] Feature gate for queue batcher (#11721) #### Description This PR proceeds https://github.com/open-telemetry/opentelemetry-collector/pull/11637. It * Introduces a noop feature gate that will be used for queue batcher. * Updates exporter tests to run with both the feature gate on and off. #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector/issues/10368 https://github.com/open-telemetry/opentelemetry-collector/issues/8122 #### Testing #### Documentation --- exporter/debugexporter/go.mod | 4 + exporter/debugexporter/go.sum | 2 + .../exporterhelperprofiles/go.mod | 4 + .../exporterhelperprofiles/go.sum | 2 + .../exporterhelper/internal/base_exporter.go | 8 + .../internal/base_exporter_test.go | 102 ++-- .../internal/batch_sender_test.go | 571 ++++++++++-------- .../internal/queue_sender_test.go | 553 ++++++++++------- .../internal/retry_sender_test.go | 571 ++++++++++-------- exporter/exporterhelper/internal/test_util.go | 20 + exporter/exporterprofiles/go.mod | 2 + exporter/exportertest/go.mod | 4 + exporter/exportertest/go.sum | 2 + exporter/go.mod | 4 + exporter/go.sum | 2 + exporter/nopexporter/go.mod | 2 + exporter/nopexporter/go.sum | 2 + exporter/otlpexporter/go.mod | 4 + exporter/otlpexporter/go.sum | 2 + exporter/otlphttpexporter/go.mod | 4 + exporter/otlphttpexporter/go.sum | 2 + 21 files changed, 1118 insertions(+), 749 deletions(-) create mode 100644 exporter/exporterhelper/internal/test_util.go diff --git a/exporter/debugexporter/go.mod b/exporter/debugexporter/go.mod index 8bca5d57efe..ea9f8254ecb 100644 --- a/exporter/debugexporter/go.mod +++ b/exporter/debugexporter/go.mod @@ -29,6 +29,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect @@ -45,6 +46,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.114.0 // indirect go.opentelemetry.io/collector/extension v0.114.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect go.opentelemetry.io/collector/pipeline v0.114.0 // indirect go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect go.opentelemetry.io/collector/receiver v0.114.0 // indirect @@ -115,3 +117,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/debugexporter/go.sum b/exporter/debugexporter/go.sum index 2a53303c08f..b66e027a4f7 100644 --- a/exporter/debugexporter/go.sum +++ b/exporter/debugexporter/go.sum @@ -17,6 +17,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.mod b/exporter/exporterhelper/exporterhelperprofiles/go.mod index ccb0df635f4..3da93d0516d 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/go.mod +++ b/exporter/exporterhelper/exporterhelperprofiles/go.mod @@ -31,6 +31,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -38,6 +39,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.114.0 // indirect go.opentelemetry.io/collector/extension v0.114.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect go.opentelemetry.io/collector/pdata v1.20.0 // indirect go.opentelemetry.io/collector/pipeline v0.114.0 // indirect go.opentelemetry.io/collector/receiver v0.114.0 // indirect @@ -102,3 +104,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../../consume replace go.opentelemetry.io/collector/extension/extensiontest => ../../../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../../featuregate diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.sum b/exporter/exporterhelper/exporterhelperprofiles/go.sum index ab12b8be855..11efd621040 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/go.sum +++ b/exporter/exporterhelper/exporterhelperprofiles/go.sum @@ -15,6 +15,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index f3c4e67113b..98b096ed9b7 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -21,9 +21,17 @@ import ( "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" // BaseExporter contains common fields between different exporter types. "go.opentelemetry.io/collector/exporter/internal" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pipeline" ) +var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister( + "telemetry.UsePullingBasedExporterQueueBatcher", + featuregate.StageBeta, + featuregate.WithRegisterFromVersion("v0.114.0"), + featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), +) + type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender // Option apply changes to BaseExporter. diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 028b0127627..886f0ac7b58 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -38,52 +38,80 @@ func newNoopObsrepSender(*ObsReport) RequestSender { } func TestBaseExporter(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBaseExporterWithOptions(t *testing.T) { - want := errors.New("my error") - be, err := NewBaseExporter( - defaultSettings, defaultSignal, newNoopObsrepSender, - WithStart(func(context.Context, component.Host) error { return want }), - WithShutdown(func(context.Context) error { return want }), - WithTimeout(NewDefaultTimeoutConfig()), - ) - require.NoError(t, err) - require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) - require.Equal(t, want, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + want := errors.New("my error") + be, err := NewBaseExporter( + defaultSettings, defaultSignal, newNoopObsrepSender, + WithStart(func(context.Context, component.Host) error { return want }), + WithShutdown(func(context.Context) error { return want }), + WithTimeout(NewDefaultTimeoutConfig()), + ) + require.NoError(t, err) + require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) + require.Equal(t, want, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueOptionsWithRequestExporter(t *testing.T) { - bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithRetry(configretry.NewDefaultBackOffConfig())) - require.NoError(t, err) - require.Nil(t, bs.Marshaler) - require.Nil(t, bs.Unmarshaler) - _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) - require.Error(t, err) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithRetry(configretry.NewDefaultBackOffConfig())) + require.NoError(t, err) + require.Nil(t, bs.Marshaler) + require.Nil(t, bs.Unmarshaler) + _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) + require.Error(t, err) - _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(configretry.NewDefaultBackOffConfig()), - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.Error(t, err) + _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(configretry.NewDefaultBackOffConfig()), + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + require.Error(t, err) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBaseExporterLogging(t *testing.T) { - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.DebugLevel) - set.Logger = zap.New(logger) - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = false - bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg)) - require.NoError(t, err) - sendErr := bs.Send(context.Background(), newErrorRequest()) - require.Error(t, sendErr) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = false + bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg)) + require.NoError(t, err) + sendErr := bs.Send(context.Background(), newErrorRequest()) + require.Error(t, sendErr) - require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1) + require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index f75febca205..61edb5cf9fd 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -45,13 +45,19 @@ func TestBatchSender_Merge(t *testing.T) { }(), }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + + runTest := func(testName string, enableQueueBatcher bool, tt struct { + name string + batcherOption Option + }) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() }) sink := newFakeRequestSink() @@ -81,6 +87,10 @@ func TestBatchSender_Merge(t *testing.T) { }, 100*time.Millisecond, 10*time.Millisecond) }) } + for _, tt := range tests { + runTest(tt.name+"_enable_queue_batcher", true, tt) + runTest(tt.name+"_disable_queue_batcher", false, tt) + } } func TestBatchSender_BatchExportError(t *testing.T) { @@ -115,13 +125,20 @@ func TestBatchSender_BatchExportError(t *testing.T) { expectedItems: 20, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + runTest := func(testName string, enableQueueBatcher bool, tt struct { + name string + batcherOption Option + expectedRequests int64 + expectedItems int64 + }) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() }) sink := newFakeRequestSink() @@ -146,85 +163,114 @@ func TestBatchSender_BatchExportError(t *testing.T) { }, 100*time.Millisecond, 10*time.Millisecond) }) } + for _, tt := range tests { + runTest(tt.name+"_enable_queue_batcher", true, tt) + runTest(tt.name+"_disable_queue_batcher", false, tt) + } } func TestBatchSender_MergeOrSplit(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg)) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) + cfg := exporterbatcher.NewDefaultConfig() + cfg.MinSizeItems = 5 + cfg.MaxSizeItems = 10 + cfg.FlushTimeout = 100 * time.Millisecond + be := queueBatchExporter(t, WithBatcher(cfg)) - sink := newFakeRequestSink() + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) + sink := newFakeRequestSink() - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) + // should be sent right away by reaching the minimum items size. + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 50*time.Millisecond, 10*time.Millisecond) - // request that cannot be split should be dropped. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 11, sink: sink, - mergeErr: errors.New("split error")})) + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 + }, 50*time.Millisecond, 10*time.Millisecond) + + // request that cannot be split should be dropped. + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 11, sink: sink, + mergeErr: errors.New("split error")})) - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 13, sink: sink})) + // big request should be broken down into two requests, both are sent right away. + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 13, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 + }, 50*time.Millisecond, 10*time.Millisecond) + }) + } - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBatchSender_Shutdown(t *testing.T) { - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg)) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + batchCfg := exporterbatcher.NewDefaultConfig() + batchCfg.MinSizeItems = 10 + be := queueBatchExporter(t, WithBatcher(batchCfg)) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + sink := newFakeRequestSink() + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) - sink := newFakeRequestSink() - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) + // To make the request reached the batchSender before shutdown. + time.Sleep(50 * time.Millisecond) - // To make the request reached the batchSender before shutdown. - time.Sleep(50 * time.Millisecond) + require.NoError(t, be.Shutdown(context.Background())) - require.NoError(t, be.Shutdown(context.Background())) + // shutdown should force sending the batch + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(3), sink.itemsCount.Load()) + }) + } - // shutdown should force sending the batch - assert.Equal(t, int64(1), sink.requestsCount.Load()) - assert.Equal(t, int64(3), sink.itemsCount.Load()) + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBatchSender_Disabled(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(cfg)) - require.NotNil(t, be) - require.NoError(t, err) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + cfg.MaxSizeItems = 5 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(cfg)) + require.NotNil(t, be) + require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) - sink := newFakeRequestSink() - // should be sent right away without splitting because batching is disabled. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, int64(1), sink.requestsCount.Load()) - assert.Equal(t, int64(8), sink.itemsCount.Load()) + sink := newFakeRequestSink() + // should be sent right away without splitting because batching is disabled. + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(8), sink.itemsCount.Load()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } // func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { @@ -258,18 +304,25 @@ func TestBatchSender_Disabled(t *testing.T) { // } func TestBatchSender_PostShutdown(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. - sink := newFakeRequestSink() - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, int64(1), sink.requestsCount.Load()) - assert.Equal(t, int64(8), sink.itemsCount.Load()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(exporterbatcher.NewDefaultConfig())) + require.NotNil(t, be) + require.NoError(t, err) + assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, be.Shutdown(context.Background())) + + // Closed batch sender should act as a pass-through to not block queue draining. + sink := newFakeRequestSink() + require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) + assert.Equal(t, int64(1), sink.requestsCount.Load()) + assert.Equal(t, int64(8), sink.itemsCount.Load()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { @@ -372,110 +425,138 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { } func TestBatchSender_BatchBlocking(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 3 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(bCfg)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - // should be sent in two batches since the batch size is 3 - assert.Equal(t, int64(2), sink.requestsCount.Load()) - assert.Equal(t, int64(6), sink.itemsCount.Load()) + sink := newFakeRequestSink() - require.NoError(t, be.Shutdown(context.Background())) + // send 6 blocking requests + wg := sync.WaitGroup{} + for i := 0; i < 6; i++ { + wg.Add(1) + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) + wg.Done() + }() + } + wg.Wait() + + // should be sent in two batches since the batch size is 3 + assert.Equal(t, int64(2), sink.requestsCount.Load()) + assert.Equal(t, int64(6), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } // Validate that the batch is cancelled once the first request in the request is cancelled func TestBatchSender_BatchCancelled(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.Send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, int64(0), sink.requestsCount.Load()) - assert.Equal(t, int64(0), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 2 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(bCfg)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 2 blocking requests + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + wg.Add(1) + go func() { + assert.ErrorIs(t, be.Send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + wg.Add(1) + go func() { + time.Sleep(20 * time.Millisecond) // ensure this call is the second + assert.ErrorIs(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) + wg.Done() + }() + cancel() // canceling the first request should cancel the whole batch + wg.Wait() + + // nothing should be delivered + assert.Equal(t, int64(0), sink.requestsCount.Load()) + assert.Equal(t, int64(0), sink.itemsCount.Load()) + + require.NoError(t, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBatchSender_DrainActiveRequests(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. - require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, int64(2), sink.requestsCount.Load()) - assert.Equal(t, int64(3), sink.itemsCount.Load()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 2 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(bCfg)) + require.NotNil(t, be) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + sink := newFakeRequestSink() + + // send 3 blocking requests with a timeout + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) + }() + + // give time for the first two requests to be batched + time.Sleep(20 * time.Millisecond) + + // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. + // It should take 120 milliseconds to complete. + require.NoError(t, be.Shutdown(context.Background())) + + assert.Equal(t, int64(2), sink.requestsCount.Load()) + assert.Equal(t, int64(3), sink.itemsCount.Load()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestBatchSender_UnstartedShutdown(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig())) - require.NoError(t, err) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(exporterbatcher.NewDefaultConfig())) + require.NoError(t, err) - err = be.Shutdown(context.Background()) - require.NoError(t, err) + err = be.Shutdown(context.Background()) + require.NoError(t, err) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being @@ -526,46 +607,53 @@ func TestBatchSender_UnstartedShutdown(t *testing.T) { // } func TestBatchSenderWithTimeout(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutConfig() - tCfg.Timeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 10 + tCfg := NewDefaultTimeoutConfig() + tCfg.Timeout = 50 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(bCfg), + WithTimeout(tCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() + sink := newFakeRequestSink() - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() + // Send 3 concurrent requests that should be merged in one batch + wg := sync.WaitGroup{} + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) + wg.Done() + }() + } + wg.Wait() + assert.EqualValues(t, 1, sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) + + // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender + for i := 0; i < 3; i++ { + wg.Add(1) + go func() { + assert.Error(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) + wg.Done() + }() + } + wg.Wait() - require.NoError(t, be.Shutdown(context.Background())) + require.NoError(t, be.Shutdown(context.Background())) - // The sink should not change - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) + // The sink should not change + assert.EqualValues(t, 1, sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } // func TestBatchSenderTimerResetNoConflict(t *testing.T) { @@ -614,46 +702,53 @@ func TestBatchSenderWithTimeout(t *testing.T) { // } func TestBatchSenderTimerFlush(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + if runtime.GOOS == "windows" { + t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") + } + bCfg := exporterbatcher.NewDefaultConfig() + bCfg.MinSizeItems = 8 + bCfg.FlushTimeout = 100 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, + WithBatcher(bCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + sink := newFakeRequestSink() + time.Sleep(50 * time.Millisecond) + + // Send 2 concurrent requests that should be merged in one batch and sent immediately + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + assert.EventuallyWithT(t, func(c *assert.CollectT) { + assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) + assert.EqualValues(c, 8, sink.itemsCount.Load()) + }, 30*time.Millisecond, 5*time.Millisecond) + + // Send another request that should be flushed after 100ms instead of 50ms since last flush + go func() { + assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) + }() + + // Confirm that it is not flushed in 50ms + time.Sleep(60 * time.Millisecond) + assert.LessOrEqual(t, int64(1), sink.requestsCount.Load()) + assert.EqualValues(t, 8, sink.itemsCount.Load()) + + // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) + time.Sleep(50 * time.Millisecond) + assert.LessOrEqual(t, int64(2), sink.requestsCount.Load()) + assert.EqualValues(t, 12, sink.itemsCount.Load()) + require.NoError(t, be.Shutdown(context.Background())) + }) } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, int64(1), sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, int64(2), sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - require.NoError(t, be.Shutdown(context.Background())) + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 2c89d9c2c00..44735eaebfa 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -27,87 +27,112 @@ import ( ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - firstMockR := newErrorRequest() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), firstMockR)) - }) - - // Enqueue another request to ensure when calling shutdown we drain the queue. - secondMockR := newMockRequest(3, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), secondMockR)) - }) - - require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size()) - - require.NoError(t, be.Shutdown(context.Background())) - - secondMockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 3) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + firstMockR := newErrorRequest() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), firstMockR)) + }) + + // Enqueue another request to ensure when calling shutdown we drain the queue. + secondMockR := newMockRequest(3, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), secondMockR)) + }) + + require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size()) + + require.NoError(t, be.Shutdown(context.Background())) + + secondMockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - ctx, cancelFunc := context.WithCancel(context.Background()) - cancelFunc() - mockR := newMockRequest(2, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(ctx, mockR)) - }) - ocs.awaitAsyncProcessing() - - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + mockR := newMockRequest(2, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(ctx, mockR)) + }) + ocs.awaitAsyncProcessing() + + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + }) + } + + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_RejectOnFull(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.QueueSize = 0 - qCfg.NumConsumers = 0 - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message) - assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.QueueSize = 0 + qCfg.NumConsumers = 0 + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message) + assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) + }) + } + + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -162,8 +187,13 @@ func TestQueuedRetryHappyPath(t *testing.T) { }, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + + runTest := func(testName string, enableQueueBatcher bool, tt struct { + name string + queueOptions []Option + }) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) tel, err := componenttest.SetupTelemetry(defaultID) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) @@ -189,6 +219,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() }) // Wait until all batches received @@ -203,67 +234,94 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.checkDroppedItemsCount(t, 0) }) } + for _, tt := range tests { + runTest(tt.name+"_enable_queue_batcher", true, tt) + runTest(tt.name+"_disable_queue_batcher", false, tt) + } } func TestQueuedRetry_QueueMetricsReported(t *testing.T) { - dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics} - for _, dataType := range dataTypes { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 0 // to make every request go straight to the queue - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) - - for i := 0; i < 7; i++ { - require.NoError(t, be.Send(context.Background(), newErrorRequest())) - } - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), - attribute.String(DataTypeKey, dataType.String()))) - - assert.NoError(t, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics} + for _, dataType := range dataTypes { + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = -1 // to make QueueMetricsReportedvery request go straight to the queue + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) + + for i := 0; i < 7; i++ { + require.NoError(t, be.Send(context.Background(), newErrorRequest())) + } + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), + attribute.String(DataTypeKey, dataType.String()))) + + assert.NoError(t, be.Shutdown(context.Background())) + } + }) } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestNoCancellationContext(t *testing.T) { - deadline := time.Now().Add(1 * time.Second) - ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) - cancelFunc() - require.Error(t, ctx.Err()) - d, ok := ctx.Deadline() - require.True(t, ok) - require.Equal(t, deadline, d) - - nctx := context.WithoutCancel(ctx) - require.NoError(t, nctx.Err()) - d, ok = nctx.Deadline() - assert.False(t, ok) - assert.True(t, d.IsZero()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + deadline := time.Now().Add(1 * time.Second) + ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) + cancelFunc() + require.Error(t, ctx.Err()) + d, ok := ctx.Deadline() + require.True(t, ok) + require.Equal(t, deadline, d) + + nctx := context.WithoutCancel(ctx) + require.NoError(t, nctx.Err()) + d, ok = nctx.Deadline() + assert.False(t, ok) + assert.True(t, d.IsZero()) + }) + } + + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueConfig_Validate(t *testing.T) { - qCfg := NewDefaultQueueConfig() - require.NoError(t, qCfg.Validate()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + require.NoError(t, qCfg.Validate()) - qCfg.QueueSize = 0 - require.EqualError(t, qCfg.Validate(), "queue size must be positive") + qCfg.QueueSize = 0 + require.EqualError(t, qCfg.Validate(), "queue size must be positive") - qCfg = NewDefaultQueueConfig() - qCfg.NumConsumers = 0 + qCfg = NewDefaultQueueConfig() + qCfg.NumConsumers = 0 - require.EqualError(t, qCfg.Validate(), "number of queue consumers must be positive") + require.EqualError(t, qCfg.Validate(), "number of queue consumers must be positive") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) + }) + } - // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg.Enabled = false - assert.NoError(t, qCfg.Validate()) + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueRetryWithDisabledQueue(t *testing.T) { @@ -295,8 +353,12 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + runTest := func(testName string, enableQueueBatcher bool, tt struct { + name string + queueOptions []Option + }) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) set := exportertest.NewNopSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) @@ -317,126 +379,171 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) }) } + for _, tt := range tests { + runTest(tt.name+"_enable_queue_batcher", true, tt) + runTest(tt.name+"_disable_queue_batcher", false, tt) + } } func TestQueueFailedRequestDropped(t *testing.T) { - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter(set, pipeline.SignalLogs, newNoopObsrepSender, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - mockR := newMockRequest(2, errors.New("some error")) - require.NoError(t, be.Send(context.Background(), mockR)) - require.NoError(t, be.Shutdown(context.Background())) - mockR.checkNumRequests(t, 1) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter(set, pipeline.SignalLogs, newNoopObsrepSender, + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + mockR := newMockRequest(2, errors.New("some error")) + require.NoError(t, be.Send(context.Background(), mockR)) + require.NoError(t, be.Shutdown(context.Background())) + mockR.checkNumRequests(t, 1) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message) + }) + } + + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetryPersistenceEnabled(t *testing.T) { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueConfig() - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - var extensions = map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueConfig() + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: queue.NewMockStorageExtension(nil), + } + host := &MockHost{Ext: extensions} + + // we start correctly with a file storage extension + require.NoError(t, be.Start(context.Background(), host)) + require.NoError(t, be.Shutdown(context.Background())) + }) } - host := &MockHost{Ext: extensions} - // we start correctly with a file storage extension - require.NoError(t, be.Start(context.Background(), host)) - require.NoError(t, be.Shutdown(context.Background())) + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { - storageError := errors.New("could not get storage client") - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueConfig() - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - var extensions = map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(storageError), + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + storageError := errors.New("could not get storage client") + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueConfig() + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + var extensions = map[component.ID]component.Component{ + storageID: queue.NewMockStorageExtension(storageError), + } + host := &MockHost{Ext: extensions} + + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") + }) } - host := &MockHost{Ext: extensions} - // we fail to start if we get an error creating the storage client - require.Error(t, be.Start(context.Background(), host), "could not get storage client") + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown - - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered - - mockReq := newErrorRequest() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - var extensions = map[component.ID]component.Component{ - storageID: queue.NewMockStorageExtension(nil), - } - host := &MockHost{Ext: extensions} + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown + + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = time.Millisecond + rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered + + mockReq := newErrorRequest() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), host)) + var extensions = map[component.ID]component.Component{ + storageID: queue.NewMockStorageExtension(nil), + } + host := &MockHost{Ext: extensions} - // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.Send(context.Background(), mockReq)) + require.NoError(t, be.Start(context.Background(), host)) - // first wait for the item to be consumed from the queue - assert.Eventually(t, func() bool { - return be.QueueSender.(*QueueSender).queue.Size() == 0 - }, time.Second, 1*time.Millisecond) + // Invoke queuedRetrySender so the producer will put the item for consumer to poll + require.NoError(t, be.Send(context.Background(), mockReq)) - // shuts down the exporter, unsent data should be preserved as in-flight data in the persistent queue. - require.NoError(t, be.Shutdown(context.Background())) + // first wait for the item to be consumed from the queue + assert.Eventually(t, func() bool { + return be.QueueSender.(*QueueSender).queue.Size() == 0 + }, time.Second, 1*time.Millisecond) - // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. - replacedReq := newMockRequest(1, nil) - be, err = NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), host)) - t.Cleanup(func() { require.NoError(t, be.Shutdown(context.Background())) }) + // shuts down the exporter, unsent data should be preserved as in-flight data in the persistent queue. + require.NoError(t, be.Shutdown(context.Background())) - // wait for the item to be consumed from the queue - replacedReq.checkNumRequests(t, 1) + // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. + replacedReq := newMockRequest(1, nil) + be, err = NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), host)) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + // wait for the item to be consumed from the queue + replacedReq.checkNumRequests(t, 1) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueSenderNoStartShutdown(t *testing.T) { - queue := queue.NewBoundedMemoryQueue[internal.Request](queue.MemoryQueueSettings[internal.Request]{}) - set := exportertest.NewNopSettings() - obsrep, err := NewExporter(ObsReportSettings{ - ExporterID: exporterID, - ExporterCreateSettings: exportertest.NewNopSettings(), - }) - require.NoError(t, err) - qs := NewQueueSender(queue, set, 1, "", obsrep) - assert.NoError(t, qs.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + queue := queue.NewBoundedMemoryQueue[internal.Request](queue.MemoryQueueSettings[internal.Request]{}) + set := exportertest.NewNopSettings() + obsrep, err := NewExporter(ObsReportSettings{ + ExporterID: exporterID, + ExporterCreateSettings: exportertest.NewNopSettings(), + }) + require.NoError(t, err) + qs := NewQueueSender(queue, set, 1, "", obsrep) + assert.NoError(t, qs.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 9ebf4b1f5ad..e9ff0dbd94b 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -38,123 +38,155 @@ func mockRequestMarshaler(internal.Request) ([]byte, error) { } func TestQueuedRetry_DropOnPermanentError(t *testing.T) { - qCfg := NewDefaultQueueConfig() - rCfg := configretry.NewDefaultBackOffConfig() - mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + rCfg := configretry.NewDefaultBackOffConfig() + mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_DropOnNoRetry(t *testing.T) { - qCfg := NewDefaultQueueConfig() - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = false - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), - WithQueue(qCfg), WithRetry(rCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - mockR := newMockRequest(2, errors.New("transient error")) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = false + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), + WithQueue(qCfg), WithRetry(rCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + mockR := newMockRequest(2, errors.New("transient error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_OnError(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 0 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(2, traceErr) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 0 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) + mockR := newMockRequest(2, traceErr) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - ocs.run(func() { - // Add an item that will always fail. - require.NoError(t, be.Send(context.Background(), newErrorRequest())) - }) - - mockR := newMockRequest(2, nil) - start := time.Now() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). - waitingTime := time.Since(start) - assert.Less(t, 50*time.Millisecond, waitingTime) - assert.Less(t, waitingTime, 150*time.Millisecond) - - // In the newMockConcurrentExporter we count requests and items even for failed requests. - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = time.Millisecond + rCfg.MaxElapsedTime = 100 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + ocs.run(func() { + // Add an item that will always fail. + require.NoError(t, be.Send(context.Background(), newErrorRequest())) + }) + + mockR := newMockRequest(2, nil) + start := time.Now() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). + waitingTime := time.Since(start) + assert.Less(t, 50*time.Millisecond, waitingTime) + assert.Less(t, waitingTime, 150*time.Millisecond) + + // In the newMockConcurrentExporter we count requests and items even for failed requests. + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } type wrappedError struct { @@ -166,161 +198,198 @@ func (e wrappedError) Unwrap() error { } func TestQueuedRetry_ThrottleError(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 10 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond) - mockR := newMockRequest(2, wrappedError{retry}) - start := time.Now() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms. - assert.Less(t, 100*time.Millisecond, time.Since(start)) - - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 10 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond) + mockR := newMockRequest(2, wrappedError{retry}) + start := time.Now() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms. + assert.Less(t, 100*time.Millisecond, time.Since(start)) + + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueuedRetry_RetryOnError(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 0 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - mockR := newMockRequest(2, errors.New("transient error")) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 0 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + resetFeatureGate() + }) + + mockR := newMockRequest(2, errors.New("transient error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueRetryWithNoQueue(t *testing.T) { - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.MaxElapsedTime = time.Nanosecond // fail fast - be, err := NewBaseExporter(exportertest.NewNopSettings(), pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.MaxElapsedTime = time.Nanosecond // fail fast + be, err := NewBaseExporter(exportertest.NewNopSettings(), pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestQueueRetryWithDisabledRetires(t *testing.T) { - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = false - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.Send(context.Background(), mockR)) - }) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Rejecting data. "+ - "Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = false + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.Send(context.Background(), mockR)) + }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. "+ + "Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } func TestRetryWithContextTimeout(t *testing.T) { - const testTimeout = 10 * time.Second - - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = true - - // First attempt after 100ms is attempted - rCfg.InitialInterval = 100 * time.Millisecond - rCfg.RandomizationFactor = 0 - // Second attempt is at twice the testTimeout - rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval) - qCfg := exporterqueue.NewDefaultConfig() - qCfg.Enabled = false - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.InfoLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter( - set, - pipeline.SignalLogs, - newObservabilityConsumerSender, - WithRetry(rCfg), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()), - ) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - mockR := newErrorRequest() - - start := time.Now() - ocs.run(func() { - ctx, cancel := context.WithTimeout(context.Background(), testTimeout) - defer cancel() - err := be.Send(ctx, mockR) - require.Error(t, err) - require.Equal(t, "request will be cancelled before next retry: transient error", err.Error()) - }) - assert.Len(t, observed.All(), 2) - assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message) - assert.Equal(t, "Exporting failed. Rejecting data. "+ - "Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message) - ocs.awaitAsyncProcessing() - ocs.checkDroppedItemsCount(t, 7) - require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests()) - require.NoError(t, be.Shutdown(context.Background())) - - // There should be no delay, because the initial interval is - // longer than the context timeout. Merely checking that no - // delays on the order of either the context timeout or the - // retry interval were introduced, i.e., fail fast. - elapsed := time.Since(start) - require.Less(t, elapsed, testTimeout/2) + runTest := func(testName string, enableQueueBatcher bool) { + t.Run(testName, func(t *testing.T) { + defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + const testTimeout = 10 * time.Second + + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = true + + // First attempt after 100ms is attempted + rCfg.InitialInterval = 100 * time.Millisecond + rCfg.RandomizationFactor = 0 + // Second attempt is at twice the testTimeout + rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval) + qCfg := exporterqueue.NewDefaultConfig() + qCfg.Enabled = false + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.InfoLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter( + set, + pipeline.SignalLogs, + newObservabilityConsumerSender, + WithRetry(rCfg), + WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()), + ) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + mockR := newErrorRequest() + + start := time.Now() + ocs.run(func() { + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + err := be.Send(ctx, mockR) + require.Error(t, err) + require.Equal(t, "request will be cancelled before next retry: transient error", err.Error()) + }) + assert.Len(t, observed.All(), 2) + assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message) + assert.Equal(t, "Exporting failed. Rejecting data. "+ + "Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message) + ocs.awaitAsyncProcessing() + ocs.checkDroppedItemsCount(t, 7) + require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests()) + require.NoError(t, be.Shutdown(context.Background())) + + // There should be no delay, because the initial interval is + // longer than the context timeout. Merely checking that no + // delays on the order of either the context timeout or the + // retry interval were introduced, i.e., fail fast. + elapsed := time.Since(start) + require.Less(t, elapsed, testTimeout/2) + }) + } + runTest("enable_queue_batcher", true) + runTest("disable_queue_batcher", false) } type mockErrorRequest struct { diff --git a/exporter/exporterhelper/internal/test_util.go b/exporter/exporterhelper/internal/test_util.go new file mode 100644 index 00000000000..6b94ccf2d65 --- /dev/null +++ b/exporter/exporterhelper/internal/test_util.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/featuregate" +) + +func setFeatureGateForTest(t testing.TB, gate *featuregate.Gate, enabled bool) func() { + originalValue := gate.IsEnabled() + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled)) + return func() { + require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), originalValue)) + } +} diff --git a/exporter/exporterprofiles/go.mod b/exporter/exporterprofiles/go.mod index 7f5c3984a65..9382654f126 100644 --- a/exporter/exporterprofiles/go.mod +++ b/exporter/exporterprofiles/go.mod @@ -77,3 +77,5 @@ replace go.opentelemetry.io/collector/receiver/receivertest => ../../receiver/re replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/exportertest/go.mod b/exporter/exportertest/go.mod index 37b7166a81b..95993327e59 100644 --- a/exporter/exportertest/go.mod +++ b/exporter/exportertest/go.mod @@ -27,6 +27,7 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -35,6 +36,7 @@ require ( go.opentelemetry.io/collector/consumer/consumerprofiles v0.114.0 // indirect go.opentelemetry.io/collector/extension v0.114.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.114.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect @@ -92,3 +94,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/exportertest/go.sum b/exporter/exportertest/go.sum index ab12b8be855..11efd621040 100644 --- a/exporter/exportertest/go.sum +++ b/exporter/exportertest/go.sum @@ -15,6 +15,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/go.mod b/exporter/go.mod index a5915c6d425..d63ee7d9d16 100644 --- a/exporter/go.mod +++ b/exporter/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/collector/exporter/exportertest v0.114.0 go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 go.opentelemetry.io/collector/extension/extensiontest v0.114.0 + go.opentelemetry.io/collector/featuregate v1.20.0 go.opentelemetry.io/collector/pdata v1.20.0 go.opentelemetry.io/collector/pdata/pprofile v0.114.0 go.opentelemetry.io/collector/pdata/testdata v0.114.0 @@ -35,6 +36,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -97,3 +99,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../consumer/cons replace go.opentelemetry.io/collector/extension/extensiontest => ../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../scraper + +replace go.opentelemetry.io/collector/featuregate => ../featuregate diff --git a/exporter/go.sum b/exporter/go.sum index ab12b8be855..11efd621040 100644 --- a/exporter/go.sum +++ b/exporter/go.sum @@ -15,6 +15,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/nopexporter/go.mod b/exporter/nopexporter/go.mod index e2781ca90ca..8445cd75c30 100644 --- a/exporter/nopexporter/go.mod +++ b/exporter/nopexporter/go.mod @@ -101,3 +101,5 @@ replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/c replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension/extensiontest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/nopexporter/go.sum b/exporter/nopexporter/go.sum index 2a53303c08f..b66e027a4f7 100644 --- a/exporter/nopexporter/go.sum +++ b/exporter/nopexporter/go.sum @@ -17,6 +17,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/otlpexporter/go.mod b/exporter/otlpexporter/go.mod index c7fdede9985..4fa5d0430cc 100644 --- a/exporter/otlpexporter/go.mod +++ b/exporter/otlpexporter/go.mod @@ -40,6 +40,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -61,6 +62,7 @@ require ( go.opentelemetry.io/collector/extension v0.114.0 // indirect go.opentelemetry.io/collector/extension/auth v0.114.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect go.opentelemetry.io/collector/pipeline v0.114.0 // indirect go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect go.opentelemetry.io/collector/receiver v0.114.0 // indirect @@ -157,3 +159,5 @@ replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension replace go.opentelemetry.io/collector/extension/auth/authtest => ../../extension/auth/authtest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/otlpexporter/go.sum b/exporter/otlpexporter/go.sum index 20ad8a027dd..9db2fbfc8a3 100644 --- a/exporter/otlpexporter/go.sum +++ b/exporter/otlpexporter/go.sum @@ -21,6 +21,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/exporter/otlphttpexporter/go.mod b/exporter/otlphttpexporter/go.mod index 4e9f02904a7..fdddca7c960 100644 --- a/exporter/otlphttpexporter/go.mod +++ b/exporter/otlphttpexporter/go.mod @@ -39,6 +39,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -61,6 +62,7 @@ require ( go.opentelemetry.io/collector/extension v0.114.0 // indirect go.opentelemetry.io/collector/extension/auth v0.114.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 // indirect + go.opentelemetry.io/collector/featuregate v1.20.0 // indirect go.opentelemetry.io/collector/pipeline v0.114.0 // indirect go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.114.0 // indirect go.opentelemetry.io/collector/receiver v0.114.0 // indirect @@ -155,3 +157,5 @@ replace go.opentelemetry.io/collector/extension/extensiontest => ../../extension replace go.opentelemetry.io/collector/extension/auth/authtest => ../../extension/auth/authtest replace go.opentelemetry.io/collector/scraper => ../../scraper + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate diff --git a/exporter/otlphttpexporter/go.sum b/exporter/otlphttpexporter/go.sum index 1502232cde3..502920bff56 100644 --- a/exporter/otlphttpexporter/go.sum +++ b/exporter/otlphttpexporter/go.sum @@ -23,6 +23,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=