From 527df61589fe1f3e1e307f6bf5ba56dbc7eea1b2 Mon Sep 17 00:00:00 2001
From: Sindy Li
Date: Tue, 15 Oct 2024 18:15:07 -0700
Subject: [PATCH] [exporter] Disable the API to pass in configurations using a
 callback that operates on batch sender (#11448)

#### Description

As part of the effort to solve
https://github.com/open-telemetry/opentelemetry-collector/issues/10368, we no
longer guarantee to initialize a `batchSender` when `batcher` is enabled.
Therefore, we would like to remove the interface for setting `mergeFunc` and
`mergeSplitFunc` as a callback that operates on `batchSender`. Instead, users
should use the alternative `WithBatchFuncs`, which is a callback that operates
on `baseExporter`.

Context: https://github.com/open-telemetry/opentelemetry-collector/pull/11414

#### Link to tracking issue
https://github.com/open-telemetry/opentelemetry-collector/issues/8122
https://github.com/open-telemetry/opentelemetry-collector/issues/10368

---------

Co-authored-by: Bogdan Drutu
---
 .chloggen/disable-batch-option.yaml          | 28 ++++++++
 exporter/exporterhelper/common.go            | 20 +++---
 .../exporterhelper/internal/base_exporter.go | 25 +------
 .../internal/batch_sender_test.go            | 71 ++++++++++---------
 4 files changed, 75 insertions(+), 69 deletions(-)
 create mode 100644 .chloggen/disable-batch-option.yaml

diff --git a/.chloggen/disable-batch-option.yaml b/.chloggen/disable-batch-option.yaml
new file mode 100644
index 00000000000..bf077d21888
--- /dev/null
+++ b/.chloggen/disable-batch-option.yaml
@@ -0,0 +1,28 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
+component: exporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Disables setting batcher options directly on the batch sender.
+
+# One or more tracking issues or pull requests related to the change
+issues: [10368]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  Removed WithRequestBatchFuncs(BatcherOption) in favor of WithBatchFuncs(Option), where
+  BatcherOption is a function that operates on batch sender and Option is one that operates
+  on BaseExporter.
+
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [api]
diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go
index de9822ee6aa..7f396f40776 100644
--- a/exporter/exporterhelper/common.go
+++ b/exporter/exporterhelper/common.go
@@ -61,19 +61,19 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
 	return internal.WithCapabilities(capabilities)
 }
 
-// BatcherOption apply changes to batcher sender.
-type BatcherOption = internal.BatcherOption
-
-// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
-func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption { - return internal.WithRequestBatchFuncs(mf, msf) -} - // WithBatcher enables batching for an exporter based on custom request types. // For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and // WithRequestBatchFuncs provided. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { - return internal.WithBatcher(cfg, opts...) +func WithBatcher(cfg exporterbatcher.Config) Option { + return internal.WithBatcher(cfg) +} + +// WithBatchFuncs enables setting custom batch merge functions. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], + msf exporterbatcher.BatchMergeSplitFunc[Request]) Option { + return internal.WithBatchFuncs(mf, msf) } diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index f095bd558a2..1aebb318c8f 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -29,9 +29,6 @@ type ObsrepSenderFactory = func(obsrep *ObsReport) RequestSender // Option apply changes to BaseExporter. type Option func(*BaseExporter) error -// BatcherOption apply changes to batcher sender. -type BatcherOption func(*BatchSender) error - type BaseExporter struct { component.StartFunc component.ShutdownFunc @@ -64,7 +61,6 @@ type BaseExporter struct { queueCfg exporterqueue.Config queueFactory exporterqueue.Factory[internal.Request] BatcherCfg exporterbatcher.Config - BatcherOpts []BatcherOption } func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSenderFactory, options ...Option) (*BaseExporter, error) { @@ -109,9 +105,6 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe if be.BatcherCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) - for _, opt := range be.BatcherOpts { - err = multierr.Append(err, opt(bs)) - } if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) } @@ -275,30 +268,14 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } -// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types. -func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) BatcherOption { - return func(bs *BatchSender) error { - if mf == nil || msf == nil { - return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions") - } - if bs.mergeFunc != nil || bs.mergeSplitFunc != nil { - return fmt.Errorf("WithRequestBatchFuncs can only be used once with request-based exporters") - } - bs.mergeFunc = mf - bs.mergeSplitFunc = msf - return nil - } -} - // WithBatcher enables batching for an exporter based on custom request types. 
// For now, it can be used only with the New[Traces|Metrics|Logs]RequestExporter exporter helpers and // WithRequestBatchFuncs provided. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { +func WithBatcher(cfg exporterbatcher.Config) Option { return func(o *BaseExporter) error { o.BatcherCfg = cfg - o.BatcherOpts = opts return nil } } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 456218fd914..f6d53bca0e0 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -34,20 +34,20 @@ func TestBatchSender_Merge(t *testing.T) { }{ { name: "split_disabled", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + batcherOption: WithBatcher(cfg), }, { name: "split_high_limit", batcherOption: func() Option { c := cfg c.MaxSizeItems = 1000 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) + be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -94,14 +94,14 @@ func TestBatchSender_BatchExportError(t *testing.T) { }{ { name: "merge_only", - batcherOption: WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + batcherOption: WithBatcher(cfg), }, { name: "merge_without_split_triggered", batcherOption: func() Option { c := cfg c.MaxSizeItems = 200 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), }, { @@ -109,7 +109,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { batcherOption: func() Option { c := cfg c.MaxSizeItems = 20 - return WithBatcher(c, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + return WithBatcher(c) }(), expectedRequests: 1, expectedItems: 20, @@ -117,7 +117,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption) + be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { cfg.MinSizeItems = 5 cfg.MaxSizeItems = 10 cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -190,7 +190,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { func TestBatchSender_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + be := 
queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -212,7 +212,8 @@ func TestBatchSender_Disabled(t *testing.T) { cfg.Enabled = false cfg.MaxSizeItems = 5 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(cfg)) require.NotNil(t, be) require.NoError(t, err) @@ -241,7 +242,7 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { cfg := exporterbatcher.NewDefaultConfig() cfg.FlushTimeout = 50 * time.Millisecond cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg, WithRequestBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc))) + be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -260,8 +261,8 @@ func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { func TestBatchSender_PostShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(exporterbatcher.NewDefaultConfig())) require.NotNil(t, be) require.NoError(t, err) assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -322,7 +323,8 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(tt.batcherCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(tt.batcherCfg), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) require.NotNil(t, be) require.NoError(t, err) @@ -377,7 +379,8 @@ func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -407,7 +410,8 @@ func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -442,7 +446,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + 
WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -487,18 +492,9 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, expectedErr: false, }, - { - name: "funcs_set_twice", - opts: []Option{ - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, - fakeBatchMergeSplitFunc)), - }, - expectedErr: true, - }, { name: "nil_funcs", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(nil, nil))}, + opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())}, expectedErr: true, }, } @@ -518,7 +514,8 @@ func TestBatchSender_WithBatcherOption(t *testing.T) { func TestBatchSender_UnstartedShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig(), WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(exporterbatcher.NewDefaultConfig())) require.NoError(t, err) err = be.Shutdown(context.Background()) @@ -542,7 +539,8 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -578,7 +576,8 @@ func TestBatchSenderWithTimeout(t *testing.T) { tCfg := NewDefaultTimeoutConfig() tCfg.Timeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)), + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg), WithTimeout(tCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -637,7 +636,8 @@ func TestBatchSenderTimerResetNoConflict(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) sink := newFakeRequestSink() @@ -668,7 +668,8 @@ func TestBatchSenderTimerFlush(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc))) + WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), + WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) sink := 
newFakeRequestSink() @@ -703,9 +704,9 @@ func TestBatchSenderTimerFlush(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) } -func queueBatchExporter(t *testing.T, batchOption Option) *BaseExporter { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, batchOption, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) +func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { + opts = append(opts, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, opts...) require.NotNil(t, be) require.NoError(t, err) return be
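
For exporter authors, the migration implied by this patch is mechanical: the merge and split callbacks now travel as their own `Option` via `WithBatchFuncs`, passed alongside `WithBatcher`, instead of being attached to `WithBatcher` through the removed `WithRequestBatchFuncs`. The sketch below is illustrative only and is not part of the patch; the package name `myexporter`, the nil placeholder merge-function variables, and the `MinSizeItems` value are assumptions, while the option, config, and type names come from the diff above.

```go
package myexporter

import (
	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Placeholder batch functions: a real exporter implements these for its own
// exporterhelper.Request payloads (left nil here, so this only shows the wiring).
var (
	mergeFunc      exporterbatcher.BatchMergeFunc[exporterhelper.Request]
	mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[exporterhelper.Request]
)

// batchingOptions returns the exporterhelper options after this change.
// Before the change, the same wiring was written as:
//
//	exporterhelper.WithBatcher(cfg,
//		exporterhelper.WithRequestBatchFuncs(mergeFunc, mergeSplitFunc))
func batchingOptions() []exporterhelper.Option {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.MinSizeItems = 1000 // assumed threshold, purely illustrative

	return []exporterhelper.Option{
		// The batch functions are now a top-level exporter option...
		exporterhelper.WithBatchFuncs(mergeFunc, mergeSplitFunc),
		// ...and WithBatcher only carries the batching configuration.
		exporterhelper.WithBatcher(cfg),
	}
}
```

As the `base_exporter.go` hunk above shows, enabling the batcher without providing both functions is still rejected when the exporter is constructed, so leaving the placeholders nil in a real exporter would surface an error from `NewBaseExporter`.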