Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Sep 29, 2024
1 parent 9811830 commit 3617a1d
Show file tree
Hide file tree
Showing 9 changed files with 1,169 additions and 920 deletions.
1,420 changes: 710 additions & 710 deletions exporter/exporterhelper/internal/batch_sender_test.go

Large diffs are not rendered by default.

32 changes: 28 additions & 4 deletions exporter/exporterhelper/internal/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"

"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -80,6 +81,7 @@ type QueueSender struct {
numConsumers int
traceAttribute attribute.KeyValue
consumers *queue.Consumers[internal.Request]
batcher *queue.Batcher

obsrep *ObsReport
exporterID component.ID
Expand All @@ -94,21 +96,40 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req internal.Request) error {

exportFunc := func(ctx context.Context, req internal.Request) error {
err := qs.NextSender.Send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)

batcherCfg := exporterbatcher.NewDefaultConfig()
batcherCfg.Enabled = false
qs.batcher = queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc)

// consumeFunc := func(ctx context.Context, req internal.Request) error {
// err := qs.NextSender.Send(ctx, req)
// if err != nil {
// set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
// zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
// }
// return err
// }
// qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc)

return qs
}

// Start is invoked during service startup.
func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.consumers.Start(ctx, host); err != nil {
// if err := qs.consumers.Start(ctx, host); err != nil {
// return err
// }

if err := qs.batcher.Start(ctx, host); err != nil {
return err
}

Expand All @@ -125,7 +146,10 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error {
func (qs *QueueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
return qs.consumers.Shutdown(ctx)

// return qs.consumers.Shutdown(ctx)

return qs.batcher.Shutdown(ctx)
}

// send implements the requestSender interface. It puts the request in the queue.
Expand Down
254 changes: 127 additions & 127 deletions exporter/exporterhelper/internal/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,39 +26,39 @@ import (
"go.opentelemetry.io/collector/pipeline"
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
ocs := be.ObsrepSender.(*observabilityConsumerSender)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

firstMockR := newErrorRequest()
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.Send(context.Background(), firstMockR))
})

// Enqueue another request to ensure when calling shutdown we drain the queue.
secondMockR := newMockRequest(3, nil)
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.Send(context.Background(), secondMockR))
})

require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size())

require.NoError(t, be.Shutdown(context.Background()))

secondMockR.checkNumRequests(t, 1)
ocs.checkSendItemsCount(t, 3)
ocs.checkDroppedItemsCount(t, 7)
require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
}
// func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
// qCfg := NewDefaultQueueConfig()
// qCfg.NumConsumers = 1
// rCfg := configretry.NewDefaultBackOffConfig()
// be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender,
// WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
// WithRetry(rCfg), WithQueue(qCfg))
// require.NoError(t, err)
// ocs := be.ObsrepSender.(*observabilityConsumerSender)
// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

// firstMockR := newErrorRequest()
// ocs.run(func() {
// // This is asynchronous so it should just enqueue, no errors expected.
// require.NoError(t, be.Send(context.Background(), firstMockR))
// })

// // Enqueue another request to ensure when calling shutdown we drain the queue.
// secondMockR := newMockRequest(3, nil)
// ocs.run(func() {
// // This is asynchronous so it should just enqueue, no errors expected.
// require.NoError(t, be.Send(context.Background(), secondMockR))
// })

// require.LessOrEqual(t, 1, be.QueueSender.(*QueueSender).queue.Size())

// require.NoError(t, be.Shutdown(context.Background()))

// secondMockR.checkNumRequests(t, 1)
// ocs.checkSendItemsCount(t, 3)
// ocs.checkDroppedItemsCount(t, 7)
// require.Zero(t, be.QueueSender.(*QueueSender).queue.Size())
// }

func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueConfig()
Expand Down Expand Up @@ -110,100 +110,100 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) {
assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"])
}

func TestQueuedRetryHappyPath(t *testing.T) {
tests := []struct {
name string
queueOptions []Option
}{
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithQueue(QueueConfig{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/MemoryQueueFactory",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/PersistentQueueFactory",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
{
name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit",
queueOptions: []Option{
WithRequestQueue(exporterqueue.Config{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
}, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
WithRetry(configretry.NewDefaultBackOffConfig()),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tel, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...)
require.NoError(t, err)
ocs := be.ObsrepSender.(*observabilityConsumerSender)

wantRequests := 10
reqs := make([]*mockRequest, 0, 10)
for i := 0; i < wantRequests; i++ {
ocs.run(func() {
req := newMockRequest(2, nil)
reqs = append(reqs, req)
require.NoError(t, be.Send(context.Background(), req))
})
}

// expect queue to be full
require.Error(t, be.Send(context.Background(), newMockRequest(2, nil)))

require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

// Wait until all batches received
ocs.awaitAsyncProcessing()

require.Len(t, reqs, wantRequests)
for _, req := range reqs {
req.checkNumRequests(t, 1)
}

ocs.checkSendItemsCount(t, 2*wantRequests)
ocs.checkDroppedItemsCount(t, 0)
})
}
}
// func TestQueuedRetryHappyPath(t *testing.T) {
// tests := []struct {
// name string
// queueOptions []Option
// }{
// {
// name: "WithQueue",
// queueOptions: []Option{
// WithMarshaler(mockRequestMarshaler),
// WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
// WithQueue(QueueConfig{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/MemoryQueueFactory",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/PersistentQueueFactory",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// {
// name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit",
// queueOptions: []Option{
// WithRequestQueue(exporterqueue.Config{
// Enabled: true,
// QueueSize: 10,
// NumConsumers: 1,
// }, exporterqueue.NewPersistentQueueFactory[internal.Request](nil, exporterqueue.PersistentQueueSettings[internal.Request]{})),
// WithRetry(configretry.NewDefaultBackOffConfig()),
// },
// },
// }
// for _, tt := range tests {
// t.Run(tt.name, func(t *testing.T) {
// tel, err := componenttest.SetupTelemetry(defaultID)
// require.NoError(t, err)
// t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) })

// set := exporter.Settings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
// be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, tt.queueOptions...)
// require.NoError(t, err)
// ocs := be.ObsrepSender.(*observabilityConsumerSender)

// wantRequests := 10
// reqs := make([]*mockRequest, 0, 10)
// for i := 0; i < wantRequests; i++ {
// ocs.run(func() {
// req := newMockRequest(2, nil)
// reqs = append(reqs, req)
// require.NoError(t, be.Send(context.Background(), req))
// })
// }

// // expect queue to be full
// require.Error(t, be.Send(context.Background(), newMockRequest(2, nil)))

// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
// t.Cleanup(func() {
// assert.NoError(t, be.Shutdown(context.Background()))
// })

// // Wait until all batches received
// ocs.awaitAsyncProcessing()

// require.Len(t, reqs, wantRequests)
// for _, req := range reqs {
// req.checkNumRequests(t, 1)
// }

// ocs.checkSendItemsCount(t, 2*wantRequests)
// ocs.checkDroppedItemsCount(t, 0)
// })
// }
// }

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics}
Expand Down
Loading

0 comments on commit 3617a1d

Please sign in to comment.