Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter] [chore] Initialize batchSender and queueSender after configuration - #2 #11184

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 32 additions & 23 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,11 @@
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
o.queueCfg = config
o.queueFactory = exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
})
}
Expand All @@ -136,7 +128,7 @@
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.exporterQueueCfg = cfg
o.queueFactory = queueFactory
return nil
})
Expand Down Expand Up @@ -261,7 +253,10 @@

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
// QueueConfig and exporterqueue.Config come in from different APIs.
queueCfg QueueConfig
exporterQueueCfg exporterqueue.Config

queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
Expand Down Expand Up @@ -293,6 +288,31 @@
return nil, err
}

// Set up queue sender if queue is enabled
if be.marshaler != nil && be.unmarshaler != nil && be.queueCfg.Enabled {
q := be.queueFactory(context.Background(), exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}, exporterqueue.Config{
Enabled: be.queueCfg.Enabled,
NumConsumers: be.queueCfg.NumConsumers,
QueueSize: be.queueCfg.QueueSize,
})
be.queueSender = newQueueSender(q, be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
}

if be.marshaler == nil && be.unmarshaler == nil && be.exporterQueueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.exporterQueueCfg), be.set, be.exporterQueueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / Integration test

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / test-coverage

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-0)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (pkg)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-3)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (connector)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (cmd-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (other)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (exporter-1)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (internal)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / windows-unittest

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot call non-function op (variable of type Option)

Check failure on line 311 in exporter/exporterhelper/common.go

View workflow job for this annotation

GitHub Actions / contrib-tests-matrix (receiver-2)

invalid operation: cannot call non-function op (variable of type Option)
}
}

// Set up batch sender if batching is enabled
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving batch sender initialization down because in the future we will only initialize batch sender if queue sender is not initialized.

if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
Expand All @@ -304,17 +324,6 @@
be.batchSender = bs
}

if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op.apply(be))
}
}

if err != nil {
return nil, err
}
Expand Down
Loading