[exporterqueue] Default Batcher that reads from the queue, batches and exports (open-telemetry#11546)

#### Description

This PR follows
open-telemetry#11540 and
implements support for item-count-based batching in the queue batcher.

Limitation: This PR supports merging requests but not splitting them. In
other words, it supports specifying a minimum request size but not a
maximum request size.
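
As an illustrative sketch (field names taken from the diff and tests in this PR, not a stable public API), enabling min-size batching looks roughly like this:

```go
package example

import (
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
)

func exampleBatcherConfig() exporterbatcher.Config {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.Enabled = true
	// Merge queued requests until a batch exceeds this many items.
	cfg.MinSizeConfig.MinSizeItems = 1000
	// Flush whatever has been merged so far if the threshold is not
	// reached within this window.
	cfg.FlushTimeout = 200 * time.Millisecond
	// MaxSizeItems must stay 0 for now: splitting is unsupported, and a
	// non-zero value makes NewBatcher return errors.ErrUnsupported.
	return cfg
}
```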

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing

#### Link to tracking issue

open-telemetry#8122
open-telemetry#10368
sfc-gh-sili authored Oct 30, 2024
1 parent 8265197 commit b76b9f7
Showing 5 changed files with 368 additions and 22 deletions.
17 changes: 14 additions & 3 deletions exporter/internal/queue/batcher.go
@@ -33,12 +33,23 @@ type BaseBatcher struct {
 }
 
 func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
-	if batchCfg.Enabled {
+	if !batchCfg.Enabled {
+		return &DisabledBatcher{
+			BaseBatcher{
+				batchCfg:   batchCfg,
+				queue:      queue,
+				maxWorkers: maxWorkers,
+				stopWG:     sync.WaitGroup{},
+			},
+		}, nil
+	}
+
+	if batchCfg.MaxSizeConfig.MaxSizeItems != 0 {
 		return nil, errors.ErrUnsupported
 	}
 
-	return &DisabledBatcher{
-		BaseBatcher{
+	return &DefaultBatcher{
+		BaseBatcher: BaseBatcher{
 			batchCfg:   batchCfg,
 			queue:      queue,
 			maxWorkers: maxWorkers,
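For readability, here is the complete factory this hunk produces (a sketch: the tail of the DefaultBatcher branch is hidden behind the fold above and inferred from the DisabledBatcher branch; it relies on the file's existing errors and sync imports):

```go
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
	if !batchCfg.Enabled {
		// Batching disabled: hand requests to the workers one at a time.
		return &DisabledBatcher{
			BaseBatcher{
				batchCfg:   batchCfg,
				queue:      queue,
				maxWorkers: maxWorkers,
				stopWG:     sync.WaitGroup{},
			},
		}, nil
	}

	// Splitting oversized requests (MaxSizeItems) is not implemented yet.
	if batchCfg.MaxSizeConfig.MaxSizeItems != 0 {
		return nil, errors.ErrUnsupported
	}

	// Batching enabled: merge requests until MinSizeItems is exceeded
	// or FlushTimeout fires.
	return &DefaultBatcher{
		BaseBatcher: BaseBatcher{
			batchCfg:   batchCfg,
			queue:      queue,
			maxWorkers: maxWorkers,
			stopWG:     sync.WaitGroup{},
		},
	}, nil
}
```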
122 changes: 122 additions & 0 deletions exporter/internal/queue/default_batcher.go
@@ -0,0 +1,122 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"math"
"sync"
"time"

"go.opentelemetry.io/collector/component"
)

// DefaultBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type DefaultBatcher struct {
BaseBatcher
currentBatchMu sync.Mutex
currentBatch *batch
timer *time.Timer
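// shutdownCh lets the reading goroutine tell the time-based flushing goroutine to exit once the queue is stopped.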
shutdownCh chan bool
}

func (qb *DefaultBatcher) resetTimer() {
if qb.batchCfg.FlushTimeout != 0 {
qb.timer.Reset(qb.batchCfg.FlushTimeout)
}
}

// startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
func (qb *DefaultBatcher) startReadingFlushingGoroutine() {

qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
// Read() blocks until the queue is non-empty or until the queue is stopped.
idx, ctx, req, ok := qb.queue.Read(context.Background())
if !ok {
qb.shutdownCh <- true
return
}

qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
qb.currentBatch = &batch{
req: req,
ctx: ctx,
idxList: []uint64{idx}}
} else {
mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
if mergeErr != nil {
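// Only the incoming request is dropped here; the batch accumulated so far is kept and will flush on size or timeout.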
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq,
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx)}
}

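// Note the strict comparison: a batch of exactly MinSizeItems items is not flushed here and waits for the flush timeout instead.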
if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems {
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until it has successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
}
}
}()
}

// startTimeBasedFlushingGoroutine starts a goroutine that flushes on timeout.
func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() {
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
continue
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()

// flushAsync() blocks until it has successfully started a goroutine for flushing.
qb.flushAsync(batchToFlush)
qb.resetTimer()
}
}
}()
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error {
qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

if qb.batchCfg.FlushTimeout == 0 {
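// Timeout flushing is disabled. Create a timer anyway so qb.timer.C is a valid channel for the select in startTimeBasedFlushingGoroutine, then stop it immediately so it never fires.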
qb.timer = time.NewTimer(math.MaxInt)
qb.timer.Stop()
} else {
qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout)
}

qb.startReadingFlushingGoroutine()
qb.startTimeBasedFlushingGoroutine()

return nil
}
217 changes: 217 additions & 0 deletions exporter/internal/queue/default_batcher_test.go
@@ -0,0 +1,217 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

func TestDefaultBatcher_MinThresholdZero_TimeoutDisabled(t *testing.T) {
tests := []struct {
name string
maxWorkers int
}{
{
name: "infinite_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
},
{
name: "three_workers",
maxWorkers: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
cfg.FlushTimeout = 0
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
MinSizeItems: 0,
}

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
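// With MinSizeItems == 0, every request flushes on its own. The request with exportErr is dropped, leaving 5 requests and 8+17+13+35+2 = 75 items at the sink.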
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 5 && sink.itemsCount.Load() == 75
}, 30*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestDefaultBatcher_TimeoutDisabled(t *testing.T) {
tests := []struct {
name string
maxWorkers int
}{
{
name: "infinite_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
},
{
name: "three_workers",
maxWorkers: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
cfg.FlushTimeout = 0
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
MinSizeItems: 10,
}

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

// These two requests will be merged into one batch, which is then dropped because of the export error.
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 7, sink: sink}))

// This request will be dropped because of the merge error.
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink}))

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
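// Expected flushes: 8+8 = 16 items (then dropped on export error), 7+13 = 20 items, and 35 items. The final 2-item request never exceeds MinSizeItems and is never flushed.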
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 55
}, 30*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestDefaultBatcher_WithTimeout(t *testing.T) {
tests := []struct {
name string
maxWorkers int
}{
{
name: "infinite_workers",
maxWorkers: 0,
},
{
name: "one_worker",
maxWorkers: 1,
},
{
name: "three_workers",
maxWorkers: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
cfg.FlushTimeout = 50 * time.Millisecond
cfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
MinSizeItems: 100,
}

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

ba, err := NewBatcher(cfg, q, tt.maxWorkers)
require.NoError(t, err)

require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
require.NoError(t, q.Shutdown(context.Background()))
require.NoError(t, ba.Shutdown(context.Background()))
})

sink := newFakeRequestSink()

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))

// This request will be dropped because of the merge error.
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, mergeErr: errors.New("transient error"), sink: sink}))

require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 35, sink: sink}))
require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 2, sink: sink}))
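// No batch exceeds MinSizeItems (100), so the 50ms flush timeout exports everything merged so far as a single request: 8+17+13+35+2 = 75 items.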
assert.Eventually(t, func() bool {
return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 75
}, 100*time.Millisecond, 10*time.Millisecond)
})
}
}

func TestDefaultBatcher_SplitNotImplemented(t *testing.T) {
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = true
maxWorkers := 0
cfg.MaxSizeConfig.MaxSizeItems = 1

q := NewBoundedMemoryQueue[internal.Request](
MemoryQueueSettings[internal.Request]{
Sizer: &RequestSizer[internal.Request]{},
Capacity: 10,
})

_, err := NewBatcher(cfg, q, maxWorkers)
require.Error(t, err)
}
15 changes: 0 additions & 15 deletions exporter/internal/queue/disabled_batcher_test.go
@@ -70,18 +70,3 @@ func TestDisabledBatcher_Basic(t *testing.T) {
 		})
 	}
 }
-
-func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
-	cfg := exporterbatcher.NewDefaultConfig()
-	cfg.Enabled = true
-	maxWorkers := 0
-
-	q := NewBoundedMemoryQueue[internal.Request](
-		MemoryQueueSettings[internal.Request]{
-			Sizer:    &RequestSizer[internal.Request]{},
-			Capacity: 10,
-		})
-
-	_, err := NewBatcher(cfg, q, maxWorkers)
-	require.Error(t, err)
-}