POC [exporter] queue->batching pulling model to queue->batching pushing model #2

Closed · wants to merge 1 commit
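The change replaces the old queue→batching wiring, in which a dedicated batching stage pulls requests out of the queue, with one in which queue consumers push requests into a batcher that owns the batch state. As a rough sketch of the two control-flow directions (this is my reading of the title; all types such as request and pushBatcher are hypothetical stand-ins, not the collector's API):

package main

import (
	"context"
	"fmt"
)

// request is a hypothetical stand-in for internal.Request.
type request struct{ items int }

// pullModel (before): the batching loop pulls requests out of the queue
// and decides when a batch is ready to export.
func pullModel(queue <-chan request, export func(context.Context, []request)) {
	const maxBatch = 2
	batch := make([]request, 0, maxBatch)
	for req := range queue {
		batch = append(batch, req)
		if len(batch) == maxBatch {
			export(context.Background(), batch)
			batch = batch[:0]
		}
	}
	if len(batch) > 0 {
		export(context.Background(), batch) // flush the remainder on drain
	}
}

// pushBatcher (after, as in this POC): queue consumers push each request
// into the batcher, which owns the batch state and flushes to export.
// A real version would need locking for concurrent consumers.
type pushBatcher struct {
	maxBatch int
	batch    []request
	export   func(context.Context, []request)
}

func (b *pushBatcher) push(ctx context.Context, req request) {
	b.batch = append(b.batch, req)
	if len(b.batch) >= b.maxBatch {
		b.export(ctx, b.batch)
		b.batch = nil
	}
}

func main() {
	export := func(_ context.Context, batch []request) {
		fmt.Println("exported a batch of", len(batch))
	}

	queue := make(chan request, 4)
	for i := 0; i < 4; i++ {
		queue <- request{items: 1}
	}
	close(queue)
	pullModel(queue, export) // pulling: the loop drives the queue

	b := &pushBatcher{maxBatch: 2, export: export}
	for i := 0; i < 4; i++ {
		b.push(context.Background(), request{items: 1}) // pushing: callers drive the batcher
	}
}

In the pulling model the batch lifecycle is tied to the reader loop; in the pushing model it is tied to the batcher, which can later grow size and timeout triggers without the queue readers needing to know.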
1,420 changes: 710 additions & 710 deletions exporter/exporterhelper/internal/batch_sender_test.go

Large diffs are not rendered by default.

32 changes: 28 additions & 4 deletions exporter/exporterhelper/internal/queue_sender.go
@@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
@@ -80,6 +81,7 @@ type QueueSender struct {
numConsumers int
traceAttribute attribute.KeyValue
consumers *queue.Consumers[internal.Request]
batcher *queue.Batcher

obsrep *ObsReport
exporterID component.ID
@@ -94,21 +96,40 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {

exportFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)

batcherCfg := exporterbatcher.NewDefaultConfig()
batcherCfg.Enabled = false
qs.batcher = queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc)

// consumeFunc := func(ctx context.Context, req internal.Request) error {
// err := qs.NextSender.Send(ctx, req)
// if err != nil {
// set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
// zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
// }
// return err
// }
// qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)

return qs
}

// Start is invoked during service startup.
func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.consumers.Start(ctx, host); err != nil {
// if err := qs.consumers.Start(ctx, host); err != nil {
// return err
// }

if err := qs.batcher.Start(ctx, host); err != nil {
return err
}

@@ -125,7 +146,10 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers; this drains the queue and calls the retry sender (which is already
// stopped), so each request is tried only once.
return qs.consumers.Shutdown(ctx)

// return qs.consumers.Shutdown(ctx)

return qs.batcher.Shutdown(ctx)
}

// send implements the requestSender interface. It puts the request in the queue.
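The diff above shows only the call sites of the new component: queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc), plus Start and Shutdown. A minimal self-contained sketch of a batcher with that lifecycle, under assumed types (the channel stands in for the exporter queue; this is not queue.Batcher's real implementation), could look like:

package main

import (
	"context"
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for internal.Request.
type request struct{ items int }

// batcher mimics only the Start/Shutdown shape used by QueueSender above;
// the queue is modeled as a channel for the sake of the sketch.
type batcher struct {
	queue      chan request
	numWorkers int
	exportFunc func(context.Context, request) error
	wg         sync.WaitGroup
}

// Start launches the consumer goroutines that read from the queue and
// push requests into the export path.
func (b *batcher) Start(context.Context) error {
	for i := 0; i < b.numWorkers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for req := range b.queue {
				_ = b.exportFunc(context.Background(), req)
			}
		}()
	}
	return nil
}

// Shutdown stops accepting new requests, lets the workers drain what is
// already queued, and waits for in-flight exports, matching the comment
// in QueueSender.Shutdown above.
func (b *batcher) Shutdown(context.Context) error {
	close(b.queue)
	b.wg.Wait()
	return nil
}

func main() {
	b := &batcher{
		queue:      make(chan request, 8),
		numWorkers: 2,
		exportFunc: func(_ context.Context, r request) error {
			fmt.Println("exported request with", r.items, "items")
			return nil
		},
	}
	_ = b.Start(context.Background())
	for i := 0; i < 4; i++ {
		b.queue <- request{items: i + 1}
	}
	_ = b.Shutdown(context.Background())
}

With batcherCfg.Enabled = false, as set in NewQueueSender above, each worker would export requests one at a time, so behavior matches the old queue consumers while the batching hooks move into the new component.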
254 changes: 127 additions & 127 deletions exporter/exporterhelper/internal/queue_sender_test.go
@@ -26,39 +26,39 @@ import (
"go.opentelemetry.io/collector/pipeline"
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.ObsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

firstMockR := newErrorRequest()
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.Send(context.Background(), firstMockR))
})

// Enqueue another request to ensure when calling shutdown we drain the queue.
secondMockR := newMockRequest(3, nil)
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.Send(context.Background(), secondMockR))
})

require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size())

require.NoError(t, be.Shutdown(context.Background()))

secondMockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 3)
ocs.checkDroppedItemsCount(t, 7)
require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
}
// func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
// qCfg := NewDefaultQueueConfig()
// qCfg.NumConsumers = 1
// rCfg := configretry.NewDefaultBackOffConfig()
// be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
// WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
// WithRetry(rCfg), WithQueue(qCfg))
// require.NoError(t, err)
// ocs := be.ObsrepSender.(*observabilityConsumerSender)
// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

// firstMockR := newErrorRequest()
// ocs.run(func() {
// // This is asynchronous so it should just enqueue, no errors expected.
// require.NoError(t, be.Send(context.Background(), firstMockR))
// })

// // Enqueue another request to ensure when calling shutdown we drain the queue.
// secondMockR := newMockRequest(3, nil)
// ocs.run(func() {
// // This is asynchronous so it should just enqueue, no errors expected.
// require.NoError(t, be.Send(context.Background(), secondMockR))
// })

// require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size())

// require.NoError(t, be.Shutdown(context.Background()))

// secondMockR.checkNumRequests(t, 1)
// ocs.checkSendItemsCount(t, 3)
// ocs.checkDroppedItemsCount(t, 7)
// require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
// }

func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueConfig()
@@ -110,100 +110,100 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) {
assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"])
}
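The retained TestQueuedRetry_RejectOnFull asserts the "sending queue is full" error text. As a small illustration of that semantics (hypothetical types; not the collector's queue implementation), reject-on-full is a non-blocking send on a bounded queue:

package main

import (
	"errors"
	"fmt"
)

// request is a hypothetical stand-in for internal.Request.
type request struct{ items int }

// errQueueFull mirrors the "sending queue is full" error asserted above.
var errQueueFull = errors.New("sending queue is full")

// offer enqueues without blocking: a full queue rejects the request so
// the caller can drop it and report dropped_items, as the exporter log does.
func offer(queue chan request, req request) error {
	select {
	case queue <- req:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	queue := make(chan request, 1)
	fmt.Println(offer(queue, request{items: 1})) // <nil>
	fmt.Println(offer(queue, request{items: 2})) // sending queue is full
}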

func TestQueuedRetryHappyPath(t *testing.T) {
tests := []struct {
name string
queueOptions []Option
}{
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithQueue(QueueConfig{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/MemoryQueueFactory",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/PersistentQueueFactory",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tel, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...)
require.NoError(t, err)
ocs := be.ObsrepSender.(*observabilityConsumerSender)

wantRequests := 10
reqs := make([]*mockRequest, 0, 10)
for i := 0; i < wantRequests; i++ {
ocs.run(func() {
req := newMockRequest(2, nil)
reqs = append(reqs, req)
require.NoError(t, be.Send(context.Background(), req))
})
}

// expect queue to be full
require.Error(t, be.Send(context.Background(), newMockRequest(2, nil)))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

// Wait until all batches received
ocs.awaitAsyncProcessing()

require.Len(t, reqs, wantRequests)
for _, req := range reqs {
req.checkNumRequests(t, 1)
}

ocs.checkSendItemsCount(t, 2*wantRequests)
ocs.checkDroppedItemsCount(t, 0)
})
}
}
// func TestQueuedRetryHappyPath(t *testing.T) {
// tests := []struct {
// name string
// queueOptions []Option
// }{
// {
// name: "WithQueue",
// queueOptions: []Option{
// WithMarshaler(mockRequestMarshaler),
// WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
// WithQueue(QueueConfig{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/MemoryQueueFactory",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/PersistentQueueFactory",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// }
// for _, tt := range tests {
// t.Run(tt.name, func(t *testing.T) {
// tel, err := componenttest.SetupTelemetry(defaultID)
// require.NoError(t, err)
// t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

// set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
// be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...)
// require.NoError(t, err)
// ocs := be.ObsrepSender.(*observabilityConsumerSender)

// wantRequests := 10
// reqs := make([]*mockRequest, 0, 10)
// for i := 0; i < wantRequests; i++ {
// ocs.run(func() {
// req := newMockRequest(2, nil)
// reqs = append(reqs, req)
// require.NoError(t, be.Send(context.Background(), req))
// })
// }

// // expect queue to be full
// require.Error(t, be.Send(context.Background(), newMockRequest(2, nil)))

// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
// t.Cleanup(func() {
// assert.NoError(t, be.Shutdown(context.Background()))
// })

// // Wait until all batches received
// ocs.awaitAsyncProcessing()

// require.Len(t, reqs, wantRequests)
// for _, req := range reqs {
// req.checkNumRequests(t, 1)
// }

// ocs.checkSendItemsCount(t, 2*wantRequests)
// ocs.checkDroppedItemsCount(t, 0)
// })
// }
// }

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics}