Skip to content

Commit

Permalink
improve unit test add blockingConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Nov 17, 2023
1 parent ed5c906 commit b4ae2fa
Showing 1 changed file with 60 additions and 62 deletions.
122 changes: 60 additions & 62 deletions collector/processor/concurrentbatchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ import (
"errors"
"fmt"
"math"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/semaphore"

"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata"
"go.opentelemetry.io/collector/client"
Expand Down Expand Up @@ -206,80 +204,60 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) {
require.NoError(t, bp.Shutdown(context.Background()))
}

func blockStart(b *shard, done chan int) {
var items []dataItem
for {
select {
case item := <-b.newItem:
if item.data == nil {
continue
}
items = append(items, item)
case <-done:
for i := range items {
b.processItem(items[i])

}
return
}
}
type blockingConsumer struct {
numItems int
wg sync.WaitGroup
}

func newBlockingBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error, chan int) {
bp := &batchProcessor{
logger: set.Logger,

sendBatchSize: int(cfg.SendBatchSize),
sendBatchMaxSize: int(cfg.SendBatchMaxSize),
timeout: cfg.Timeout,
batchFunc: batchFunc,
shutdownC: make(chan struct{}, 1),
maxInFlightBytes: int(cfg.MaxInFlightBytes),
}
exportCtx := client.NewContext(context.Background(), client.Info{
Metadata: client.NewMetadata(nil),
})
b := &shard{
processor: bp,
newItem: make(chan dataItem, runtime.NumCPU()),
exportCtx: exportCtx,
batch: bp.batchFunc(),
sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)),
}
func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
bc.numItems += td.SpanCount()
bc.wg.Wait()
return nil
}

b.processor.goroutines.Add(1)
done := make(chan int, 1)
go blockStart(b, done)
func (bc *blockingConsumer) unblock() {
bc.wg.Done()
}

bp.batcher = &singleShardBatcher{batcher: b}
func (bc *blockingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel)
if err != nil {
return nil, fmt.Errorf("error creating batch processor telemetry: %w", err), done
// helper function to help determine a setting for cfg.MaxInFlightBytes based
// on the number of requests and number of spans per request.
func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 {
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ {
spans.At(spanIndex).SetName(getTestSpanName(0, spanIndex))
}
bp.telemetry = bpt
td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty())

return bp, nil, done
szr := &ptrace.ProtoMarshaler{}
return uint32(szr.TracesSize(td) * numRequests)
}

// This test is meant to confirm that semaphore is still
// released if the client context is canceled.
func TestBatchProcessorCancelContext(t *testing.T) {
sink := new(consumertest.TracesSink)
requestCount := 10
spansPerRequest := 250
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
batcher, err, doneCh := newBlockingBatchProcessor(creationSet, cfg, func() batch { return newBatchTraces(sink) }, true)
bc := &blockingConsumer{}
bc.wg.Add(1)
bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true)
require.NoError(t, err)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 10
spansPerRequest := 250
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
var wg sync.WaitGroup
ctxTimeout, _ := context.WithTimeout(context.Background(), time.Second*1)
ctx, cancel := context.WithCancel(context.Background())
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -291,22 +269,42 @@ func TestBatchProcessorCancelContext(t *testing.T) {
// until batch size reached to unblock.
wg.Add(1)
go func() {
err = batcher.ConsumeTraces(ctxTimeout, td)
assert.Contains(t, err.Error(), "context deadline exceeded")
err = bp.ConsumeTraces(ctx, td)
assert.Contains(t, err.Error(), "context canceled")
wg.Done()
}()
}

// wait until context deadline is exceeded.
// check all spans arrived in blockingConsumer.
require.Eventually(t, func() bool {
numSpans := requestCount * spansPerRequest
return bc.numItems == numSpans
}, 5 * time.Second, 10 * time.Millisecond)

// semaphore should be fully acquired at this point.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))

wg.Add(1)
go func() {
td := testdata.GenerateTraces(spansPerRequest)
err = bp.ConsumeTraces(ctx, td)
assert.Contains(t, err.Error(), "context canceled")
wg.Done()
}()

// cancel context and wait for ConsumeTraces to return.
cancel()
wg.Wait()

// signal to the sender to process and send records.
doneCh <- 1
// check sending another request does not change the semaphore count, even after ConsumeTraces returns.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1)))

// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all requests are sent. Confirm we can acquire MaxInFlightBytes bytes.
// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
require.Eventually(t, func() bool {
return batcher.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
}, 5 * time.Second, 10 * time.Millisecond)
}

Expand Down

0 comments on commit b4ae2fa

Please sign in to comment.