Skip to content

Commit

Permalink
Delay queue sender initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Sep 16, 2024
1 parent 2c0941f commit 0b491c6
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,11 @@ func WithQueue(config QueueConfig) Option {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
o.queueCfg = config
o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand All @@ -128,7 +120,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.exporterQueueCfg = cfg
o.queueFactory = queueFactory
return nil
}
Expand Down Expand Up @@ -233,7 +225,10 @@ type baseExporter struct {

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
// QueueConfig and exporterqueue.Config come in from different APIs.
queueCfg QueueConfig
exporterQueueCfg exporterqueue.Config

queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
Expand Down Expand Up @@ -265,28 +260,42 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
return nil, err
}

if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.batchSender = bs
// Set up queue sender if queue is enabled
if be.marshaler != nil && be.unmarshaler != nil && be.queueCfg.Enabled {
q := be.queueFactory(context.Background(), exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}, exporterqueue.Config{
Enabled: be.queueCfg.Enabled,
NumConsumers: be.queueCfg.NumConsumers,
QueueSize: be.queueCfg.QueueSize,
})
be.queueSender = newQueueSender(q, be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
}

if be.queueCfg.Enabled {
if be.marshaler == nil && be.unmarshaler == nil && be.exporterQueueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.exporterQueueCfg), be.set, be.exporterQueueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
}

// Set up batch sender if batching is enabled
if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.batchSender = bs
}

if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0b491c6

Please sign in to comment.