From 3617a1de0ef1f18444b539e7d32ca289056a9718 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Sat, 28 Sep 2024 17:36:18 -0700 Subject: [PATCH] POC --- .../internal/batch_sender_test.go | 1420 ++++++++--------- .../exporterhelper/internal/queue_sender.go | 32 +- .../internal/queue_sender_test.go | 254 +-- .../internal/retry_sender_test.go | 80 +- exporter/internal/queue/batcher.go | 166 ++ .../internal/queue/bounded_memory_queue.go | 18 +- exporter/internal/queue/persistent_queue.go | 71 +- exporter/internal/queue/queue.go | 4 + exporter/internal/queue/sized_channel.go | 44 +- 9 files changed, 1169 insertions(+), 920 deletions(-) create mode 100644 exporter/internal/queue/batcher.go diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index c9f6cc084e2..e8ee60f9b8c 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -1,713 +1,713 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 +// // Copyright The OpenTelemetry Authors +// // SPDX-License-Identifier: Apache-2.0 package internal -import ( - "context" - "errors" - "runtime" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exporterbatcher" - "go.opentelemetry.io/collector/exporter/exporterqueue" - "go.opentelemetry.io/collector/exporter/internal" -) - -func TestBatchSender_Merge(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - - tests := []struct { - name string - batcherOption Option - }{ - { - name: "split_disabled", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "split_high_limit", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 1000 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // the first two requests should be merged into one and sent by reaching the minimum items size - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 - }, 50*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink})) - - // the third and fifth requests should be sent by reaching the timeout - // the fourth request should be ignored because of the merge error. - time.Sleep(50 * time.Millisecond) - - // should be ignored because of the merge error. 
- require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink, - mergeErr: errors.New("merge error")})) - - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchExportError(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - tests := []struct { - name string - batcherOption Option - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "merge_without_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - { - name: "merge_with_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - expectedRequests: 1, - expectedItems: 20, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - - // the first two requests should be blocked by the batchSender. - time.Sleep(50 * time.Millisecond) - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - - // the third request should trigger the export and cause an error. - errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} - require.NoError(t, be.Send(context.Background(), errReq)) - - // the batch should be dropped since the queue doesn't have requeuing enabled. - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && - sink.itemsCount.Load() == tt.expectedItems && - be.BatchSender.(*BatchSender).activeRequests.Load() == 0 && - be.QueueSender.(*QueueSender).queue.Size() == 0 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_MergeOrSplit(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) - - // big request should be broken down into two requests, both are sent right away. 
- require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) - - // request that cannot be split should be dropped. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 11, sink: sink, - mergeErr: errors.New("split error")})) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 13, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_Shutdown(t *testing.T) { - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // To make the request reached the batchSender before shutdown. - time.Sleep(50 * time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) - - // shutdown should force sending the batch - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_Disabled(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // should be sent right away without splitting because batching is disabled. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []internal.Request{}, nil - } - // otherwise reply with a single request. - return []internal.Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. 
- require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_PostShutdown(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. - sink := newFakeRequestSink() - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") - } - tests := []struct { - name string - batcherCfg exporterbatcher.Config - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_without_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - cfg.MaxSizeItems = 200 - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_with_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 10 - return cfg - }(), - expectedRequests: 8, - expectedItems: 51, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - qCfg := exporterqueue.NewDefaultConfig() - qCfg.NumConsumers = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. 
- assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 3rd request should be flushed by itself due to flush interval - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 - }, 100*time.Millisecond, 10*time.Millisecond) - - // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 5, sink: sink})) - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink})) - if tt.batcherCfg.MaxSizeItems == 10 { - // in case of MaxSizeItems=10, wait for the leftover request to send - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 - }, 50*time.Millisecond, 10*time.Millisecond) - } - - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink})) - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 20, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchBlocking(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - // should be sent in two batches since the batch size is 3 - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(6), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestBatchSender_BatchCancelled(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), 
componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.Send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - assert.Equal(t, uint64(0), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSender_DrainActiveRequests(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. - require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "funcs_set_twice", - opts: []Option{ - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, - { - name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, tt.opts...) 
- if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - -func TestBatchSender_UnstartedShutdown(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - - err = be.Shutdown(context.Background()) - require.NoError(t, err) -} - -// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being -// merged. -func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. 
- startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - assert.NoError(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} - -func TestBatchSenderWithTimeout(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutConfig() - tCfg.Timeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - require.NoError(t, be.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) -} - -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSenderTimerFlush(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test 
on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - require.NoError(t, be.Shutdown(context.Background())) -} - -func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.NotNil(t, be) - require.NoError(t, err) - return be -} +// import ( +// "context" +// "errors" +// "runtime" +// "sync" +// "testing" +// "time" + +// "github.com/stretchr/testify/assert" +// "github.com/stretchr/testify/require" + +// "go.opentelemetry.io/collector/component/componenttest" +// "go.opentelemetry.io/collector/exporter/exporterbatcher" +// "go.opentelemetry.io/collector/exporter/exporterqueue" +// "go.opentelemetry.io/collector/exporter/internal" +// ) + +// func TestBatchSender_Merge(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond + +// tests := []struct { +// name string +// batcherOption Option +// }{ +// { +// name: "split_disabled", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "split_high_limit", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 1000 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, 
be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // the first two requests should be merged into one and sent by reaching the minimum items size +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink})) + +// // the third and fifth requests should be sent by reaching the timeout +// // the fourth request should be ignored because of the merge error. +// time.Sleep(50 * time.Millisecond) + +// // should be ignored because of the merge error. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink, +// mergeErr: errors.New("merge error")})) + +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchExportError(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// tests := []struct { +// name string +// batcherOption Option +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "merge_without_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 200 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// { +// name: "merge_with_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 20 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// expectedRequests: 1, +// expectedItems: 20, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) + +// // the first two requests should be blocked by the batchSender. +// time.Sleep(50 * time.Millisecond) +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) + +// // the third request should trigger the export and cause an error. +// errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} +// require.NoError(t, be.Send(context.Background(), errReq)) + +// // the batch should be dropped since the queue doesn't have requeuing enabled. 
+// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && +// sink.itemsCount.Load() == tt.expectedItems && +// be.BatchSender.(*BatchSender).activeRequests.Load() == 0 && +// be.QueueSender.(*QueueSender).queue.Size() == 0 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_MergeOrSplit(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 5 +// cfg.MaxSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// // should be sent right away by reaching the minimum items size. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // request that cannot be split should be dropped. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 11, sink: sink, +// mergeErr: errors.New("split error")})) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 13, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_Shutdown(t *testing.T) { +// batchCfg := exporterbatcher.NewDefaultConfig() +// batchCfg.MinSizeItems = 10 +// be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // To make the request reached the batchSender before shutdown. +// time.Sleep(50 * time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) + +// // shutdown should force sending the batch +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_Disabled(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.Enabled = false +// cfg.MaxSizeItems = 5 +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // should be sent right away without splitting because batching is disabled. 
+// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []internal.Request{}, nil +// } +// // otherwise reply with a single request. +// return []internal.Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_PostShutdown(t *testing.T) { +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// assert.NoError(t, be.Shutdown(context.Background())) + +// // Closed batch sender should act as a pass-through to not block queue draining. 
+// sink := newFakeRequestSink() +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") +// } +// tests := []struct { +// name string +// batcherCfg exporterbatcher.Config +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_without_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// cfg.MaxSizeItems = 200 +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_with_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 10 +// return cfg +// }(), +// expectedRequests: 8, +// expectedItems: 51, +// }, +// } +// for _, tt := range tests { +// tt := tt +// t.Run(tt.name, func(t *testing.T) { +// qCfg := exporterqueue.NewDefaultConfig() +// qCfg.NumConsumers = 2 +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 3rd request should be flushed by itself due to flush interval +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. 
+// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 5, sink: sink})) +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// if tt.batcherCfg.MaxSizeItems == 10 { +// // in case of MaxSizeItems=10, wait for the leftover request to send +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 20, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchBlocking(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 3 +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 6 blocking requests +// wg := sync.WaitGroup{} +// for i := 0; i < 6; i++ { +// wg.Add(1) +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// // should be sent in two batches since the batch size is 3 +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(6), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// // Validate that the batch is cancelled once the first request in the request is cancelled +// func TestBatchSender_BatchCancelled(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 2 blocking requests +// wg := sync.WaitGroup{} +// ctx, cancel := context.WithCancel(context.Background()) +// wg.Add(1) +// go func() { +// assert.ErrorIs(t, be.Send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// wg.Add(1) +// go func() { +// time.Sleep(20 * time.Millisecond) // ensure this call is the second +// assert.ErrorIs(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// cancel() // canceling the first request should cancel the whole batch +// wg.Wait() + +// // nothing should be delivered +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) +// assert.Equal(t, uint64(0), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSender_DrainActiveRequests(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := 
NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 3 blocking requests with a timeout +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() + +// // give time for the first two requests to be batched +// time.Sleep(20 * time.Millisecond) + +// // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. +// // It should take 120 milliseconds to complete. +// require.NoError(t, be.Shutdown(context.Background())) + +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_WithBatcherOption(t *testing.T) { +// tests := []struct { +// name string +// opts []Option +// expectedErr bool +// }{ +// { +// name: "no_funcs_set", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: true, +// }, +// { +// name: "funcs_set_internally", +// opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: false, +// }, +// { +// name: "funcs_set_twice", +// opts: []Option{ +// WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc)), +// }, +// expectedErr: true, +// }, +// { +// name: "nil_funcs", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, +// expectedErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, tt.opts...) +// if tt.expectedErr { +// assert.Nil(t, be) +// assert.Error(t, err) +// } else { +// assert.NotNil(t, be) +// assert.NoError(t, err) +// } +// }) +// } +// } + +// func TestBatchSender_UnstartedShutdown(t *testing.T) { +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) + +// err = be.Shutdown(context.Background()) +// require.NoError(t, err) +// } + +// // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// // merged. 
+// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. +// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// assert.NoError(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } + +// func TestBatchSenderWithTimeout(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 10 +// tCfg := NewDefaultTimeoutConfig() +// tCfg.Timeout = 50 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithTimeout(tCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 3 concurrent requests that should be merged in one batch +// wg := sync.WaitGroup{} +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// wg.Done() +// }() +// } +// wg.Wait() +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) + +// // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// assert.Error(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// require.NoError(t, be.Shutdown(context.Background())) + +// // The sink should not change +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// } + +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 
:= r2.(*fakeRequest) +// if fr2.mergeErr != nil { +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSenderTimerFlush(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 100 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() +// time.Sleep(50 * time.Millisecond) + +// // Send 2 concurrent requests that should be merged in one batch and sent immediately +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 30*time.Millisecond, 5*time.Millisecond) + +// // Send another request that should be flushed after 100ms instead of 50ms since last flush +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // Confirm that it is not flushed in 50ms +// time.Sleep(60 * time.Millisecond) +// assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) + +// // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) +// time.Sleep(50 * time.Millisecond) +// assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter { +// be, err := NewBaseExporter(defaultSettings, 
defaultSignal, newNoopObsrepSender, batchOption, +// WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// return be +// } diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 99f63c022d5..f801d1f3bf3 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -7,6 +7,7 @@ import ( "context" "errors" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" @@ -80,6 +81,7 @@ type QueueSender struct { numConsumers int traceAttribute attribute.KeyValue consumers *queue.Consumers[internal.Request] + batcher *queue.Batcher obsrep *ObsReport exporterID component.ID @@ -94,7 +96,8 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin obsrep: obsrep, exporterID: set.ID, } - consumeFunc := func(ctx context.Context, req internal.Request) error { + + exportFunc := func(ctx context.Context, req internal.Request) error { err := qs.NextSender.Send(ctx, req) if err != nil { set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, @@ -102,13 +105,31 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin } return err } - qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc) + + batcherCfg := exporterbatcher.NewDefaultConfig() + batcherCfg.Enabled = false + qs.batcher = queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc) + + // consumeFunc := func(ctx context.Context, req internal.Request) error { + // err := qs.NextSender.Send(ctx, req) + // if err != nil { + // set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + // zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + // } + // return err + // } + // qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc) + return qs } // Start is invoked during service startup. func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { - if err := qs.consumers.Start(ctx, host); err != nil { + // if err := qs.consumers.Start(ctx, host); err != nil { + // return err + // } + + if err := qs.batcher.Start(ctx, host); err != nil { return err } @@ -125,7 +146,10 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { func (qs *QueueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - return qs.consumers.Shutdown(ctx) + + // return qs.consumers.Shutdown(ctx) + + return qs.batcher.Shutdown(ctx) } // send implements the requestSender interface. It puts the request in the queue. 
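Editorial note: the diffstat above shows this patch creating exporter/internal/queue/batcher.go (166 lines), but that file's body is not included in this excerpt. The sketch below is only an illustration of the shape implied by the call sites in queue_sender.go (queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc), batcher.Start, batcher.Shutdown); the Consume method and the goroutine loop are assumptions modeled on the Consumers type being replaced, not the actual implementation in the patch.

// Sketch only -- assumed shape of exporter/internal/queue/batcher.go.
package queue

import (
	"context"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/internal"
)

// Batcher pulls requests from the queue and hands them to exportFunc. With
// batching enabled it would merge/split requests first; with Enabled=false,
// as wired in queue_sender.go above, it forwards them one by one.
type Batcher struct {
	cfg          exporterbatcher.Config
	queue        Queue[internal.Request] // assumed: this package's queue interface
	numConsumers int
	exportFunc   func(ctx context.Context, req internal.Request) error
	stopWG       sync.WaitGroup
}

func NewBatcher(cfg exporterbatcher.Config, q Queue[internal.Request], numConsumers int,
	exportFunc func(ctx context.Context, req internal.Request) error) *Batcher {
	return &Batcher{cfg: cfg, queue: q, numConsumers: numConsumers, exportFunc: exportFunc}
}

// Start spawns the consumer goroutines, mirroring the Consumers type it replaces.
func (b *Batcher) Start(_ context.Context, _ component.Host) error {
	for i := 0; i < b.numConsumers; i++ {
		b.stopWG.Add(1)
		go func() {
			defer b.stopWG.Done()
			for {
				// Consume is assumed to block until an item is available and to
				// return false once the queue has been stopped and drained.
				if !b.queue.Consume(b.exportFunc) {
					return
				}
			}
		}()
	}
	return nil
}

// Shutdown waits for the consumer goroutines to finish draining the queue.
func (b *Batcher) Shutdown(_ context.Context) error {
	b.stopWG.Wait()
	return nil
}

Under this reading, handing the export function to the batcher (instead of to per-consumer callbacks) is what later allows requests to be merged before export; since the POC constructs the config with Enabled = false, the batcher degenerates to the previous queue-consumer behavior.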
diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 30b8f28745f..1fda980bd18 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -26,39 +26,39 @@ import ( "go.opentelemetry.io/collector/pipeline" ) -func TestQueuedRetry_StopWhileWaiting(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - firstMockR := newErrorRequest() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), firstMockR)) - }) - - // Enqueue another request to ensure when calling shutdown we drain the queue. - secondMockR := newMockRequest(3, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), secondMockR)) - }) - - require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size()) - - require.NoError(t, be.Shutdown(context.Background())) - - secondMockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 3) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) -} +// func TestQueuedRetry_StopWhileWaiting(t *testing.T) { +// qCfg := NewDefaultQueueConfig() +// qCfg.NumConsumers = 1 +// rCfg := configretry.NewDefaultBackOffConfig() +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, +// WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), +// WithRetry(rCfg), WithQueue(qCfg)) +// require.NoError(t, err) +// ocs := be.ObsrepSender.(*observabilityConsumerSender) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// firstMockR := newErrorRequest() +// ocs.run(func() { +// // This is asynchronous so it should just enqueue, no errors expected. +// require.NoError(t, be.Send(context.Background(), firstMockR)) +// }) + +// // Enqueue another request to ensure when calling shutdown we drain the queue. +// secondMockR := newMockRequest(3, nil) +// ocs.run(func() { +// // This is asynchronous so it should just enqueue, no errors expected. 
+// require.NoError(t, be.Send(context.Background(), secondMockR)) +// }) + +// require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size()) + +// require.NoError(t, be.Shutdown(context.Background())) + +// secondMockR.checkNumRequests(t, 1) +// ocs.checkSendItemsCount(t, 3) +// ocs.checkDroppedItemsCount(t, 7) +// require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) +// } func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueConfig() @@ -110,100 +110,100 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) } -func TestQueuedRetryHappyPath(t *testing.T) { - tests := []struct { - name string - queueOptions []Option - }{ - { - name: "WithQueue", - queueOptions: []Option{ - WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithQueue(QueueConfig{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }), - WithRetry(configretry.NewDefaultBackOffConfig()), - }, - }, - { - name: "WithRequestQueue/MemoryQueueFactory", - queueOptions: []Option{ - WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewMemoryQueueFactory[internal.Request]()), - WithRetry(configretry.NewDefaultBackOffConfig()), - }, - }, - { - name: "WithRequestQueue/PersistentQueueFactory", - queueOptions: []Option{ - WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})), - WithRetry(configretry.NewDefaultBackOffConfig()), - }, - }, - { - name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", - queueOptions: []Option{ - WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})), - WithRetry(configretry.NewDefaultBackOffConfig()), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tel, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - - set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...) 
- require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - - wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) - for i := 0; i < wantRequests; i++ { - ocs.run(func() { - req := newMockRequest(2, nil) - reqs = append(reqs, req) - require.NoError(t, be.Send(context.Background(), req)) - }) - } - - // expect queue to be full - require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - // Wait until all batches received - ocs.awaitAsyncProcessing() - - require.Len(t, reqs, wantRequests) - for _, req := range reqs { - req.checkNumRequests(t, 1) - } - - ocs.checkSendItemsCount(t, 2*wantRequests) - ocs.checkDroppedItemsCount(t, 0) - }) - } -} +// func TestQueuedRetryHappyPath(t *testing.T) { +// tests := []struct { +// name string +// queueOptions []Option +// }{ +// { +// name: "WithQueue", +// queueOptions: []Option{ +// WithMarshaler(mockRequestMarshaler), +// WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), +// WithQueue(QueueConfig{ +// Enabled: true, +// QueueSize: 10, +// NumConsumers: 1, +// }), +// WithRetry(configretry.NewDefaultBackOffConfig()), +// }, +// }, +// { +// name: "WithRequestQueue/MemoryQueueFactory", +// queueOptions: []Option{ +// WithRequestQueue(exporterqueue.Config{ +// Enabled: true, +// QueueSize: 10, +// NumConsumers: 1, +// }, exporterqueue.NewMemoryQueueFactory[internal.Request]()), +// WithRetry(configretry.NewDefaultBackOffConfig()), +// }, +// }, +// { +// name: "WithRequestQueue/PersistentQueueFactory", +// queueOptions: []Option{ +// WithRequestQueue(exporterqueue.Config{ +// Enabled: true, +// QueueSize: 10, +// NumConsumers: 1, +// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})), +// WithRetry(configretry.NewDefaultBackOffConfig()), +// }, +// }, +// { +// name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", +// queueOptions: []Option{ +// WithRequestQueue(exporterqueue.Config{ +// Enabled: true, +// QueueSize: 10, +// NumConsumers: 1, +// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})), +// WithRetry(configretry.NewDefaultBackOffConfig()), +// }, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// tel, err := componenttest.SetupTelemetry(defaultID) +// require.NoError(t, err) +// t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + +// set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} +// be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...) 
+// require.NoError(t, err) +// ocs := be.ObsrepSender.(*observabilityConsumerSender) + +// wantRequests := 10 +// reqs := make([]*mockRequest, 0, 10) +// for i := 0; i < wantRequests; i++ { +// ocs.run(func() { +// req := newMockRequest(2, nil) +// reqs = append(reqs, req) +// require.NoError(t, be.Send(context.Background(), req)) +// }) +// } + +// // expect queue to be full +// require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// // Wait until all batches received +// ocs.awaitAsyncProcessing() + +// require.Len(t, reqs, wantRequests) +// for _, req := range reqs { +// req.checkNumRequests(t, 1) +// } + +// ocs.checkSendItemsCount(t, 2*wantRequests) +// ocs.checkDroppedItemsCount(t, 0) +// }) +// } +// } func TestQueuedRetry_QueueMetricsReported(t *testing.T) { dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics} diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 13d8acd0732..57d89917a28 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -115,46 +115,46 @@ func TestQueuedRetry_OnError(t *testing.T) { ocs.checkDroppedItemsCount(t, 0) } -func TestQueuedRetry_MaxElapsedTime(t *testing.T) { - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - ocs.run(func() { - // Add an item that will always fail. - require.NoError(t, be.Send(context.Background(), newErrorRequest())) - }) - - mockR := newMockRequest(2, nil) - start := time.Now() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). - waitingTime := time.Since(start) - assert.Less(t, 50*time.Millisecond, waitingTime) - assert.Less(t, waitingTime, 150*time.Millisecond) - - // In the newMockConcurrentExporter we count requests and items even for failed requests. 
- mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) -} +// func TestQueuedRetry_MaxElapsedTime(t *testing.T) { +// qCfg := NewDefaultQueueConfig() +// qCfg.NumConsumers = 1 +// rCfg := configretry.NewDefaultBackOffConfig() +// rCfg.InitialInterval = time.Millisecond +// rCfg.MaxElapsedTime = 100 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, +// WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), +// WithRetry(rCfg), WithQueue(qCfg)) +// require.NoError(t, err) +// ocs := be.ObsrepSender.(*observabilityConsumerSender) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// ocs.run(func() { +// // Add an item that will always fail. +// require.NoError(t, be.Send(context.Background(), newErrorRequest())) +// }) + +// mockR := newMockRequest(2, nil) +// start := time.Now() +// ocs.run(func() { +// // This is asynchronous so it should just enqueue, no errors expected. +// require.NoError(t, be.Send(context.Background(), mockR)) +// }) +// ocs.awaitAsyncProcessing() + +// // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). +// waitingTime := time.Since(start) +// assert.Less(t, 50*time.Millisecond, waitingTime) +// assert.Less(t, waitingTime, 150*time.Millisecond) + +// // In the newMockConcurrentExporter we count requests and items even for failed requests. +// mockR.checkNumRequests(t, 1) +// ocs.checkSendItemsCount(t, 2) +// ocs.checkDroppedItemsCount(t, 7) +// require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) +// } type wrappedError struct { error diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go new file mode 100644 index 00000000000..6ec4faf00f4 --- /dev/null +++ b/exporter/internal/queue/batcher.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "math" + "sync" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +type batch struct { + ctx context.Context + req internal.Request + onExportFinished func(error) +} + +type Batcher struct { + cfg exporterbatcher.Config + mergeFunc exporterbatcher.BatchMergeFunc[internal.Request] + mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request] + + queue Queue[internal.Request] + numWorkers int + exportFunc func(context.Context, internal.Request) error + stopWG sync.WaitGroup + + mu sync.Mutex + lastFlushed time.Time + pendingBatches []batch + timer *time.Timer +} + +func NewBatcher(cfg exporterbatcher.Config, queue Queue[internal.Request], numWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher { + return &Batcher{ + cfg: cfg, + queue: queue, + numWorkers: numWorkers, + exportFunc: exportFunc, + stopWG: sync.WaitGroup{}, + pendingBatches: make([]batch, 1), + } +} + +func (qb *Batcher) flushIfNecessary() { + qb.mu.Lock() + + if qb.pendingBatches[0].req == nil { + qb.mu.Unlock() + return + } + + if time.Since(qb.lastFlushed) < qb.cfg.FlushTimeout && qb.pendingBatches[0].req.ItemsCount() < 
qb.cfg.MinSizeItems { + qb.mu.Unlock() + return + } + + flushedBatch := qb.pendingBatches[0] + qb.pendingBatches = qb.pendingBatches[1:] + if len(qb.pendingBatches) == 0 { + qb.pendingBatches = append(qb.pendingBatches, batch{}) + } + + qb.lastFlushed = time.Now() + + if qb.cfg.FlushTimeout > 0 { + qb.timer.Reset(qb.cfg.FlushTimeout) + } + + qb.mu.Unlock() + + err := qb.exportFunc(flushedBatch.ctx, flushedBatch.req) + if flushedBatch.onExportFinished != nil { + flushedBatch.onExportFinished(err) + } +} + +func (qb *Batcher) push(req internal.Request, onExportFinished func(error)) error { + qb.mu.Lock() + defer qb.mu.Unlock() + + idx := len(qb.pendingBatches) - 1 + if qb.pendingBatches[idx].req == nil { + qb.pendingBatches[idx].req = req + qb.pendingBatches[idx].ctx = context.Background() + qb.pendingBatches[idx].onExportFinished = onExportFinished + } else { + reqs, err := qb.mergeSplitFunc(context.Background(), + qb.cfg.MaxSizeConfig, + qb.pendingBatches[idx].req, req) + if err != nil || len(reqs) == 0 { + return err + } + + for offset, newReq := range reqs { + if offset != 0 { + qb.pendingBatches = append(qb.pendingBatches, batch{}) + } + qb.pendingBatches[idx+offset].req = newReq + } + } + return nil +} + +// Start ensures that queue and all consumers are started. +func (qb *Batcher) Start(ctx context.Context, host component.Host) error { + if err := qb.queue.Start(ctx, host); err != nil { + return err + } + + if qb.cfg.FlushTimeout > 0 { + qb.timer = time.NewTimer(qb.cfg.FlushTimeout) + } else { + qb.timer = time.NewTimer(math.MaxInt) + qb.timer.Stop() + } + + allocReader := make(chan bool, 10) + + go func() { + allocReader <- true + }() + + var startWG sync.WaitGroup + for i := 0; i < qb.numWorkers; i++ { + qb.stopWG.Add(1) + startWG.Add(1) + go func() { + startWG.Done() + defer qb.stopWG.Done() + for { + select { + case <-qb.timer.C: + qb.flushIfNecessary() + case <-allocReader: + req, ok, onProcessingFinished := qb.queue.ClaimAndRead(func() { + allocReader <- true + }) + if !ok { + return + } + + qb.push(req, onProcessingFinished) // Handle error + qb.flushIfNecessary() + } + } + }() + } + startWG.Wait() + + return nil +} + +// Shutdown ensures that queue and all Batcher are stopped. +func (qb *Batcher) Shutdown(ctx context.Context) error { + if err := qb.queue.Shutdown(ctx); err != nil { + return err + } + qb.stopWG.Wait() + return nil +} diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 98e1b281176..df7a8584996 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,19 +40,35 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } +func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) { + item, ok := q.sizedChannel.pop() + onClaim() + + if !ok { + return item.req, ok, nil + } + + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) + return item.req, ok, nil +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. 
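The new ClaimAndRead added above, together with the worker loop in Batcher.Start, follows a claim/read/acknowledge pattern: a worker claims the next channel slot, hands the read token back through onClaim so another worker can begin the next blocking read, and reports the export outcome through the returned callback. A stripped-down sketch of that contract follows, assuming package queue and a "context" import; the function and variable names are illustrative, and the timer-based flushing and pendingBatches merging that Batcher.Start layers on top are omitted.

// claimLoopSketch is illustrative only; it is not part of this patch.
func claimLoopSketch[T any](q Queue[T], exportFunc func(context.Context, T) error) {
	tokens := make(chan struct{}, 1)
	tokens <- struct{}{} // single read token: one goroutine blocks on the channel at a time
	for {
		<-tokens
		req, ok, onProcessingFinished := q.ClaimAndRead(func() {
			tokens <- struct{}{} // re-arm the token as soon as the slot is claimed
		})
		if !ok {
			return // queue was shut down and drained
		}
		err := exportFunc(context.Background(), req)
		if onProcessingFinished != nil {
			onProcessingFinished(err) // the persistent queue acknowledges or requeues based on err
		}
	}
}

For the memory queue onProcessingFinished is nil, so a failed export is simply dropped, matching the old Consume behavior.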
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + item, ok := q.sizedChannel.pop() if !ok { return false } + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true } +func (pq *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) { +} + // Shutdown closes the queue channel to initiate draining of the queue. func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { q.sizedChannel.shutdown() diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go index 71fb910c78f..a8faed76cbb 100644 --- a/exporter/internal/queue/persistent_queue.go +++ b/exporter/internal/queue/persistent_queue.go @@ -189,34 +189,52 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) ( return bytesToItemIndex(val) } + +func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) { + _, ok := pq.sizedChannel.pop() + onClaim() + + if !ok { + var t T + return t, false, nil + } + + var ( + req T + onProcessingFinished func(error) + consumed bool + ) + req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) + if consumed { + pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req)) + } + return req, true, onProcessingFinished +} + +func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) { + if err := pq.itemDispatchingFinish(ctx, index); err != nil { + pq.logger.Error("Error deleting item from queue", zap.Error(err)) + } +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped. func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - for { - var ( - req T - onProcessingFinished func(error) - consumed bool - ) - - // If we are stopped we still process all the other events in the channel before, but we - // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. - _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 { - req, onProcessingFinished, consumed = pq.getNextItem(context.Background()) - if !consumed { - return 0 - } - return pq.set.Sizer.Sizeof(req) - }) - if !ok { - return false - } - if consumed { - onProcessingFinished(consumeFunc(context.Background(), req)) - return true - } + // If we are stopped we still process all the other events in the channel before, but we + // return fast in the `getNextItem`, so we will free the channel fast and get to the stop. + if _, ok := pq.sizedChannel.pop(); !ok { + return false } + + req, onProcessingFinished, ok := pq.getNextItem(context.Background()) + if !ok { + return false + } + + pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req)) + onProcessingFinished(consumeFunc(context.Background(), req)) + return true } func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error { @@ -345,8 +363,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), // Increase the reference count, so the client is not closed while the request is being processed. // The client cannot be closed because we hold the lock since last we checked `stopped`. 
-	pq.refClient++
-	return request, func(consumeErr error) {
+	onProcessingFinished := func(consumeErr error) {
 		// Delete the item from the persistent storage after it was processed.
 		pq.mu.Lock()
 		// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -378,7 +395,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
 
 		// Ensure the used size and the channel size are in sync.
 		pq.sizedChannel.syncSize()
-	}, true
+	}
+	pq.refClient++
+	return request, onProcessingFinished, true
 }
 
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 35bc504579e..652a0a93be8 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -25,6 +25,10 @@ type Queue[T any] interface {
 	// without violating capacity restrictions. If success returns no error.
 	// It returns ErrQueueIsFull if no space is currently available.
 	Offer(ctx context.Context, item T) error
+
+	ClaimAndRead(onClaim func()) (T, bool, func(error))
+	CommitConsume(ctx context.Context, index uint64)
+
 	// Consume applies the provided function on the head of queue.
 	// The call blocks until there is an item available or the queue is stopped.
 	// The function returns true when an item is consumed or false if the queue is stopped.
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
index f322e58c01c..24f5528fb67 100644
--- a/exporter/internal/queue/sized_channel.go
+++ b/exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"
 
 // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
 // The channel will accept elements until the total size of the elements reaches the capacity.
+// Not thread safe.
 type sizedChannel[T any] struct {
 	used *atomic.Int64
 
@@ -67,27 +68,46 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
 	}
 }
 
-// pop removes the element from the queue and returns it.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-// The callback is called before the element is removed from the queue. It must return the size of the element.
-func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
-	el, ok := <-vcq.ch
-	if !ok {
-		return el, false
-	}
+// REMOVE
+// NOTE: the main change in sized_channel is that pop() is separated into pop() and updateSize().
+// This is because we want to parallelize "reading" from the queue, and we only know the item size
+// after reading. We also need to update the queue size in case the batch ends up failing.
 
-	size := callback(el)
+func (vcq *sizedChannel[T]) pop() (T, bool) {
+	el, ok := <-vcq.ch
+	return el, ok
+}
 
+func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
 	// because we don't flush the current queue size on the disk on every read/write.
 	// In that case we need to make sure it doesn't go below 0.
-	if vcq.used.Add(-size) < 0 {
+	if vcq.used.Add(deltaSize) < 0 {
 		vcq.used.Store(0)
 	}
-	return el, true
 }
 
+// // pop removes the element from the queue and returns it.
+// // The call blocks until there is an item available or the queue is stopped.
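The pop/updateSize split above exists because an element's size is only known after it has been read, and because a size that was subtracted for a batch may need to be added back if the batch ultimately fails, which is the intent stated in the NOTE. Below is a sketch of the expected pairing, written as a hypothetical method on boundedMemoryQueue from the earlier hunk; the requeueOnFailure branch is an assumption drawn from that NOTE rather than code in this patch.

// consumeWithRequeueSketch is illustrative only; it is not part of this patch.
func (q *boundedMemoryQueue[T]) consumeWithRequeueSketch(consumeFunc func(context.Context, T) error, requeueOnFailure bool) bool {
	item, ok := q.sizedChannel.pop()
	if !ok {
		return false // channel closed: the queue is stopped and drained
	}
	size := q.sizer.Sizeof(item.req)
	q.sizedChannel.updateSize(-size) // the size is only known after the element was read

	if err := consumeFunc(item.ctx, item.req); err != nil && requeueOnFailure {
		q.sizedChannel.updateSize(size) // give the accounted size back for the re-enqueued item
	}
	return true
}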
+// // The function returns true when an item is consumed or false if the queue is stopped and emptied. +// // The callback is called before the element is removed from the queue. It must return the size of the element. +// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) { +// el, ok := <-vcq.ch +// if !ok { +// return el, false +// } + +// size := callback(el) + +// // The used size and the channel size might be not in sync with the queue in case it's restored from the disk +// // because we don't flush the current queue size on the disk on every read/write. +// // In that case we need to make sure it doesn't go below 0. +// if vcq.used.Add(-size) < 0 { +// vcq.used.Store(0) +// } +// return el, true +// } + // syncSize updates the used size to 0 if the queue is empty. // The caller must ensure that this call is not called concurrently with push. // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always