From d5c951a6ce06bc7027b4ba921cc54b9e9c201999 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Sun, 6 Oct 2024 20:03:39 -0700 Subject: [PATCH] [chore] Remove duplicate queue initialization logic (#11375) Signed-off-by: Bogdan Drutu --- .../exporterhelper/internal/base_exporter.go | 48 ++++++++----------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index d7f93136457..f095bd558a2 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -61,11 +61,10 @@ type BaseExporter struct { ConsumerOptions []consumer.Option - QueueCfg QueueConfig - ExporterQueueCfg exporterqueue.Config - QueueFactory exporterqueue.Factory[internal.Request] - BatcherCfg exporterbatcher.Config - BatcherOpts []BatcherOption + queueCfg exporterqueue.Config + queueFactory exporterqueue.Factory[internal.Request] + BatcherCfg exporterbatcher.Config + BatcherOpts []BatcherOption } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -94,24 +93,15 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe return nil, err } - if be.QueueCfg.Enabled { - q := be.QueueFactory(context.Background(), exporterqueue.Settings{ - Signal: be.Signal, - ExporterSettings: be.Set, - }, exporterqueue.Config{ - Enabled: be.QueueCfg.Enabled, - NumConsumers: be.QueueCfg.NumConsumers, - QueueSize: be.QueueCfg.QueueSize, - }) - be.QueueSender = NewQueueSender(q, be.Set, be.QueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) - } - - if be.ExporterQueueCfg.Enabled { - set := exporterqueue.Settings{ - Signal: be.Signal, - ExporterSettings: be.Set, - } - be.QueueSender = NewQueueSender(be.QueueFactory(context.Background(), set, be.ExporterQueueCfg), be.Set, be.ExporterQueueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) + if be.queueCfg.Enabled { + q := be.queueFactory( + context.Background(), + exporterqueue.Settings{ + Signal: be.Signal, + ExporterSettings: be.Set, + }, + be.queueCfg) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) for _, op := range options { err = multierr.Append(err, op(be)) } @@ -243,8 +233,12 @@ func WithQueue(config QueueConfig) Option { o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - o.QueueCfg = config - o.QueueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{ + o.queueCfg = exporterqueue.Config{ + Enabled: config.Enabled, + NumConsumers: config.NumConsumers, + QueueSize: config.QueueSize, + } + o.queueFactory = exporterqueue.NewPersistentQueueFactory[internal.Request](config.StorageID, exporterqueue.PersistentQueueSettings[internal.Request]{ Marshaler: o.Marshaler, Unmarshaler: o.Unmarshaler, }) @@ -265,8 +259,8 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - o.ExporterQueueCfg = cfg - o.QueueFactory = queueFactory + o.queueCfg = cfg + o.queueFactory = queueFactory return nil } }