From 8a38fa65b1f6f73230f3134047872405ec2347e6 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Thu, 12 Sep 2024 17:23:46 -0700
Subject: [PATCH] POC

---
 exporter/exporterbatcher/batcher.go             |   35 +
 exporter/exporterhelper/batch_sender.go         |   82 +-
 exporter/exporterhelper/batch_sender_test.go    | 1414 ++++++++---------
 exporter/exporterhelper/queue_sender.go         |   46 +-
 exporter/internal/queue/batcher.go              |  152 ++
 .../internal/queue/bounded_memory_queue.go      |   18 +-
 exporter/internal/queue/persistent_queue.go     |   71 +-
 exporter/internal/queue/queue.go                |    4 +
 exporter/internal/queue/sized_channel.go        |   44 +-
 9 files changed, 1074 insertions(+), 792 deletions(-)
 create mode 100644 exporter/exporterbatcher/batcher.go
 create mode 100644 exporter/internal/queue/batcher.go

diff --git a/exporter/exporterbatcher/batcher.go b/exporter/exporterbatcher/batcher.go
new file mode 100644
index 00000000000..7b9b3e1c598
--- /dev/null
+++ b/exporter/exporterbatcher/batcher.go
@@ -0,0 +1,35 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher"
+
+// type Batcher[T any] struct {
+// 	mu             sync.Mutex
+// 	lastFlushTime  time.Time
+// 	activeBatch    batch[T]
+// 	pendingBatches []batch[T]
+// 	flushTimeout   time.Duration
+// 	flushFunc      func(req T) error
+// }
+
+// func (b *Batcher[T]) FlushIfNecessary() error {
+// 	b.mu.Lock()
+// 	var batchToExport batch[T]
+// 	now := time.Now()
+// 	if now.Sub(b.lastFlushTime) > b.flushTimeout || b.activeBatch.size() > b.minBatchSize {
+// 		b.lastFlushTime = now
+// 		batchToExport = b.activeBatch
+// 		b.activeBatch = b.pendingBatches[0]
+// 		b.pendingBatches = b.pendingBatches[1:]
+// 	}
+// 	b.timer.Reset(b.flushTimeout)
+// 	b.mu.Unlock()
+// 	return b.flushFunc(batchToExport.req)
+// }
+
+// func (b *Batcher[T]) Push(item T) error {
+// 	if b.maxSize != 0 && b.batches[0].size()+item.size() > b.maxSize {
+// 		// TODO: split item and spill the remainder into new pending batches.
+// 	}
+// 	return nil
+// }
diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go
index 4d9635195e2..6eaf72b0ec5 100644
--- a/exporter/exporterhelper/batch_sender.go
+++ b/exporter/exporterhelper/batch_sender.go
@@ -61,42 +61,42 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
 }
 
 func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
-	bs.shutdownCh = make(chan struct{})
-	timer := time.NewTimer(bs.cfg.FlushTimeout)
-	go func() {
-		for {
-			select {
-			case <-bs.shutdownCh:
-				// There is a minimal chance that another request is added after the shutdown signal.
-				// This loop will handle that case.
-				for bs.activeRequests.Load() > 0 {
-					bs.mu.Lock()
-					if bs.activeBatch.request != nil {
-						bs.exportActiveBatch()
-					}
-					bs.mu.Unlock()
-				}
-				if !timer.Stop() {
-					<-timer.C
-				}
-				close(bs.shutdownCompleteCh)
-				return
-			case <-timer.C:
-				bs.mu.Lock()
-				nextFlush := bs.cfg.FlushTimeout
-				if bs.activeBatch.request != nil {
-					sinceLastFlush := time.Since(bs.lastFlushed)
-					if sinceLastFlush >= bs.cfg.FlushTimeout {
-						bs.exportActiveBatch()
-					} else {
-						nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
-					}
-				}
-				bs.mu.Unlock()
-				timer.Reset(nextFlush)
-			}
-		}
-	}()
+	// bs.shutdownCh = make(chan struct{})
+	// timer := time.NewTimer(bs.cfg.FlushTimeout)
+	// go func() {
+	// 	for {
+	// 		select {
+	// 		case <-bs.shutdownCh:
+	// 			// There is a minimal chance that another request is added after the shutdown signal.
+	// 			// This loop will handle that case.
+ // for bs.activeRequests.Load() > 0 { + // bs.mu.Lock() + // if bs.activeBatch.request != nil { + // bs.exportActiveBatch() + // } + // bs.mu.Unlock() + // } + // if !timer.Stop() { + // <-timer.C + // } + // close(bs.shutdownCompleteCh) + // return + // case <-timer.C: + // bs.mu.Lock() + // nextFlush := bs.cfg.FlushTimeout + // if bs.activeBatch.request != nil { + // sinceLastFlush := time.Since(bs.lastFlushed) + // if sinceLastFlush >= bs.cfg.FlushTimeout { + // bs.exportActiveBatch() + // } else { + // nextFlush = bs.cfg.FlushTimeout - sinceLastFlush + // } + // } + // bs.mu.Unlock() + // timer.Reset(nextFlush) + // } + // } + // }() return nil } @@ -231,10 +231,10 @@ func (bs *batchSender) updateActiveBatch(ctx context.Context, req Request) { } func (bs *batchSender) Shutdown(context.Context) error { - bs.stopped.Store(true) - if bs.shutdownCh != nil { - close(bs.shutdownCh) - <-bs.shutdownCompleteCh - } + // bs.stopped.Store(true) + // if bs.shutdownCh != nil { + // close(bs.shutdownCh) + // <-bs.shutdownCompleteCh + // } return nil } diff --git a/exporter/exporterhelper/batch_sender_test.go b/exporter/exporterhelper/batch_sender_test.go index 5f5d49bec9a..454a59c8bf4 100644 --- a/exporter/exporterhelper/batch_sender_test.go +++ b/exporter/exporterhelper/batch_sender_test.go @@ -3,710 +3,710 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" -import ( - "context" - "errors" - "runtime" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exporterbatcher" - "go.opentelemetry.io/collector/exporter/exporterqueue" -) - -func TestBatchSender_Merge(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - - tests := []struct { - name string - batcherOption Option - }{ - { - name: "split_disabled", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "split_high_limit", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 1000 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // the first two requests should be merged into one and sent by reaching the minimum items size - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 - }, 50*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) - - // the third and fifth requests should be sent by reaching the timeout - // the fourth request should be ignored because of the merge error. 
- time.Sleep(50 * time.Millisecond) - - // should be ignored because of the merge error. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, - mergeErr: errors.New("merge error")})) - - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchExportError(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - tests := []struct { - name string - batcherOption Option - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - }, - { - name: "merge_without_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - }, - { - name: "merge_with_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) - }(), - expectedRequests: 1, - expectedItems: 20, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - - // the first two requests should be blocked by the batchSender. - time.Sleep(50 * time.Millisecond) - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - - // the third request should trigger the export and cause an error. - errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} - require.NoError(t, be.send(context.Background(), errReq)) - - // the batch should be dropped since the queue doesn't have requeuing enabled. - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && - sink.itemsCount.Load() == tt.expectedItems && - be.batchSender.(*batchSender).activeRequests.Load() == 0 && - be.queueSender.(*queueSender).queue.Size() == 0 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_MergeOrSplit(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) - - // big request should be broken down into two requests, both are sent right away. 
- require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) - - // request that cannot be split should be dropped. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, - mergeErr: errors.New("split error")})) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_Shutdown(t *testing.T) { - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) - - // To make the request reached the batchSender before shutdown. - time.Sleep(50 * time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) - - // shutdown should force sending the batch - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_Disabled(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // should be sent right away without splitting because batching is disabled. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []Request{}, nil - } - // otherwise reply with a single request. - return []Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. 
- require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} - -func TestBatchSender_PostShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. - sink := newFakeRequestSink() - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) - assert.Equal(t, uint64(1), sink.requestsCount.Load()) - assert.Equal(t, uint64(8), sink.itemsCount.Load()) -} - -func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") - } - tests := []struct { - name string - batcherCfg exporterbatcher.Config - expectedRequests uint64 - expectedItems uint64 - }{ - { - name: "merge_only", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_without_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - cfg.MaxSizeItems = 200 - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_with_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 10 - return cfg - }(), - expectedRequests: 8, - expectedItems: 51, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - qCfg := exporterqueue.NewDefaultConfig() - qCfg.NumConsumers = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. 
- assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 3rd request should be flushed by itself due to flush interval - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 - }, 100*time.Millisecond, 10*time.Millisecond) - - // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - if tt.batcherCfg.MaxSizeItems == 10 { - // in case of MaxSizeItems=10, wait for the leftover request to send - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 - }, 50*time.Millisecond, 10*time.Millisecond) - } - - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchBlocking(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - // should be sent in two batches since the batch size is 3 - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(6), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestBatchSender_BatchCancelled(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), 
componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, uint64(0), sink.requestsCount.Load()) - assert.Equal(t, uint64(0), sink.itemsCount.Load()) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSender_DrainActiveRequests(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. - require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, uint64(2), sink.requestsCount.Load()) - assert.Equal(t, uint64(3), sink.itemsCount.Load()) -} - -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "funcs_set_twice", - opts: []Option{ - withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, - { - name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) 
- if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - -func TestBatchSender_UnstartedShutdown(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - - err = be.Shutdown(context.Background()) - require.NoError(t, err) -} - -// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being -// merged. -func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. 
- startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - require.NoError(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} - -func TestBatchSenderWithTimeout(t *testing.T) { - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutConfig() - tCfg.Timeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - assert.NoError(t, be.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) -} - -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} - -func TestBatchSenderTimerFlush(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see 
https://github.com/open-telemetry/opentelemetry-collector/issues/10802") - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) - assert.EqualValues(t, 12, sink.itemsCount.Load()) - require.NoError(t, be.Shutdown(context.Background())) -} - -func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { - be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) - require.NotNil(t, be) - require.NoError(t, err) - return be -} +// import ( +// "context" +// "errors" +// "runtime" +// "sync" +// "testing" +// "time" + +// "github.com/stretchr/testify/assert" +// "github.com/stretchr/testify/require" + +// "go.opentelemetry.io/collector/component/componenttest" +// "go.opentelemetry.io/collector/exporter/exporterbatcher" +// "go.opentelemetry.io/collector/exporter/exporterqueue" +// ) + +// func TestBatchSender_Merge(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond + +// tests := []struct { +// name string +// batcherOption Option +// }{ +// { +// name: "split_disabled", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "split_high_limit", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 1000 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() 
+ +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // the first two requests should be merged into one and sent by reaching the minimum items size +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 11 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink})) + +// // the third and fifth requests should be sent by reaching the timeout +// // the fourth request should be ignored because of the merge error. +// time.Sleep(50 * time.Millisecond) + +// // should be ignored because of the merge error. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink, +// mergeErr: errors.New("merge error")})) + +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchExportError(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 10 +// tests := []struct { +// name string +// batcherOption Option +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// }, +// { +// name: "merge_without_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 200 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// }, +// { +// name: "merge_with_split_triggered", +// batcherOption: func() Option { +// c := cfg +// c.MaxSizeItems = 20 +// return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) +// }(), +// expectedRequests: 1, +// expectedItems: 20, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be := queueBatchExporter(t, tt.batcherOption) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) + +// // the first two requests should be blocked by the batchSender. +// time.Sleep(50 * time.Millisecond) +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) + +// // the third request should trigger the export and cause an error. +// errReq := &fakeRequest{items: 20, exportErr: errors.New("transient error"), sink: sink} +// require.NoError(t, be.send(context.Background(), errReq)) + +// // the batch should be dropped since the queue doesn't have requeuing enabled. 
+// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && +// sink.itemsCount.Load() == tt.expectedItems && +// be.batchSender.(*batchSender).activeRequests.Load() == 0 && +// be.queueSender.(*queueSender).queue.Size() == 0 +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_MergeOrSplit(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.MinSizeItems = 5 +// cfg.MaxSizeItems = 10 +// cfg.FlushTimeout = 100 * time.Millisecond +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() + +// // should be sent right away by reaching the minimum items size. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 17, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 +// }, 50*time.Millisecond, 10*time.Millisecond) + +// // request that cannot be split should be dropped. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 11, sink: sink, +// mergeErr: errors.New("split error")})) + +// // big request should be broken down into two requests, both are sent right away. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 13, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 38 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_Shutdown(t *testing.T) { +// batchCfg := exporterbatcher.NewDefaultConfig() +// batchCfg.MinSizeItems = 10 +// be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 3, sink: sink})) + +// // To make the request reached the batchSender before shutdown. +// time.Sleep(50 * time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) + +// // shutdown should force sending the batch +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_Disabled(t *testing.T) { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.Enabled = false +// cfg.MaxSizeItems = 5 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // should be sent right away without splitting because batching is disabled. 
+// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ Request, req2 Request) ([]Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []Request{}, nil +// } +// // otherwise reply with a single request. +// return []Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } + +// func TestBatchSender_PostShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// assert.NoError(t, be.Shutdown(context.Background())) + +// // Closed batch sender should act as a pass-through to not block queue draining. 
+// sink := newFakeRequestSink() +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 8, sink: sink})) +// assert.Equal(t, uint64(1), sink.requestsCount.Load()) +// assert.Equal(t, uint64(8), sink.itemsCount.Load()) +// } + +// func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") +// } +// tests := []struct { +// name string +// batcherCfg exporterbatcher.Config +// expectedRequests uint64 +// expectedItems uint64 +// }{ +// { +// name: "merge_only", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_without_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 20 * time.Millisecond +// cfg.MaxSizeItems = 200 +// return cfg +// }(), +// expectedRequests: 6, +// expectedItems: 51, +// }, +// { +// name: "merge_with_split_triggered", +// batcherCfg: func() exporterbatcher.Config { +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 10 +// return cfg +// }(), +// expectedRequests: 8, +// expectedItems: 51, +// }, +// } +// for _, tt := range tests { +// tt := tt +// t.Run(tt.name, func(t *testing.T) { +// qCfg := exporterqueue.NewDefaultConfig() +// qCfg.NumConsumers = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[Request]())) +// require.NotNil(t, be) +// require.NoError(t, err) +// assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// assert.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) + +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 4 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 3rd request should be flushed by itself due to flush interval +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 6 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 2, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 10 +// }, 100*time.Millisecond, 10*time.Millisecond) + +// // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. 
+// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 5, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// if tt.batcherCfg.MaxSizeItems == 10 { +// // in case of MaxSizeItems=10, wait for the leftover request to send +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 21 +// }, 50*time.Millisecond, 10*time.Millisecond) +// } + +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 6, sink: sink})) +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 20, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems +// }, 100*time.Millisecond, 10*time.Millisecond) +// }) +// } +// } + +// func TestBatchSender_BatchBlocking(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 3 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 6 blocking requests +// wg := sync.WaitGroup{} +// for i := 0; i < 6; i++ { +// wg.Add(1) +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 10 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// // should be sent in two batches since the batch size is 3 +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(6), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// // Validate that the batch is cancelled once the first request in the request is cancelled +// func TestBatchSender_BatchCancelled(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 2 blocking requests +// wg := sync.WaitGroup{} +// ctx, cancel := context.WithCancel(context.Background()) +// wg.Add(1) +// go func() { +// assert.ErrorIs(t, be.send(ctx, &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// wg.Add(1) +// go func() { +// time.Sleep(20 * time.Millisecond) // ensure this call is the second +// assert.ErrorIs(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 100 * time.Millisecond}), context.Canceled) +// wg.Done() +// }() +// cancel() // canceling the first request should cancel the whole batch +// wg.Wait() + +// // nothing should be delivered +// assert.Equal(t, uint64(0), sink.requestsCount.Load()) +// assert.Equal(t, uint64(0), sink.itemsCount.Load()) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSender_DrainActiveRequests(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 2 +// be, err := 
newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NotNil(t, be) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // send 3 blocking requests with a timeout +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() +// go func() { +// assert.NoError(t, be.send(context.Background(), &fakeRequest{items: 1, sink: sink, delay: 40 * time.Millisecond})) +// }() + +// // give time for the first two requests to be batched +// time.Sleep(20 * time.Millisecond) + +// // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. +// // It should take 120 milliseconds to complete. +// require.NoError(t, be.Shutdown(context.Background())) + +// assert.Equal(t, uint64(2), sink.requestsCount.Load()) +// assert.Equal(t, uint64(3), sink.itemsCount.Load()) +// } + +// func TestBatchSender_WithBatcherOption(t *testing.T) { +// tests := []struct { +// name string +// opts []Option +// expectedErr bool +// }{ +// { +// name: "no_funcs_set", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: true, +// }, +// { +// name: "funcs_set_internally", +// opts: []Option{withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, +// expectedErr: false, +// }, +// { +// name: "funcs_set_twice", +// opts: []Option{ +// withBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, +// fakeBatchMergeSplitFunc)), +// }, +// expectedErr: true, +// }, +// { +// name: "nil_funcs", +// opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, +// expectedErr: true, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, tt.opts...) +// if tt.expectedErr { +// assert.Nil(t, be) +// assert.Error(t, err) +// } else { +// assert.NotNil(t, be) +// assert.NoError(t, err) +// } +// }) +// } +// } + +// func TestBatchSender_UnstartedShutdown(t *testing.T) { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) + +// err = be.Shutdown(context.Background()) +// require.NoError(t, err) +// } + +// // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being +// // merged. 
+// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. +// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// require.NoError(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } + +// func TestBatchSenderWithTimeout(t *testing.T) { +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 10 +// tCfg := NewDefaultTimeoutConfig() +// tCfg.Timeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), +// WithTimeout(tCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 3 concurrent requests that should be merged in one batch +// wg := sync.WaitGroup{} +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// wg.Done() +// }() +// } +// wg.Wait() +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) + +// // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender +// for i := 0; i < 3; i++ { +// wg.Add(1) +// go func() { +// assert.Error(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink, delay: 30 * time.Millisecond})) +// wg.Done() +// }() +// } +// wg.Wait() + +// assert.NoError(t, be.Shutdown(context.Background())) + +// // The sink should not change +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// } + +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 := r2.(*fakeRequest) +// if fr2.mergeErr != nil 
{ +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func TestBatchSenderTimerFlush(t *testing.T) { +// if runtime.GOOS == "windows" { +// t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 100 * time.Millisecond +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, +// WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() +// time.Sleep(50 * time.Millisecond) + +// // Send 2 concurrent requests that should be merged in one batch and sent immediately +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 30*time.Millisecond, 5*time.Millisecond) + +// // Send another request that should be flushed after 100ms instead of 50ms since last flush +// go func() { +// require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // Confirm that it is not flushed in 50ms +// time.Sleep(60 * time.Millisecond) +// assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) + +// // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) +// time.Sleep(50 * time.Millisecond) +// assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load()) +// assert.EqualValues(t, 12, sink.itemsCount.Load()) +// require.NoError(t, be.Shutdown(context.Background())) +// } + +// func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter { +// be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, 
+// 		batchOption,
+// 		WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
+// 	require.NotNil(t, be)
+// 	require.NoError(t, err)
+// 	return be
+// }
diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go
index 58edbcc8732..6228db4f274 100644
--- a/exporter/exporterhelper/queue_sender.go
+++ b/exporter/exporterhelper/queue_sender.go
@@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
 import (
 	"context"
 	"errors"
+	"math"
 
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
@@ -15,6 +16,7 @@ import (
 
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/exporter"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
 	"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
 	"go.opentelemetry.io/collector/exporter/exporterqueue"
 	"go.opentelemetry.io/collector/exporter/internal/queue"
@@ -79,7 +81,8 @@ type queueSender struct {
 	queue          exporterqueue.Queue[Request]
 	numConsumers   int
 	traceAttribute attribute.KeyValue
-	consumers      *queue.Consumers[Request]
+	// consumers *queue.Consumers[Request]
+	batcher *queue.Batcher
 
 	obsrep     *obsReport
 	exporterID component.ID
@@ -94,7 +97,19 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo
 		obsrep:         obsrep,
 		exporterID:     set.ID,
 	}
-	consumeFunc := func(ctx context.Context, req Request) error {
+
+	// consumeFunc := func(ctx context.Context, req Request) error {
+	// 	err := qs.nextSender.send(ctx, req)
+	// 	if err != nil {
+	// 		set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
+	// 			zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
+	// 	}
+	// 	return err
+	// }
+
+	// qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc)
+
+	exportFunc := func(ctx context.Context, req Request) error {
 		err := qs.nextSender.send(ctx, req)
 		if err != nil {
 			set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
@@ -102,13 +117,31 @@ func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numCo
 				zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
 		}
 		return err
 	}
-	qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc)
+
+	qs.batcher = queue.NewBatcher(q, numConsumers, exportFunc)
 	return qs
 }
 
 // Start is invoked during service startup.
 func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
-	if err := qs.consumers.Start(ctx, host); err != nil {
+	// if err := qs.consumers.Start(ctx, host); err != nil {
+	// 	return err
+	// }
+
+	if bs, ok := (qs.nextSender).(*batchSender); ok {
+		qs.batcher.Config(bs.cfg)
+	} else {
+		// Batching is disabled: a zero minimum size makes every request flush
+		// immediately, and an unbounded maximum size disables splitting.
+		config := exporterbatcher.NewDefaultConfig()
+		config.MinSizeConfig = exporterbatcher.MinSizeConfig{
+			MinSizeItems: 0,
+		}
+		config.MaxSizeConfig = exporterbatcher.MaxSizeConfig{
+			MaxSizeItems: math.MaxInt,
+		}
+		qs.batcher.Config(config)
+	}
+
+	if err := qs.batcher.Start(ctx, host); err != nil {
 		return err
 	}
 
@@ -125,7 +158,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
 func (qs *queueSender) Shutdown(ctx context.Context) error {
 	// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
 	// try once every request.
-	return qs.consumers.Shutdown(ctx)
+
+	// return qs.consumers.Shutdown(ctx)
+
+	return qs.batcher.Shutdown(ctx)
 }
 
 // send implements the requestSender interface. It puts the request in the queue.
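Before the new queue.Batcher below, a note on its flush rule: a pending batch is exported either when the flush timeout has elapsed since the last flush or when the batch reaches the configured minimum item count. A minimal, self-contained sketch of just that decision follows; flushPolicy and shouldFlush are illustrative names, not part of this patch.

package main

import (
	"fmt"
	"time"
)

type flushPolicy struct {
	flushTimeout time.Duration
	minSizeItems int
	lastFlushed  time.Time
}

// shouldFlush is the inverse of the early-return condition in
// Batcher.flushIfNecessary: skip the flush only while the timeout has not
// elapsed AND the batch is still below the minimum size.
func (p *flushPolicy) shouldFlush(items int, now time.Time) bool {
	return now.Sub(p.lastFlushed) >= p.flushTimeout || items >= p.minSizeItems
}

func main() {
	p := &flushPolicy{flushTimeout: 200 * time.Millisecond, minSizeItems: 10, lastFlushed: time.Now()}
	fmt.Println(p.shouldFlush(3, time.Now()))                           // false: too small, too soon
	fmt.Println(p.shouldFlush(12, time.Now()))                          // true: minimum size reached
	fmt.Println(p.shouldFlush(3, time.Now().Add(300*time.Millisecond))) // true: timeout elapsed
}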
diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
new file mode 100644
index 00000000000..db11dd1e296
--- /dev/null
+++ b/exporter/internal/queue/batcher.go
@@ -0,0 +1,152 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+// batch is a pending request together with its context and a callback to be
+// invoked once the export attempt for the batch finishes.
+type batch struct {
+	ctx              context.Context
+	req              internal.Request
+	onExportFinished func(error)
+}
+
+// Batcher reads items from a queue, merges them into batches, and flushes a
+// batch when the flush timeout elapses or the minimum batch size is reached.
+type Batcher struct {
+	cfg exporterbatcher.Config
+	// TODO: the merge functions are never set yet; they must be wired from the
+	// exporter options before merging of multiple requests can work.
+	mergeFunc      exporterbatcher.BatchMergeFunc[internal.Request]
+	mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
+
+	queue      Queue[internal.Request]
+	numWorkers int
+	exportFunc func(context.Context, internal.Request) error
+	stopWG     sync.WaitGroup
+
+	mu             sync.Mutex
+	lastFlushed    time.Time
+	pendingBatches []batch
+	timer          *time.Timer
+}
+
+// NewBatcher creates a Batcher that drains the given queue with numWorkers
+// goroutines and exports flushed batches via exportFunc.
+func NewBatcher(queue Queue[internal.Request], numWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
+	return &Batcher{
+		queue:          queue,
+		numWorkers:     numWorkers,
+		exportFunc:     exportFunc,
+		stopWG:         sync.WaitGroup{},
+		pendingBatches: make([]batch, 1),
+	}
+}
+
+// Config sets the batching configuration. It must be called before Start.
+func (qb *Batcher) Config(cfg exporterbatcher.Config) {
+	qb.cfg = cfg
+}
+
+// flushIfNecessary exports the head batch if the flush timeout has elapsed or
+// the batch has reached the minimum size.
+func (qb *Batcher) flushIfNecessary() {
+	qb.mu.Lock()
+	// Nothing to flush if the active batch is still empty; re-arm the timer
+	// and wait for more data. Without this guard a timer fire on an empty
+	// batch would dereference a nil request.
+	if qb.pendingBatches[0].req == nil {
+		qb.timer.Reset(qb.cfg.FlushTimeout)
+		qb.mu.Unlock()
+		return
+	}
+	if time.Since(qb.lastFlushed) < qb.cfg.FlushTimeout && qb.pendingBatches[0].req.ItemsCount() < qb.cfg.MinSizeItems {
+		qb.mu.Unlock()
+		return
+	}
+
+	flushedBatch := qb.pendingBatches[0]
+	qb.pendingBatches = qb.pendingBatches[1:]
+	if len(qb.pendingBatches) == 0 {
+		qb.pendingBatches = append(qb.pendingBatches, batch{})
+	}
+
+	qb.lastFlushed = time.Now()
+	qb.timer.Reset(qb.cfg.FlushTimeout)
+	qb.mu.Unlock()
+
+	err := qb.exportFunc(flushedBatch.ctx, flushedBatch.req)
+	if flushedBatch.onExportFinished != nil {
+		flushedBatch.onExportFinished(err)
+	}
+}
+
+// push merges the request into the tail batch, spilling over into new pending
+// batches when the merge exceeds the configured maximum batch size.
+func (qb *Batcher) push(req internal.Request, onExportFinished func(error)) error {
+	qb.mu.Lock()
+	defer qb.mu.Unlock()
+
+	idx := len(qb.pendingBatches) - 1
+	if qb.pendingBatches[idx].req == nil {
+		qb.pendingBatches[idx].req = req
+		qb.pendingBatches[idx].ctx = context.Background()
+		qb.pendingBatches[idx].onExportFinished = onExportFinished
+	} else {
+		reqs, err := qb.mergeSplitFunc(context.Background(),
+			qb.cfg.MaxSizeConfig,
+			qb.pendingBatches[idx].req, req)
+		if err != nil || len(reqs) == 0 {
+			return err
+		}
+
+		// TODO: overflow batches inherit neither the caller's context nor the
+		// onExportFinished callback; acknowledgements for split requests still
+		// need to be propagated.
+		for offset, newReq := range reqs {
+			if offset != 0 {
+				qb.pendingBatches = append(qb.pendingBatches, batch{ctx: context.Background()})
+			}
+			qb.pendingBatches[idx+offset].req = newReq
+		}
+	}
+	return nil
+}
+
+// Start ensures that the queue and all consumer goroutines are started.
+func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
+	if err := qb.queue.Start(ctx, host); err != nil {
+		return err
+	}
+
+	qb.lastFlushed = time.Now() // start the first flush window now rather than at the zero time
+	qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
+
+	// allocReader is a token channel: a worker may call ClaimAndRead only
+	// after receiving a token, and the token is handed back via the onClaim
+	// callback as soon as the read slot is claimed. This serializes reads
+	// while letting batching and exporting proceed in parallel.
+	allocReader := make(chan bool, 10)
+	go func() {
+		allocReader <- true
+	}()
+
+	var startWG sync.WaitGroup
+	for i := 0; i < qb.numWorkers; i++ {
+		qb.stopWG.Add(1)
+		startWG.Add(1)
+		go func() {
+			startWG.Done()
+			defer qb.stopWG.Done()
+			for {
+				select {
+				case <-qb.timer.C:
+					// The timer is shared: each fire is handled by a single worker.
+					qb.flushIfNecessary()
+				case <-allocReader:
+					req, ok, onProcessingFinished := qb.queue.ClaimAndRead(func() {
+						allocReader <- true
+					})
+					if !ok {
+						return
+					}
+
+					if err := qb.push(req, onProcessingFinished); err != nil && onProcessingFinished != nil {
+						// Fail the item if it cannot be merged into a batch.
+						onProcessingFinished(err)
+					}
+					qb.flushIfNecessary()
+				}
+			}
+		}()
+	}
+	startWG.Wait()
+
+	return nil
+}
+
+// Shutdown stops the queue and waits for all Batcher workers to finish.
+func (qb *Batcher) Shutdown(ctx context.Context) error {
+	if err := qb.queue.Shutdown(ctx); err != nil {
+		return err
+	}
+	qb.stopWG.Wait()
+	return nil
+}
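The allocReader channel used by Batcher.Start above is effectively a read token: a worker may call ClaimAndRead only after receiving a token, and the token is handed back through the onClaim callback as soon as the queue slot is claimed, so the next read can start while the current item is still being batched and exported. A rough, self-contained sketch of that hand-off, with a plain channel standing in for the queue and illustrative names throughout:

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := make(chan int, 8)
	for i := 0; i < 8; i++ {
		items <- i
	}
	close(items)

	token := make(chan struct{}, 1)
	token <- struct{}{} // seed the single read token

	// claimAndRead stands in for Queue.ClaimAndRead: it reads one item and
	// invokes onClaim as soon as the slot is claimed.
	claimAndRead := func(onClaim func()) (int, bool) {
		v, ok := <-items
		onClaim()
		return v, ok
	}

	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				<-token // wait for the read token
				v, ok := claimAndRead(func() { token <- struct{}{} })
				if !ok {
					return // queue drained and closed
				}
				fmt.Printf("worker %d got item %d\n", id, v)
			}
		}(w)
	}
	wg.Wait()
}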
diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go
index 98e1b281176..df7a8584996 100644
--- a/exporter/internal/queue/bounded_memory_queue.go
+++ b/exporter/internal/queue/bounded_memory_queue.go
@@ -40,19 +40,35 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
 	return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}, q.sizer.Sizeof(req), nil)
 }
 
+// ClaimAndRead removes the head of the queue and returns it. onClaim is
+// invoked as soon as the channel slot is claimed so the next reader can
+// proceed. The memory queue needs no completion callback, so it returns nil.
+func (q *boundedMemoryQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) {
+	item, ok := q.sizedChannel.pop()
+	onClaim()
+
+	if !ok {
+		return item.req, ok, nil
+	}
+
+	q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
+	return item.req, ok, nil
+}
+
 // Consume applies the provided function on the head of queue.
 // The call blocks until there is an item available or the queue is stopped.
 // The function returns true when an item is consumed or false if the queue is stopped and emptied.
 func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-	item, ok := q.sizedChannel.pop(func(el memQueueEl[T]) int64 { return q.sizer.Sizeof(el.req) })
+	item, ok := q.sizedChannel.pop()
 	if !ok {
 		return false
 	}
+	q.sizedChannel.updateSize(-q.sizer.Sizeof(item.req))
 	// the memory queue doesn't handle consume errors
 	_ = consumeFunc(item.ctx, item.req)
 	return true
 }
 
+// CommitConsume is a no-op: the memory queue has no backing storage to clean up.
+func (q *boundedMemoryQueue[T]) CommitConsume(ctx context.Context, index uint64) {
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
 	q.sizedChannel.shutdown()
diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go
index 7dd646c6ef3..47696806f8e 100644
--- a/exporter/internal/queue/persistent_queue.go
+++ b/exporter/internal/queue/persistent_queue.go
@@ -188,34 +188,52 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
 	return bytesToItemIndex(val)
 }
 
+// ClaimAndRead claims the head of the queue and returns the request together
+// with a callback that must be invoked once processing of the request has
+// finished.
+func (pq *persistentQueue[T]) ClaimAndRead(onClaim func()) (T, bool, func(error)) {
+	_, ok := pq.sizedChannel.pop()
+	onClaim()
+
+	if !ok {
+		var t T
+		return t, false, nil
+	}
+
+	req, onProcessingFinished, consumed := pq.getNextItem(context.Background())
+	if consumed {
+		pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
+	}
+	// Report consumed rather than unconditionally true so the caller does not
+	// process a zero-value request when the queue is stopped.
+	return req, consumed, onProcessingFinished
+}
+
+// CommitConsume deletes the item at the given index from the persistent storage.
+func (pq *persistentQueue[T]) CommitConsume(ctx context.Context, index uint64) {
+	if err := pq.itemDispatchingFinish(ctx, index); err != nil {
+		pq.logger.Error("Error deleting item from queue", zap.Error(err))
+	}
+}
+
 // Consume applies the provided function on the head of queue.
 // The call blocks until there is an item available or the queue is stopped.
 // The function returns true when an item is consumed or false if the queue is stopped.
 func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
-	for {
-		var (
-			req                  T
-			onProcessingFinished func(error)
-			consumed             bool
-		)
-
-		// If we are stopped we still process all the other events in the channel before, but we
-		// return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
-		_, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
-			req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
-			if !consumed {
-				return 0
-			}
-			return pq.set.Sizer.Sizeof(req)
-		})
-		if !ok {
-			return false
-		}
-		if consumed {
-			onProcessingFinished(consumeFunc(context.Background(), req))
-			return true
-		}
+	// If we are stopped, we still drain the remaining events in the channel,
+	// but `getNextItem` returns fast, so the channel frees up quickly and the
+	// stop can proceed.
+	if _, ok := pq.sizedChannel.pop(); !ok {
+		return false
 	}
+
+	req, onProcessingFinished, ok := pq.getNextItem(context.Background())
+	if !ok {
+		return false
+	}
+
+	pq.sizedChannel.updateSize(-pq.set.Sizer.Sizeof(req))
+	onProcessingFinished(consumeFunc(context.Background(), req))
+	return true
 }
 
 func (pq *persistentQueue[T]) Shutdown(ctx context.Context) error {
@@ -344,8 +362,7 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
 
 	// Increase the reference count, so the client is not closed while the request is being processed.
 	// The client cannot be closed because we hold the lock since last we checked `stopped`.
-	pq.refClient++
-	return request, func(consumeErr error) {
+	onProcessingFinished := func(consumeErr error) {
 		// Delete the item from the persistent storage after it was processed.
 		pq.mu.Lock()
 		// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
@@ -377,7 +394,9 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
 		// Ensure the used size and the channel size are in sync.
 		pq.sizedChannel.syncSize()
-	}, true
+	}
+	pq.refClient++
+	return request, onProcessingFinished, true
 }
 
 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 35bc504579e..652a0a93be8 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -25,6 +25,10 @@ type Queue[T any] interface {
 	// without violating capacity restrictions. If success returns no error.
 	// It returns ErrQueueIsFull if no space is currently available.
 	Offer(ctx context.Context, item T) error
+
+	// ClaimAndRead claims the head of the queue and returns it together with a
+	// callback to be invoked once processing has finished. onClaim is called as
+	// soon as the slot is claimed so that another reader can proceed.
+	ClaimAndRead(onClaim func()) (T, bool, func(error))
+	// CommitConsume marks the item at the given index as consumed and removes
+	// it from any backing storage.
+	CommitConsume(ctx context.Context, index uint64)
+
 	// Consume applies the provided function on the head of queue.
 	// The call blocks until there is an item available or the queue is stopped.
 	// The function returns true when an item is consumed or false if the queue is stopped.
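The difference between Consume and the new ClaimAndRead above is when acknowledgement happens: Consume reads, processes, and acknowledges in a single blocking call, while ClaimAndRead hands back the item together with an onProcessingFinished callback so acknowledgement can be deferred until the batch containing the item has been exported. A rough sketch of the deferred-ack flow against a toy queue; toyQueue and its behavior are assumptions for illustration, not the collector's implementation:

package main

import "fmt"

type toyQueue struct {
	ch chan string
}

// ClaimAndRead mimics the extended Queue contract: it returns the item and a
// callback to be invoked once processing of the item has finished.
func (q *toyQueue) ClaimAndRead(onClaim func()) (string, bool, func(error)) {
	item, ok := <-q.ch
	onClaim()
	if !ok {
		return "", false, nil
	}
	onProcessingFinished := func(err error) {
		// A persistent queue would delete the item from storage on success or
		// refuse/requeue it on error; the toy queue just logs the outcome.
		fmt.Printf("finished %q: err=%v\n", item, err)
	}
	return item, true, onProcessingFinished
}

func main() {
	q := &toyQueue{ch: make(chan string, 2)}
	q.ch <- "spans-1"
	q.ch <- "spans-2"
	close(q.ch)

	for {
		item, ok, done := q.ClaimAndRead(func() {})
		if !ok {
			break
		}
		fmt.Println("claimed", item)
		// In the real Batcher the item would first be merged into a pending
		// batch, and done would only be called after that batch is flushed.
		done(nil)
	}
}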
diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go
index f322e58c01c..24f5528fb67 100644
--- a/exporter/internal/queue/sized_channel.go
+++ b/exporter/internal/queue/sized_channel.go
@@ -7,6 +7,7 @@ import "sync/atomic"
 
 // sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
 // The channel will accept elements until the total size of the elements reaches the capacity.
+// Not thread safe.
 type sizedChannel[T any] struct {
 	used *atomic.Int64
 
@@ -67,27 +68,46 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error
 	}
 }
 
-// pop removes the element from the queue and returns it.
-// The call blocks until there is an item available or the queue is stopped.
-// The function returns true when an item is consumed or false if the queue is stopped and emptied.
-// The callback is called before the element is removed from the queue. It must return the size of the element.
-func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
-	el, ok := <-vcq.ch
-	if !ok {
-		return el, false
-	}
+// NOTE (remove before merging): the main change in sized_channel is that pop()
+// is separated into pop() and updateSize(). This is because we want to
+// parallelize "reading" from the queue, and we only know the item size after
+// reading. We also need to update the queue size in case the batch ends up
+// failing.
 
-	size := callback(el)
+// pop removes an element from the queue channel and returns it.
+// The call blocks until there is an item available or the queue is stopped.
+// The caller is responsible for adjusting the tracked size via updateSize once
+// the element's size is known.
+func (vcq *sizedChannel[T]) pop() (T, bool) {
+	el, ok := <-vcq.ch
+	return el, ok
+}
 
+// updateSize adjusts the tracked total size of the queue by deltaSize.
+func (vcq *sizedChannel[T]) updateSize(deltaSize int64) {
 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
 	// because we don't flush the current queue size on the disk on every read/write.
 	// In that case we need to make sure it doesn't go below 0.
-	if vcq.used.Add(-size) < 0 {
+	if vcq.used.Add(deltaSize) < 0 {
 		vcq.used.Store(0)
 	}
-	return el, true
 }
 
+// // pop removes the element from the queue and returns it.
+// // The call blocks until there is an item available or the queue is stopped.
+// // The function returns true when an item is consumed or false if the queue is stopped and emptied.
+// // The callback is called before the element is removed from the queue. It must return the size of the element.
+// func (vcq *sizedChannel[T]) pop(callback func(T) (size int64)) (T, bool) {
+// 	el, ok := <-vcq.ch
+// 	if !ok {
+// 		return el, false
+// 	}
+
+// 	size := callback(el)
+
+// 	// The used size and the channel size might be not in sync with the queue in case it's restored from the disk
+// 	// because we don't flush the current queue size on the disk on every read/write.
+// 	// In that case we need to make sure it doesn't go below 0.
+// 	if vcq.used.Add(-size) < 0 {
+// 		vcq.used.Store(0)
+// 	}
+// 	return el, true
+// }
+
 // syncSize updates the used size to 0 if the queue is empty.
 // The caller must ensure that this call is not called concurrently with push.
 // It's used by the persistent queue to ensure the used value correctly reflects the reality which may not be always
 // the case in case of an abnormal termination.
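Finally, a small sketch of the two-phase size accounting that motivated splitting pop() and updateSize(): an element's size is only known after it has been read, and when the batch that contains it fails, the size can be added back so the capacity accounting stays consistent. sizedQueue is illustrative, not the patch's sizedChannel.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type sizedQueue struct {
	ch   chan string
	used atomic.Int64
}

// pop only dequeues; the caller settles the size afterwards.
func (q *sizedQueue) pop() (string, bool) {
	el, ok := <-q.ch
	return el, ok
}

// updateSize clamps at zero, mirroring the patch: the persisted size can be
// stale after a queue is restored from disk.
func (q *sizedQueue) updateSize(delta int64) {
	if q.used.Add(delta) < 0 {
		q.used.Store(0)
	}
}

func main() {
	q := &sizedQueue{ch: make(chan string, 1)}
	q.ch <- "payload"
	q.used.Store(int64(len("payload")))

	el, _ := q.pop()
	size := int64(len(el))
	q.updateSize(-size) // phase two: subtract once the size is known

	exportErr := errors.New("export failed") // simulate a failed batch export
	if exportErr != nil {
		q.updateSize(size) // compensate: the item still occupies queue capacity
	}
	fmt.Println("used size:", q.used.Load()) // prints 7
}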