From 0b106abed908de139a3b003be6fee2b7915633ea Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Fri, 23 Aug 2024 16:30:37 -0700
Subject: [PATCH] poc

---
 exporter/exporterbatcher/batcher.go          |   35 +
 exporter/exporterhelper/batch_sender_test.go | 1414 ++++++++---------
 exporter/exporterhelper/queue_sender.go      |   28 +-
 .../internal/queue/bounded_memory_queue.go   |   16 +-
 exporter/internal/queue/consumers.go         |    6 +-
 exporter/internal/queue/persistent_queue.go  |   52 +-
 exporter/internal/queue/queue.go             |    4 +
 exporter/internal/queue/queue_batcher.go     |   82 +
 exporter/internal/queue/sized_channel.go     |   44 +-
 9 files changed, 937 insertions(+), 744 deletions(-)
 create mode 100644 exporter/exporterbatcher/batcher.go
 create mode 100644 exporter/internal/queue/queue_batcher.go

diff --git a/exporter/exporterbatcher/batcher.go b/exporter/exporterbatcher/batcher.go
new file mode 100644
index 00000000000..7b9b3e1c598
--- /dev/null
+++ b/exporter/exporterbatcher/batcher.go
@@ -0,0 +1,35 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
+
+// Batcher accumulates items into an active batch and flushes it once the
+// batch reaches the minimum size or the flush timeout expires.
+//
+// type Batcher[T any] struct {
+// 	mu             sync.Mutex
+// 	activeBatch    T
+// 	pendingBatches []T
+// 	lastFlushTime  time.Time
+// 	timer          *time.Timer
+// 	minBatchSize   int
+// 	maxBatchSize   int
+// 	flushTimeout   time.Duration
+// 	flushFunc      func(req T) error
+// }
+
+// func (b *Batcher[T]) FlushIfNecessary() error {
+// 	b.mu.Lock()
+// 	var batchToExport T
+// 	now := time.Now()
+// 	if now.Sub(b.lastFlushTime) > b.flushTimeout || b.activeBatch.size() >= b.minBatchSize {
+// 		b.lastFlushTime = now
+// 		batchToExport = b.activeBatch
+// 		b.activeBatch = b.pendingBatches[0]
+// 		b.pendingBatches = b.pendingBatches[1:]
+// 	}
+// 	b.timer.Reset(b.flushTimeout)
+// 	b.mu.Unlock()
+// 	return b.flushFunc(batchToExport)
+// }
+
+// func (b *Batcher[T]) Push(item T) error {
+// 	// If maxBatchSize is set and the item would overflow the active batch,
+// 	// split it: top up the active batch and queue the remainder as pending
+// 	// batches. TODO: implement the merge/split.
+// }
diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go
index 5b93dd1466a..c8bcdd26754 100644
--- a/exporter/exporterhelper/batch_sender_test.go
+++ b/exporter/exporterhelper/batch_sender_test.go
@@ -3,710 +3,710 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"
 
-import (
-	"context"
-	"errors"
-	"runtime"
-	"sync"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-
-	"go.opentelemetry.io/collector/component/componenttest"
-	"go.opentelemetry.io/collector/exporter/exporterbatcher"
-	"go.opentelemetry.io/collector/exporter/exporterqueue"
-)
-
-func TestBatchSender_Merge(t *testing.T) {
-	if runtime.GOOS == "windows" {
-		t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758")
-	}
-	cfg := exporterbatcher.NewDefaultConfig()
-	cfg.MinSizeItems = 10
-	cfg.FlushTimeout = 100 * time.Millisecond
-
-	tests := []struct {
-		name          string
-		batcherOption Option
-	}{
-		{
-			name:          "split_disabled",
-			batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
-		},
-		{
-			name: "split_high_limit",
-			batcherOption: func() Option {
-				c := cfg
-				c.MaxSizeItems = 1000
-				return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))
-			}(),
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			be := queueBatchExporter(t, tt.batcherOption)
-
-			require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-			t.Cleanup(func() {
-				require.NoError(t, be.Shutdown(context.Background()))
-			})
-
-			sink := newFakeRequestSink()
-
-			
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // the first two requests should be merged into one and sent by reaching the minimum items size - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 - }, 50*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) - - // the third and fifth requests should be sent by reaching the timeout - // the fourth request should be ignored because of the merge error. - time.Sleep(50 * time.Millisecond) - - // should be ignored because of the merge error. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, - mergeErr: errors.New("merge error")})) - - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchExportError(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - tests := []struct { - name string - batcherOption Option - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "merge_without_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - { - name: "merge_with_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - expectedRequests: 1, - expectedItems: 20, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - - // the first two requests should be blocked by the batchSender. - time.Sleep(50 * time.Millisecond) - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - - // the third request should trigger the export and cause an error. - errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} - require.NoError(t, be.send(context.Background(), errReq)) - - // the batch should be dropped since the queue doesn't have requeuing enabled. 
- assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && - sink.itemsCount.Load() == tt.expectedItems && - be.batchSender.(*batchSender).activeRequests.Load() == 0 && - be.queueSender.(*queueSender).queue.Size() == 0 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_MergeOrSplit(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) - - // request that cannot be split should be dropped. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, - mergeErr: errors.New("split error")})) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_Shutdown(t *testing.T) { - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // To make the request reached the batchSender before shutdown. - time.Sleep(50 * time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) - - // shutdown should force sending the batch - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_Disabled(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // should be sent right away without splitting because batching is disabled. 
- require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []Request{}, nil - } - // otherwise reply with a single request. - return []Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_PostShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. 
- sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") - } - tests := []struct { - name string - batcherCfg exporterbatcher.Config - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_without_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - cfg.MaxSizeItems = 200 - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_with_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 10 - return cfg - }(), - expectedRequests: 8, - expectedItems: 51, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - qCfg := exporterqueue.NewDefaultConfig() - qCfg.NumConsumers = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 3rd request should be flushed by itself due to flush interval - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 - }, 100*time.Millisecond, 10*time.Millisecond) - - // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. 
- assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - if tt.batcherCfg.MaxSizeItems == 10 { - // in case of MaxSizeItems=10, wait for the leftover request to send - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 - }, 50*time.Millisecond, 10*time.Millisecond) - } - - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchBlocking(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - // should be sent in two batches since the batch size is 3 - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(6), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestBatchSender_BatchCancelled(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - assert.Equal(t, uint64(0), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSender_DrainActiveRequests(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - 
require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. - require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "funcs_set_twice", - opts: []Option{ - withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, - { - name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) - if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - -func TestBatchSender_UnstartedShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - - err = be.Shutdown(context.Background()) - require.NoError(t, err) -} - -// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being -// merged. 
-func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. - startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - require.Nil(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} - -func TestBatchSenderWithTimeout(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutSettings() - tCfg.Timeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - assert.NoError(t, be.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) -} - -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, 
nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSenderTimerFlush(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - require.NoError(t, be.Shutdown(context.Background())) -} - -func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - return be -} +// import ( +// "context" +// "errors" +// "runtime" +// "sync" +// "testing" +// "time" + +// "github.com/stretchr/testify/assert" +// 
"github.com/stretchr/testify/require" + +// "go.opentelemetry.io/collector/component/componenttest" +// "go.opentelemetry.io/collector/exporter/exporterbatcher" +// "go.opentelemetry.io/collector/exporter/exporterqueue" +// ) + +// func TestBatchSender_Merge(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond + +// tests := []struct { +// name string +// batcherOption Option +// }{ +// { +// name: "split_disabled", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "split_high_limit", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 1000 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // the first two requests should be merged into one and sent by reaching the minimum items size +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) + +// // the third and fifth requests should be sent by reaching the timeout +// // the fourth request should be ignored because of the merge error. +// time.Sleep(50 * time.Millisecond) + +// // should be ignored because of the merge error. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, +// mergeErr: errors.New("merge error")})) + +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchExportError(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// tests := []struct { +// name string +// batcherOption Option +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "merge_without_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 200 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// { +// name: "merge_with_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 20 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// expectedRequests: 1, +// expectedItems: 20, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + +// // the first two requests should be blocked by the batchSender. +// time.Sleep(50 * time.Millisecond) +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) + +// // the third request should trigger the export and cause an error. +// errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} +// require.NoError(t, be.send(context.Background(), errReq)) + +// // the batch should be dropped since the queue doesn't have requeuing enabled. +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && +// sink.itemsCount.Load() == tt.expectedItems && +// be.batchSender.(*batchSender).activeRequests.Load() == 0 && +// be.queueSender.(*queueSender).queue.Size() == 0 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_MergeOrSplit(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 5 +// cfg.MaxSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// // should be sent right away by reaching the minimum items size. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // big request should be broken down into two requests, both are sent right away. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // request that cannot be split should be dropped. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, +// mergeErr: errors.New("split error")})) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_Shutdown(t *testing.T) { +// batchCfg := exporterbatcher.NewDefaultConfig() +// batchCfg.MinSizeItems = 10 +// be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // To make the request reached the batchSender before shutdown. +// time.Sleep(50 * time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) + +// // shutdown should force sending the batch +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_Disabled(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.Enabled = false +// cfg.MaxSizeItems = 5 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // should be sent right away without splitting because batching is disabled. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []Request{}, nil +// } +// // otherwise reply with a single request. +// return []Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_PostShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// assert.NoError(t, be.Shutdown(context.Background())) + +// // Closed batch sender should act as a pass-through to not block queue draining. +// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") +// } +// tests := []struct { +// name string +// batcherCfg exporterbatcher.Config +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_without_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// cfg.MaxSizeItems = 200 +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_with_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 10 +// return cfg +// }(), +// expectedRequests: 8, +// expectedItems: 51, +// }, +// } +// for _, tt := range tests { +// tt := tt +// t.Run(tt.name, func(t *testing.T) { +// qCfg := exporterqueue.NewDefaultConfig() +// qCfg.NumConsumers = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. 
+// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 3rd request should be flushed by itself due to flush interval +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// if tt.batcherCfg.MaxSizeItems == 10 { +// // in case of MaxSizeItems=10, wait for the leftover request to send +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchBlocking(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 3 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 6 blocking requests +// wg := sync.WaitGroup{} +// for i := 0; i < 6; i++ { +// wg.Add(1) +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// // should be sent in two batches since the batch size is 3 +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(6), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// // Validate that the batch is cancelled once the first request in the request is cancelled +// func TestBatchSender_BatchCancelled(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, 
fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 2 blocking requests +// wg := sync.WaitGroup{} +// ctx, cancel := context.WithCancel(context.Background()) +// wg.Add(1) +// go func() { +// assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// wg.Add(1) +// go func() { +// time.Sleep(20 * time.Millisecond) // ensure this call is the second +// assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// cancel() // canceling the first request should cancel the whole batch +// wg.Wait() + +// // nothing should be delivered +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) +// assert.Equal(t, uint64(0), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSender_DrainActiveRequests(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 3 blocking requests with a timeout +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() + +// // give time for the first two requests to be batched +// time.Sleep(20 * time.Millisecond) + +// // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. +// // It should take 120 milliseconds to complete. +// require.NoError(t, be.Shutdown(context.Background())) + +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_WithBatcherOption(t *testing.T) { +// tests := []struct { +// name string +// opts []Option +// expectedErr bool +// }{ +// { +// name: "no_funcs_set", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: true, +// }, +// { +// name: "funcs_set_internally", +// opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: false, +// }, +// { +// name: "funcs_set_twice", +// opts: []Option{ +// withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc)), +// }, +// expectedErr: true, +// }, +// { +// name: "nil_funcs", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, +// expectedErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) 
+// if tt.expectedErr { +// assert.Nil(t, be) +// assert.Error(t, err) +// } else { +// assert.NotNil(t, be) +// assert.NoError(t, err) +// } +// }) +// } +// } + +// func TestBatchSender_UnstartedShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) + +// err = be.Shutdown(context.Background()) +// require.NoError(t, err) +// } + +// // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// // merged. +// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. 
+// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// require.Nil(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } + +// func TestBatchSenderWithTimeout(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 10 +// tCfg := NewDefaultTimeoutSettings() +// tCfg.Timeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithTimeout(tCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 3 concurrent requests that should be merged in one batch +// wg := sync.WaitGroup{} +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// wg.Done() +// }() +// } +// wg.Wait() +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) + +// // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// assert.NoError(t, be.Shutdown(context.Background())) + +// // The sink should not change +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// } + +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 := r2.(*fakeRequest) +// if fr2.mergeErr != nil { +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// 
require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSenderTimerFlush(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 100 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() +// time.Sleep(50 * time.Millisecond) + +// // Send 2 concurrent requests that should be merged in one batch and sent immediately +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 30*time.Millisecond, 5*time.Millisecond) + +// // Send another request that should be flushed after 100ms instead of 50ms since last flush +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // Confirm that it is not flushed in 50ms +// time.Sleep(60 * time.Millisecond) +// assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) + +// // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) +// time.Sleep(50 * time.Millisecond) +// assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, +// WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// return be +// } diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 060edab813a..de9f8428d86 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -72,6 +72,7 @@ type queueSender struct { numConsumers int traceAttribute attribute.KeyValue consumers *queue.Consumers[Request] + batcher *queue.QueueBatcher[Request] obsrep *obsReport exporterID component.ID @@ -86,7 +87,19 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo obsrep: obsrep, exporterID: set.ID, } - consumeFunc := func(ctx context.Context, req Request) error { + + // consumeFunc := func(ctx context.Context, req Request) error { + // err := qs.nextSender.send(ctx, req) + // if err != nil { + // set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + // zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + // } + // return err + // } + + // qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc) + + exportFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) if err != nil { set.Logger.Error("Exporting failed. 
Dropping data."+exportFailureMessage, @@ -94,13 +107,17 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo } return err } - qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc) + qs.batcher = queue.NewQueueBatcher[Request](q, numConsumers, exportFunc) return qs } // Start is invoked during service startup. func (qs *queueSender) Start(ctx context.Context, host component.Host) error { - if err := qs.consumers.Start(ctx, host); err != nil { + // if err := qs.consumers.Start(ctx, host); err != nil { + // return err + // } + + if err := qs.batcher.Start(ctx, host); err != nil { return err } @@ -117,7 +134,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error { func (qs *queueSender) Shutdown(ctx context.Context) error { // Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - return qs.consumers.Shutdown(ctx) + + // return qs.consumers.Shutdown(ctx) + + return qs.batcher.Shutdown(ctx) } // send implements the requestSender interface. It puts the request in the queue. diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go index 98e1b281176..d0fce6abf22 100644 --- a/exporter/internal/queue/bounded_memory_queue.go +++ b/exporter/internal/queue/bounded_memory_queue.go @@ -40,19 +40,33 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil) } +func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) { + item, ok := q.sizedChannel.pop() + if !ok { + return item.req, ok, nil + } + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) + onClaim() + return item.req, ok, nil +} + // Consume applies the provided function on the head of queue. // The call blocks until there is an item available or the queue is stopped. // The function returns true when an item is consumed or false if the queue is stopped and emptied. func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool { - item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) }) + item, ok := q.sizedChannel.pop() if !ok { return false } + q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req)) // the memory queue doesn't handle consume errors _ = consumeFunc(item.ctx, item.req) return true } +func (pq *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) { +} + // Shutdown closes the queue channel to initiate draining of the queue. 
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
 	q.sizedChannel.shutdown()
diff --git a/exporter/internal/queue/consumers.go b/exporter/internal/queue/consumers.go
index 7c57fea9620..b98a3745609 100644
--- a/exporter/internal/queue/consumers.go
+++ b/exporter/internal/queue/consumers.go
@@ -11,10 +11,10 @@ import (
 )
 
 type Consumers[T any] struct {
-	queue Queue[T]
+	queue         Queue[T]
 	numConsumers int
-	consumeFunc func(context.Context, T) error
-	stopWG sync.WaitGroup
+	consumeFunc   func(context.Context, T) error
+	stopWG        sync.WaitGroup
 }
 
 func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *Consumers[T] {
diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go
index 7dd646c6ef3..a09ce17eb8c 100644
--- a/exporter/internal/queue/persistent_queue.go
+++ b/exporter/internal/queue/persistent_queue.go
@@ -188,33 +188,51 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
 	return bytesToItemIndex(val)
 }
 
+// ClaimAndRead pops a dispatch token from the channel, claims the next item from
+// storage, and releases its size from the channel accounting. The returned callback
+// must be invoked with the export result; it commits the item or re-enqueues it.
+func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) {
+	if _, ok := pq.sizedChannel.pop(); !ok {
+		var t T
+		return t, false, nil
+	}
+	onClaim()
+
+	req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
+	if consumed {
+		pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
+	}
+	// Report whether an item was actually claimed; returning true unconditionally
+	// would hand the caller a zero-value request when dispatching failed.
+	return req, consumed, onProcessingFinished
+}
+
+// CommitConsume marks the item at the given storage index as dispatched and deletes
+// it from storage.
+func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
+	if err := pq.itemDispatchingFinish(ctx, index); err != nil {
+		pq.logger.Error("Error deleting item from queue", zap.Error(err))
+	}
+}
+
 // Consume applies the provided function on the head of queue.
 // The call blocks until there is an item available or the queue is stopped.
 // The function returns true when an item is consumed or false if the queue is stopped.
 func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
 	for {
-		var (
-			req                  T
-			onProcessingFinished func(error)
-			consumed             bool
-		)
-
 		// If we are stopped we still process all the other events in the channel before, but we
 		// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
-		_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
-			req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
-			if !consumed {
-				return 0
-			}
-			return pq.set.Sizer.Sizeof(req)
-		})
-		if !ok {
+		if _, ok := pq.sizedChannel.pop(); !ok {
 			return false
 		}
-		if consumed {
-			onProcessingFinished(consumeFunc(context.Background(), req))
-			return true
+
+		req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
+		if !consumed {
+			// The item could not be dispatched (e.g. a storage error). Keep the
+			// original retry-loop semantics and move on to the next one instead
+			// of reporting the queue as stopped.
+			continue
 		}
+
+		pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
+		onProcessingFinished(consumeFunc(context.Background(), req))
+		return true
 	}
 }
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 35bc504579e..652a0a93be8 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -25,6 +25,10 @@ type Queue[T any] interface {
 	// without violating capacity restrictions. If success returns no error.
 	// It returns ErrQueueIsFull if no space is currently available.
 	Offer(ctx context.Context, item T) error
+
+	// ClaimAndRead removes the next item from the queue and returns it together with
+	// a callback for reporting the processing outcome (nil if no acknowledgment is
+	// needed). The bool result is false if no item could be claimed.
+	ClaimAndRead(onClaim func()) (T, bool, func(error))
+	// CommitConsume permanently removes the item at the given index from the queue.
+	CommitConsume(ctx context.Context, index uint64)
+
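+	// Illustrative sketch (editor's note, not part of this patch): the two methods
+	// above enable a two-phase consume, so reads can be parallelized and batched
+	// before an item is acknowledged. Assumed usage, with export provided by the caller:
+	//
+	//	req, ok, onProcessingFinished := q.ClaimAndRead(onClaim)
+	//	if ok {
+	//		err := export(ctx, req) // or: add req to a batch and flush it later
+	//		if onProcessingFinished != nil {
+	//			onProcessingFinished(err)
+	//		}
+	//	}
+	//
+	// CommitConsume takes the storage index of a claimed item; this POC does not
+	// wire it into QueueBatcher yet.
+
 	// Consume applies the provided function on the head of queue.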
 	// The call blocks until there is an item available or the queue is stopped.
 	// The function returns true when an item is consumed or false if the queue is stopped.
diff --git a/exporter/internal/queue/queue_batcher.go b/exporter/internal/queue/queue_batcher.go
new file mode 100644
index 00000000000..ae37b6eb62b
--- /dev/null
+++ b/exporter/internal/queue/queue_batcher.go
@@ -0,0 +1,82 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"sync"
+	// "time"
+
+	"go.opentelemetry.io/collector/component"
+)
+
+// QueueBatcher drains a queue with a pool of workers and hands each claimed request
+// to exportFunc. The batching step itself is still a TODO in this POC.
+type QueueBatcher[T any] struct {
+	queue      Queue[T]
+	numWorkers int
+	exportFunc func(context.Context, T) error
+	stopWG     sync.WaitGroup
+}
+
+func NewQueueBatcher[T any](q Queue[T], numWorkers int, exportFunc func(context.Context, T) error) *QueueBatcher[T] {
+	return &QueueBatcher[T]{
+		queue:      q,
+		numWorkers: numWorkers,
+		exportFunc: exportFunc,
+		stopWG:     sync.WaitGroup{},
+	}
+}
+
+// Start ensures that the queue and all worker goroutines are started.
+func (qb *QueueBatcher[T]) Start(ctx context.Context, host component.Host) error {
+	if err := qb.queue.Start(ctx, host); err != nil {
+		return err
+	}
+
+	// timer := time.NewTimer(1 * time.Second) // TODO
+	// allocConsumer := make(chan bool, 2)
+
+	// go func() {
+	// 	allocConsumer <- true
+	// }()
+
+	var startWG sync.WaitGroup
+	for i := 0; i < qb.numWorkers; i++ {
+		qb.stopWG.Add(1)
+		startWG.Add(1)
+		go func() {
+			startWG.Done()
+			defer qb.stopWG.Done()
+			for {
+				// select {
+				// case <-timer.C:
+				// 	qb.batcher.FlushIfNecessary() // TODO
+				// case <-allocConsumer:
+				req, ok, onProcessingFinished := qb.queue.ClaimAndRead(func() {})
+				if !ok {
+					return
+				}
+				err := qb.exportFunc(ctx, req)
+				if onProcessingFinished != nil {
+					onProcessingFinished(err)
+				}
+				// Put item into the batch
+				// qb.batcher.Push(item) // TODO
+				// qb.batcher.FlushIfNecessary() // TODO
+				// }
+			}
+		}()
+	}
+	startWG.Wait()
+
+	return nil
+}
+
+// Shutdown ensures that the queue and all workers are stopped.
+func (qb *QueueBatcher[T]) Shutdown(ctx context.Context) error {
+	if err := qb.queue.Shutdown(ctx); err != nil {
+		return err
+	}
+	qb.stopWG.Wait()
+	return nil
+}
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
index 1702a38ac2f..9a0aae783ac 100644
--- a/exporter/internal/queue/sized_channel.go
+++ b/exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"
 
 // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
 // The channel will accept elements until the total size of the elements reaches the capacity.
+// Not thread safe: with the split pop/updateSize API below, the caller is responsible
+// for keeping the size accounting consistent.
 type sizedChannel[T any] struct {
 	used *atomic.Int64
 
@@ -59,27 +60,46 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
 	return nil
 }
 
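+// Illustrative sketch (editor's note): with the split API the caller pops first,
+// learns the element size only after reading, and settles the accounting itself,
+// including restoring capacity if the element is put back after a failed batch.
+// sizeOf and process are assumed caller-provided helpers:
+//
+//	el, ok := vcq.pop()
+//	if ok {
+//		size := sizeOf(el)
+//		vcq.updateSize(-size) // claim the capacity
+//		if err := process(el); err != nil {
+//			vcq.updateSize(size) // give the capacity back before re-queueing
+//		}
+//	}
+
-// pop removes the element from the queue and returns it.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-// The callback is called before the element is removed from the queue. It must return the size of the element.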
-func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
-	el, ok := <-vcq.ch
-	if !ok {
-		return el, false
-	}
+// REMOVE
+// NOTE: the main change in sized_channel is that pop() is separated into pop() and updateSize().
+// This is because we want to parallelize "reading" from the queue, and we only know the item size
+// after reading. We also need to be able to update the queue size in case the batch ends up failing.
 
-	size := callback(el)
+// pop removes the element from the queue and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// The caller is responsible for adjusting the used size afterwards via updateSize.
+func (vcq *sizedChannel[T]) pop() (T, bool) {
+	el, ok := <-vcq.ch
+	return el, ok
+}
 
+// updateSize adjusts the tracked used size by deltaSize: negative after a read,
+// positive to restore capacity for an element that is put back.
+func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
 	// because we don't flush the current queue size on the disk on every read/write.
 	// In that case we need to make sure it doesn't go below 0.
-	if vcq.used.Add(-size) < 0 {
+	if vcq.used.Add(deltaSize) < 0 {
 		vcq.used.Store(0)
 	}
-	return el, true
 }
 
+// // pop removes the element from the queue and returns it.
+// // The call blocks until there is an item available or the queue is stopped.
+// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// // The callback is called before the element is removed from the queue. It must return the size of the element.
+// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+// 	el, ok := <-vcq.ch
+// 	if !ok {
+// 		return el, false
+// 	}
+
+// 	size := callback(el)
+
+// 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
+// 	// because we don't flush the current queue size on the disk on every read/write.
+// 	// In that case we need to make sure it doesn't go below 0.
+// 	if vcq.used.Add(-size) < 0 {
+// 		vcq.used.Store(0)
+// 	}
+// 	return el, true
+// }
+
 // syncSize updates the used size to 0 if the queue is empty.
 // The caller must ensure that this call is not called concurrently with push.
 // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always