Skip to content

Commit

Permalink
POC
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Sep 13, 2024
1 parent 5fc39ba commit b6125de
Show file tree
Hide file tree
Showing 9 changed files with 1,055 additions and 779 deletions.
1,414 changes: 707 additions & 707 deletions exporter/exporterhelper/batch_sender_test.go

Large diffs are not rendered by default.

65 changes: 39 additions & 26 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,11 @@ func WithQueue(config QueueConfig) Option {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
o.queueCfgLegacy = config
o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand Down Expand Up @@ -233,10 +225,11 @@ type baseExporter struct {

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
queueCfgLegacy QueueConfig
queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
Expand Down Expand Up @@ -265,7 +258,38 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
return nil, err
}

if be.batcherCfg.Enabled {
if be.marshaler != nil && be.unmarshaler != nil {
if be.queueCfgLegacy.Enabled {
q := be.queueFactory(context.Background(), exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}, exporterqueue.Config{
Enabled: be.queueCfgLegacy.Enabled,
NumConsumers: be.queueCfgLegacy.NumConsumers,
QueueSize: be.queueCfgLegacy.QueueSize,
})
be.queueSender = newQueueSender(q, be.batcherCfg, be.set, be.queueCfgLegacy.NumConsumers, be.exportFailureMessage, be.obsrep)
}
}

if be.marshaler == nil && be.unmarshaler == nil {
if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}

be.queueSender = newQueueSender(
be.queueFactory(context.Background(), set, be.queueCfg), be.batcherCfg,
be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}
}

// Initialize BatchSender if batcher is enabled but queue is not enabled.
if be.batcherCfg.Enabled && be.queueSender == nil {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt(bs))
Expand All @@ -276,17 +300,6 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
be.batchSender = bs
}

if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

if err != nil {
return nil, err
}
Expand Down
46 changes: 40 additions & 6 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte
import (
"context"
"errors"
"math"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -15,6 +16,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
Expand Down Expand Up @@ -79,36 +81,65 @@ type queueSender struct {
queue exporterqueue.Queue[Request]
numConsumers int
traceAttribute attribute.KeyValue
consumers *queue.Consumers[Request]
batcher *queue.Batcher
// consumers *queue.Consumers[Request]

obsrep *obsReport
exporterID component.ID
}

func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
func newQueueSender(q exporterqueue.Queue[Request], batcherCfg exporterbatcher.Config, set exporter.Settings, numConsumers int,
exportFailureMessage string, obsrep *obsReport) *queueSender {

if !batcherCfg.Enabled {
batcherCfg.MinSizeConfig = exporterbatcher.MinSizeConfig{
MinSizeItems: 0,
}
batcherCfg.MaxSizeConfig = exporterbatcher.MaxSizeConfig{
MaxSizeItems: math.MaxInt,
}
batcherCfg.FlushTimeout = 0
}

qs := &queueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(internal.ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req Request) error {

// consumeFunc := func(ctx context.Context, req Request) error {
// err := qs.nextSender.send(ctx, req)
// if err != nil {
// set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
// zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
// }
// return err
// }

// qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc)

exportFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
if err != nil {
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = queue.NewQueueConsumers[Request](q, numConsumers, consumeFunc)

qs.batcher = queue.NewBatcher(batcherCfg, q, numConsumers, exportFunc)
return qs
}

// Start is invoked during service startup.
func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
if err := qs.consumers.Start(ctx, host); err != nil {
// if err := qs.consumers.Start(ctx, host); err != nil {
// return err
// }

if err := qs.batcher.Start(ctx, host); err != nil {
return err
}

Expand All @@ -125,7 +156,10 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
func (qs *queueSender) Shutdown(ctx context.Context) error {
// Stop the queue and consumers, this will drain the queue and will call the retry (which is stopped) that will only
// try once every request.
return qs.consumers.Shutdown(ctx)

// return qs.consumers.Shutdown(ctx)

return qs.batcher.Shutdown(ctx)
}

// send implements the requestSender interface. It puts the request in the queue.
Expand Down
6 changes: 5 additions & 1 deletion exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
Expand Down Expand Up @@ -437,7 +438,10 @@ func TestQueueSenderNoStartShutdown(t *testing.T) {
exporterCreateSettings: exportertest.NewNopSettings(),
})
assert.NoError(t, err)
qs := newQueueSender(queue, set, 1, "", obsrep)

batcherCfg := exporterbatcher.NewDefaultConfig()
batcherCfg.Enabled = false
qs := newQueueSender(queue, batcherCfg, set, 1, "", obsrep)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down
166 changes: 166 additions & 0 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"math"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/internal"
)

// batch bundles a merged request with the context it was enqueued under and
// an optional callback that is invoked once the export attempt finishes.
type batch struct {
	ctx              context.Context
	req              internal.Request
	onExportFinished func(error)
}

// Batcher reads requests from a queue, accumulates them into batches per the
// exporterbatcher configuration, and exports each batch via exportFunc.
type Batcher struct {
	cfg exporterbatcher.Config
	// NOTE(review): mergeFunc is never assigned or called in this file — confirm
	// whether it is set elsewhere or is dead weight in this POC.
	mergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
	// NOTE(review): mergeSplitFunc is called by push but never assigned in this
	// file — confirm it is wired up by the caller before Start.
	mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]

	queue      Queue[internal.Request]
	numWorkers int
	exportFunc func(context.Context, internal.Request) error
	stopWG     sync.WaitGroup // tracks the worker goroutines launched in Start

	mu          sync.Mutex // guards lastFlushed, pendingBatches, and timer resets
	lastFlushed time.Time  // time of the most recent flush
	// pendingBatches is a FIFO of accumulating batches; invariant: it always
	// holds at least one (possibly empty) entry.
	pendingBatches []batch
	timer          *time.Timer // periodic flush trigger; created in Start
}

// NewBatcher returns a Batcher that drains queue with numWorkers goroutines,
// batching requests per cfg and exporting them through exportFunc.
// Start must be called before the Batcher performs any work.
func NewBatcher(cfg exporterbatcher.Config, queue Queue[internal.Request], numWorkers int, exportFunc func(context.Context, internal.Request) error) *Batcher {
	return &Batcher{
		cfg:        cfg,
		queue:      queue,
		numWorkers: numWorkers,
		exportFunc: exportFunc,
		// stopWG needs no explicit initializer: the zero value of
		// sync.WaitGroup is ready to use.
		// pendingBatches always contains at least one (initially empty) batch.
		pendingBatches: make([]batch, 1),
	}
}

// flushIfNecessary exports the oldest pending batch when it is non-empty and
// either the flush timeout has elapsed since the last flush or the batch has
// reached the configured minimum item count. The export itself runs outside
// the lock so pushes are not blocked while an export is in flight.
func (qb *Batcher) flushIfNecessary() {
	qb.mu.Lock()

	// Nothing accumulated yet — nothing to flush.
	// NOTE(review): when this path (or the too-small/too-recent path below) is
	// reached from a timer fire, the timer is not reset, so timeout-based
	// flushing stalls until a size-triggered flush resets it — confirm intent.
	if qb.pendingBatches[0].req == nil {
		qb.mu.Unlock()
		return
	}

	// Flush only if the timeout has elapsed OR the batch reached min size.
	if time.Since(qb.lastFlushed) < qb.cfg.FlushTimeout && qb.pendingBatches[0].req.ItemsCount() < qb.cfg.MinSizeItems {
		qb.mu.Unlock()
		return
	}

	// Pop the oldest batch, preserving the invariant that pendingBatches
	// always holds at least one entry.
	flushedBatch := qb.pendingBatches[0]
	qb.pendingBatches = qb.pendingBatches[1:]
	if len(qb.pendingBatches) == 0 {
		qb.pendingBatches = append(qb.pendingBatches, batch{})
	}

	qb.lastFlushed = time.Now()

	// Re-arm the periodic flush relative to this flush.
	if qb.cfg.FlushTimeout > 0 {
		qb.timer.Reset(qb.cfg.FlushTimeout)
	}

	// Unlock before exporting: the export may be slow and must not serialize
	// concurrent pushes.
	qb.mu.Unlock()

	err := qb.exportFunc(flushedBatch.ctx, flushedBatch.req)
	if flushedBatch.onExportFinished != nil {
		flushedBatch.onExportFinished(err)
	}
}

// push appends req to the newest pending batch, merging (and possibly
// splitting) it with the batch's existing request via mergeSplitFunc.
// Returns the merge error, if any; a nil error with no merge output drops
// the request silently.
func (qb *Batcher) push(req internal.Request, onExportFinished func(error)) error {
	qb.mu.Lock()
	defer qb.mu.Unlock()

	idx := len(qb.pendingBatches) - 1
	if qb.pendingBatches[idx].req == nil {
		// Newest batch is empty: req becomes its initial content.
		// NOTE(review): the caller's context is replaced with
		// context.Background() — confirm request-scoped deadlines/metadata
		// are meant to be discarded here.
		qb.pendingBatches[idx].req = req
		qb.pendingBatches[idx].ctx = context.Background()
		qb.pendingBatches[idx].onExportFinished = onExportFinished
	} else {
		// Merge req into the existing batch, splitting per MaxSizeConfig.
		// NOTE(review): on this path the incoming req's onExportFinished is
		// dropped — its producer never learns the export outcome. A batch
		// would need to carry multiple callbacks to fix this; flagging for
		// follow-up.
		reqs, err := qb.mergeSplitFunc(context.Background(),
			qb.cfg.MaxSizeConfig,
			qb.pendingBatches[idx].req, req)
		if err != nil || len(reqs) == 0 {
			return err
		}

		// First split result overwrites the current batch; each additional
		// result starts a new pending batch.
		for offset, newReq := range reqs {
			if offset != 0 {
				qb.pendingBatches = append(qb.pendingBatches, batch{})
			}
			qb.pendingBatches[idx+offset].req = newReq
		}
	}
	return nil
}

// Start starts the underlying queue and launches numWorkers worker goroutines
// that claim requests from the queue, batch them via push, and flush either on
// batch size or on the flush-timeout timer.
func (qb *Batcher) Start(ctx context.Context, host component.Host) error {
	if err := qb.queue.Start(ctx, host); err != nil {
		return err
	}

	if qb.cfg.FlushTimeout > 0 {
		qb.timer = time.NewTimer(qb.cfg.FlushTimeout)
	} else {
		// No timeout configured: keep a valid (but stopped) timer so the
		// select below always has a usable channel that never fires.
		qb.timer = time.NewTimer(math.MaxInt)
		qb.timer.Stop()
	}

	// allocReader carries a single "read permit" token: whichever worker holds
	// it is the one allowed to claim the next request from the queue.
	allocReader := make(chan bool, 10)
	// Seed the initial permit. The channel is buffered, so this cannot block;
	// no extra goroutine is needed.
	allocReader <- true

	var startWG sync.WaitGroup
	for i := 0; i < qb.numWorkers; i++ {
		qb.stopWG.Add(1)
		startWG.Add(1)
		go func() {
			startWG.Done()
			defer qb.stopWG.Done()
			for {
				select {
				case <-qb.timer.C:
					qb.flushIfNecessary()
				case <-allocReader:
					req, ok, onProcessingFinished := qb.queue.ClaimAndRead(func() {
						allocReader <- true
					})
					if !ok {
						// Queue is shut down. Return the permit before exiting
						// so the remaining workers also observe shutdown and
						// exit; without this they block forever in this select
						// and Shutdown hangs on stopWG.Wait.
						allocReader <- true
						return
					}

					qb.push(req, onProcessingFinished) // TODO: handle the error returned by push
					qb.flushIfNecessary()
				}
			}
		}()
	}
	// Wait until every worker goroutine has actually started.
	startWG.Wait()

	return nil
}

// Shutdown stops the underlying queue and then blocks until every worker
// goroutine launched by Start has exited.
func (qb *Batcher) Shutdown(ctx context.Context) error {
	err := qb.queue.Shutdown(ctx)
	if err != nil {
		return err
	}
	qb.stopWG.Wait()
	return nil
}
Loading

0 comments on commit b6125de

Please sign in to comment.