Skip to content

Commit

Permalink
Delay queue sender initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Sep 17, 2024
1 parent 8027d80 commit 3461207
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,11 @@ func WithQueue(config QueueConfig) Option {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
o.queueCfg = config
o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
})
}
Expand All @@ -136,7 +128,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.exporterQueueCfg = cfg
o.queueFactory = queueFactory
return nil
})
Expand Down Expand Up @@ -261,7 +253,10 @@ type baseExporter struct {

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
// QueueConfig and exporterqueue.Config come in from different APIs.
queueCfg QueueConfig
exporterQueueCfg exporterqueue.Config

queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
Expand Down Expand Up @@ -293,6 +288,31 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
return nil, err
}

// Set up queue sender if queue is enabled
if be.marshaler != nil && be.unmarshaler != nil && be.queueCfg.Enabled {
q := be.queueFactory(context.Background(), exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}, exporterqueue.Config{
Enabled: be.queueCfg.Enabled,
NumConsumers: be.queueCfg.NumConsumers,
QueueSize: be.queueCfg.QueueSize,
})
be.queueSender = newQueueSender(q, be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
}

if be.marshaler == nil && be.unmarshaler == nil && be.exporterQueueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.exporterQueueCfg), be.set, be.exporterQueueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / Integration test

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / windows-unittest

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot call non-function op (variable of type Option)
}
}

// Set up batch sender if batching is enabled
if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
Expand All @@ -304,17 +324,6 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
be.batchSender = bs
}

if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op.apply(be))
}
}

if err != nil {
return nil, err
}
Expand Down

0 comments on commit 3461207

Please sign in to comment.