Skip to content

Commit

Permalink
[chore] Remove duplicate queue initialization logic (open-telemetry#1…
Browse files Browse the repository at this point in the history
…1375)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored and jackgopack4 committed Oct 8, 2024
1 parent 4301658 commit d5c951a
Showing 1 changed file with 21 additions and 27 deletions.
48 changes: 21 additions & 27 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ type BaseExporter struct {

ConsumerOptions []consumer.Option

QueueCfg QueueConfig
ExporterQueueCfg exporterqueue.Config
QueueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[internal.Request]
BatcherCfg exporterbatcher.Config
BatcherOpts []BatcherOption
}

func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) {
Expand Down Expand Up @@ -94,24 +93,15 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
return nil, err
}

if be.QueueCfg.Enabled {
q := be.QueueFactory(context.Background(), exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
}, exporterqueue.Config{
Enabled: be.QueueCfg.Enabled,
NumConsumers: be.QueueCfg.NumConsumers,
QueueSize: be.QueueCfg.QueueSize,
})
be.QueueSender = NewQueueSender(q, be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
}

if be.ExporterQueueCfg.Enabled {
set := exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
}
be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.ExporterQueueCfg), be.Set, be.ExporterQueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
if be.queueCfg.Enabled {
q := be.queueFactory(
context.Background(),
exporterqueue.Settings{
Signal: be.Signal,
ExporterSettings: be.Set,
},
be.queueCfg)
be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
}
Expand Down Expand Up @@ -243,8 +233,12 @@ func WithQueue(config QueueConfig) Option {
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.QueueCfg = config
o.QueueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
o.queueCfg = exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
}
o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{
Marshaler: o.Marshaler,
Unmarshaler: o.Unmarshaler,
})
Expand All @@ -265,8 +259,8 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.ExporterQueueCfg = cfg
o.QueueFactory = queueFactory
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
}
Expand Down

0 comments on commit d5c951a

Please sign in to comment.