diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 0a3238d3561c..07ff35418c4a 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -98,19 +98,11 @@ func WithQueue(config QueueConfig) Option { o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ + o.queueCfg = config + o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ Marshaler: o.marshaler, Unmarshaler: o.unmarshaler, }) - q := qf(context.Background(), exporterqueue.Settings{ - DataType: o.signal, - ExporterSettings: o.set, - }, exporterqueue.Config{ - Enabled: config.Enabled, - NumConsumers: config.NumConsumers, - QueueSize: config.QueueSize, - }) - o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep) return nil } } @@ -128,7 +120,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - o.queueCfg = cfg + o.exporterQueueCfg = cfg o.queueFactory = queueFactory return nil } @@ -233,7 +225,10 @@ type baseExporter struct { consumerOptions []consumer.Option - queueCfg exporterqueue.Config + // QueueConfig and exporterqueue.Config come in from different APIs. + queueCfg QueueConfig + exporterQueueCfg exporterqueue.Config + queueFactory exporterqueue.Factory[Request] batcherCfg exporterbatcher.Config batcherOpts []BatcherOption @@ -265,28 +260,42 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre return nil, err } - if be.batcherCfg.Enabled { - bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc) - for _, opt := range be.batcherOpts { - err = multierr.Append(err, opt(bs)) - } - if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { - err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) - } - be.batchSender = bs + // Set up queue sender if queue is enabled + if be.marshaler != nil && be.unmarshaler != nil && be.queueCfg.Enabled { + q := be.queueFactory(context.Background(), exporterqueue.Settings{ + DataType: be.signal, + ExporterSettings: be.set, + }, exporterqueue.Config{ + Enabled: be.queueCfg.Enabled, + NumConsumers: be.queueCfg.NumConsumers, + QueueSize: be.queueCfg.QueueSize, + }) + be.queueSender = newQueueSender(q, be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) } - if be.queueCfg.Enabled { + if be.marshaler == nil && be.unmarshaler == nil && be.exporterQueueCfg.Enabled { set := exporterqueue.Settings{ DataType: be.signal, ExporterSettings: be.set, } - be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) + be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.exporterQueueCfg), be.set, be.exporterQueueCfg.NumConsumers, be.exportFailureMessage, be.obsrep) for _, op := range options { err = multierr.Append(err, op(be)) } } + // Set up batch sender if batching is enabled + if be.batcherCfg.Enabled { + bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc) + for _, opt := range be.batcherOpts { + err = multierr.Append(err, opt(bs)) + } + if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { + err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) + } + be.batchSender = bs + } + if err != nil { return nil, err }