diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 5b8a2a51767..6e609af8810 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -106,19 +106,11 @@ func WithQueue(config QueueConfig) Option { o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ + o.queueCfg = config + o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ Marshaler: o.marshaler, Unmarshaler: o.unmarshaler, }) - q := qf(context.Background(), exporterqueue.Settings{ - DataType: o.signal, - ExporterSettings: o.set, - }, exporterqueue.Config{ - Enabled: config.Enabled, - NumConsumers: config.NumConsumers, - QueueSize: config.QueueSize, - }) - o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep) return nil }) } @@ -136,7 +128,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - o.queueCfg = cfg + o.exporterQueueCfg = cfg o.queueFactory = queueFactory return nil }) @@ -261,7 +253,10 @@ type baseExporter struct { consumerOptions []consumer.Option - queueCfg exporterqueue.Config + // QueueConfig and exporterqueue.Config come in from different APIs. + queueCfg QueueConfig + exporterQueueCfg exporterqueue.Config + queueFactory exporterqueue.Factory[Request] batcherCfg exporterbatcher.Config batcherOpts []BatcherOption @@ -293,6 +288,31 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre return nil, err } + // Set up queue sender if queue is enabled + if be.marshaler != nil && be.unmarshaler != nil && be.queueCfg.Enabled { + q := be.queueFactory(context.Background(), exporterqueue.Settings{ + DataType: be.signal, + ExporterSettings: be.set, + }, exporterqueue.Config{ + Enabled: be.queueCfg.Enabled, + NumConsumers: be.queueCfg.NumConsumers, + QueueSize: be.queueCfg.QueueSize, + }) + be.queueSender = newQueueSender(q, be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) + } + + if be.marshaler == nil && be.unmarshaler == nil && be.exporterQueueCfg.Enabled { + set := exporterqueue.Settings{ + DataType: be.signal, + ExporterSettings: be.set, + } + be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.exporterQueueCfg), be.set, be.exporterQueueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) + for _, op := range options { + err = multierr.Append(err, op(be)) + } + } + + // Set up batch sender if batching is enabled if be.batcherCfg.Enabled { bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc) for _, opt := range be.batcherOpts { @@ -304,17 +324,6 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre be.batchSender = bs } - if be.queueCfg.Enabled { - set := exporterqueue.Settings{ - DataType: be.signal, - ExporterSettings: be.set, - } - be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) - for _, op := range options { - err = multierr.Append(err, op.apply(be)) - } - } - if err != nil { return nil, err }