Skip to content

Commit

Permalink
Avoid public APIs with internal params in custom Options (#11054)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Sep 17, 2024
1 parent d5215c5 commit 8027d80
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 91 deletions.
20 changes: 20 additions & 0 deletions .chloggen/avoid-internal-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: options

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Avoid using private types in public APIs and also protect options to be implemented outside this module.

# One or more tracking issues or pull requests related to the change
issues: [11054]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
20 changes: 14 additions & 6 deletions config/confighttp/confighttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,25 +381,33 @@ type toServerOptions struct {

// ToServerOption is an option to change the behavior of the HTTP server
// returned by ServerConfig.ToServer().
type ToServerOption func(opts *toServerOptions)
type ToServerOption interface {
apply(*toServerOptions)
}

type toServerOptionFunc func(*toServerOptions)

func (of toServerOptionFunc) apply(e *toServerOptions) {
of(e)
}

// WithErrorHandler overrides the HTTP error handler that gets invoked
// when there is a failure inside httpContentDecompressor.
func WithErrorHandler(e func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)) ToServerOption {
return func(opts *toServerOptions) {
return toServerOptionFunc(func(opts *toServerOptions) {
opts.errHandler = e
}
})
}

// WithDecoder provides support for additional decoders to be configured
// by the caller.
func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)) ToServerOption {
return func(opts *toServerOptions) {
return toServerOptionFunc(func(opts *toServerOptions) {
if opts.decoders == nil {
opts.decoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){}
}
opts.decoders[key] = dec
}
})
}

// ToServer creates an http.Server from settings object.
Expand All @@ -408,7 +416,7 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin

serverOpts := &toServerOptions{}
for _, o := range opts {
o(serverOpts)
o.apply(serverOpts)
}

if hss.MaxRequestBodySize <= 0 {
Expand Down
20 changes: 14 additions & 6 deletions confmap/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,29 @@ type retrievedSettings struct {
}

// RetrievedOption options to customize Retrieved values.
type RetrievedOption func(*retrievedSettings)
type RetrievedOption interface {
apply(*retrievedSettings)
}

type retrievedOptionFunc func(*retrievedSettings)

func (of retrievedOptionFunc) apply(e *retrievedSettings) {
of(e)
}

// WithRetrievedClose overrides the default Retrieved.Close function.
// The default Retrieved.Close function does nothing and always returns nil.
func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption {
return func(settings *retrievedSettings) {
return retrievedOptionFunc(func(settings *retrievedSettings) {
settings.closeFunc = closeFunc
}
})
}

func withStringRepresentation(stringRepresentation string) RetrievedOption {
return func(settings *retrievedSettings) {
return retrievedOptionFunc(func(settings *retrievedSettings) {
settings.stringRepresentation = stringRepresentation
settings.isSetString = true
}
})
}

// NewRetrievedFromYAML returns a new Retrieved instance that contains the deserialized data from the yaml bytes.
Expand Down Expand Up @@ -162,7 +170,7 @@ func NewRetrieved(rawConf any, opts ...RetrievedOption) (*Retrieved, error) {
}
set := retrievedSettings{}
for _, opt := range opts {
opt(&set)
opt.apply(&set)
}
return &Retrieved{
rawConf: rawConf,
Expand Down
4 changes: 2 additions & 2 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Option = internal.Option
// WithCapabilities overrides the default GetCapabilities function for a processor.
// The default GetCapabilities function returns mutable capabilities.
func WithCapabilities(capabilities Capabilities) Option {
return func(o *internal.BaseImpl) {
return internal.OptionFunc(func(o *internal.BaseImpl) {
o.Cap = capabilities
}
})
}
12 changes: 10 additions & 2 deletions consumer/internal/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ type BaseImpl struct {
}

// Option to construct new consumers.
type Option func(*BaseImpl)
type Option interface {
apply(*BaseImpl)
}

type OptionFunc func(*BaseImpl)

func (of OptionFunc) apply(e *BaseImpl) {
of(e)
}

// Capabilities returns the capabilities of the component
func (bs BaseImpl) Capabilities() Capabilities {
Expand All @@ -35,7 +43,7 @@ func NewBaseImpl(options ...Option) *BaseImpl {
}

for _, op := range options {
op(bs)
op.apply(bs)
}

return bs
Expand Down
94 changes: 61 additions & 33 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,53 +44,61 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {
type obsrepSenderFactory func(obsrep *obsReport) requestSender

// Option apply changes to baseExporter.
type Option func(*baseExporter) error
type Option interface {
apply(*baseExporter) error
}

type optionFunc func(*baseExporter) error

func (of optionFunc) apply(e *baseExporter) error {
return of(e)
}

// WithStart overrides the default Start function for an exporter.
// The default start function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.StartFunc = start
return nil
}
})
}

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.ShutdownFunc = shutdown
return nil
}
})
}

// WithTimeout overrides the default TimeoutConfig for an exporter.
// The default TimeoutConfig is 5 seconds.
// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.timeoutSender.cfg = timeoutConfig
return nil
}
})
}

// WithRetry overrides the default configretry.BackOffConfig for an exporter.
// The default configretry.BackOffConfig is to disable retries.
func WithRetry(config configretry.BackOffConfig) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
if !config.Enabled {
o.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return nil
}
o.retrySender = newRetrySender(config, o.set)
return nil
}
})
}

// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
if o.marshaler == nil || o.unmarshaler == nil {
return fmt.Errorf("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
Expand All @@ -112,15 +120,15 @@ func WithQueue(config QueueConfig) Option {
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
})
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
if o.marshaler != nil || o.unmarshaler != nil {
return fmt.Errorf("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
}
Expand All @@ -131,25 +139,33 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
}
})
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
return nil
}
})
}

// BatcherOption apply changes to batcher sender.
type BatcherOption func(*batchSender) error
type BatcherOption interface {
apply(*batchSender) error
}

type batcherOptionFunc func(*batchSender) error

func (of batcherOptionFunc) apply(e *batchSender) error {
return of(e)
}

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption {
return func(bs *batchSender) error {
return batcherOptionFunc(func(bs *batchSender) error {
if mf == nil || msf == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions")
}
Expand All @@ -159,7 +175,7 @@ func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf expor
bs.mergeFunc = mf
bs.mergeSplitFunc = msf
return nil
}
})
}

// WithBatcher enables batching for an exporter based on custom request types.
Expand All @@ -168,39 +184,51 @@ func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf expor
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
return func(o *baseExporter) error {
o.batcherCfg = cfg
o.batcherOpts = opts
return optionFunc(func(o *baseExporter) error {
if !cfg.Enabled {
return nil
}

bs := newBatchSender(cfg, o.set, o.batchMergeFunc, o.batchMergeSplitfunc)
for _, opt := range opts {
if err := opt.apply(bs); err != nil {
return err
}
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")
}
o.batchSender = bs
return nil
}
})
}

// withMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.marshaler = marshaler
return nil
}
})
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.unmarshaler = unmarshaler
return nil
}
})
}

// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters.
// It must be provided as the first option when creating a new exporter helper.
func withBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return func(o *baseExporter) error {
return optionFunc(func(o *baseExporter) error {
o.batchMergeFunc = mf
o.batchMergeSplitfunc = msf
return nil
}
})
}

// baseExporter contains common fields between different exporter types.
Expand Down Expand Up @@ -259,7 +287,7 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
}

for _, op := range options {
err = multierr.Append(err, op(be))
err = multierr.Append(err, op.apply(be))
}
if err != nil {
return nil, err
Expand All @@ -268,7 +296,7 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt(bs))
err = multierr.Append(err, opt.apply(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
Expand All @@ -283,7 +311,7 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op(be))
err = multierr.Append(err, op.apply(be))
}
}

Expand Down
Loading

0 comments on commit 8027d80

Please sign in to comment.