From e9bc4bde924e26b9e5304a3d94d9318ca5aa886b Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Sat, 14 Dec 2024 07:04:34 -0800 Subject: [PATCH] Deprecate connectorprofiles module in favor of xconnector (#11887) to allow adding more experimental data types. Updates https://github.com/open-telemetry/opentelemetry-collector/issues/11778 --- .chloggen/deprecate-connectorprofiles.yaml | 20 ++ .github/CODEOWNERS | 1 + cmd/builder/internal/builder/main_test.go | 2 +- cmd/otelcorecol/builder-config.yaml | 2 +- cmd/otelcorecol/go.mod | 4 +- connector/connectorprofiles/connector.go | 319 +++-------------- connector/connectorprofiles/go.mod | 5 +- .../connectorprofiles/profiles_router.go | 35 +- connector/connectortest/connector.go | 44 +-- connector/connectortest/connector_test.go | 16 +- connector/connectortest/go.mod | 4 +- connector/forwardconnector/go.mod | 4 +- connector/xconnector/Makefile | 1 + connector/xconnector/connector.go | 331 ++++++++++++++++++ connector/xconnector/connector_test.go | 148 ++++++++ connector/xconnector/go.mod | 64 ++++ connector/xconnector/go.sum | 91 +++++ connector/xconnector/profiles_router.go | 36 ++ connector/xconnector/profiles_router_test.go | 159 +++++++++ internal/e2e/go.mod | 4 +- otelcol/go.mod | 4 +- otelcol/otelcoltest/go.mod | 4 +- service/go.mod | 4 +- service/internal/builders/connector.go | 24 +- service/internal/builders/connector_test.go | 92 ++--- service/internal/graph/connector.go | 6 +- service/internal/graph/graph.go | 10 +- service/internal/graph/util_test.go | 38 +- .../testcomponents/example_connector.go | 60 ++-- .../internal/testcomponents/example_router.go | 16 +- .../testcomponents/example_router_test.go | 4 +- versions.yaml | 1 + 32 files changed, 1090 insertions(+), 463 deletions(-) create mode 100644 .chloggen/deprecate-connectorprofiles.yaml create mode 100644 connector/xconnector/Makefile create mode 100644 connector/xconnector/connector.go create mode 100644 connector/xconnector/connector_test.go create mode 100644 connector/xconnector/go.mod create mode 100644 connector/xconnector/go.sum create mode 100644 connector/xconnector/profiles_router.go create mode 100644 connector/xconnector/profiles_router_test.go diff --git a/.chloggen/deprecate-connectorprofiles.yaml b/.chloggen/deprecate-connectorprofiles.yaml new file mode 100644 index 00000000000..d3072f6ccff --- /dev/null +++ b/.chloggen/deprecate-connectorprofiles.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: connector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate connectorprofiles module in favor of xconnector to allow adding more experimental data types. + +# One or more tracking issues or pull requests related to the change +issues: [11778] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 9b9a3f1959e..c09ff1a0d93 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -32,6 +32,7 @@ pdata/pprofile @open-telemetry/collector-approve consumer/consumerprofiles @open-telemetry/collector-approvers @mx-psi @dmathieu consumer/xconsumer @open-telemetry/collector-approvers @mx-psi @dmathieu connector/connectorprofiles @open-telemetry/collector-approvers @mx-psi @dmathieu +connector/xconnector @open-telemetry/collector-approvers @mx-psi @dmathieu exporter/exporterhelper/exporterhelperprofiles @open-telemetry/collector-approvers @mx-psi @dmathieu exporter/exporterhelper/xexporterhelper @open-telemetry/collector-approvers @mx-psi @dmathieu exporter/exporterprofiles @open-telemetry/collector-approvers @mx-psi @dmathieu diff --git a/cmd/builder/internal/builder/main_test.go b/cmd/builder/internal/builder/main_test.go index 162d8e23023..bfd45adab7e 100644 --- a/cmd/builder/internal/builder/main_test.go +++ b/cmd/builder/internal/builder/main_test.go @@ -66,7 +66,7 @@ var replaceModules = []string{ "/consumer/consumertest", "/connector", "/connector/connectortest", - "/connector/connectorprofiles", + "/connector/xconnector", "/exporter", "/exporter/debugexporter", "/exporter/xexporter", diff --git a/cmd/otelcorecol/builder-config.yaml b/cmd/otelcorecol/builder-config.yaml index 80c21fff15e..eae8cefaa07 100644 --- a/cmd/otelcorecol/builder-config.yaml +++ b/cmd/otelcorecol/builder-config.yaml @@ -65,7 +65,7 @@ replaces: - go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest - go.opentelemetry.io/collector/connector => ../../connector - go.opentelemetry.io/collector/connector/connectortest => ../../connector/connectortest - - go.opentelemetry.io/collector/connector/connectorprofiles => ../../connector/connectorprofiles + - go.opentelemetry.io/collector/connector/xconnector => ../../connector/xconnector - go.opentelemetry.io/collector/connector/forwardconnector => ../../connector/forwardconnector - go.opentelemetry.io/collector/exporter => ../../exporter - go.opentelemetry.io/collector/exporter/debugexporter => ../../exporter/debugexporter diff --git a/cmd/otelcorecol/go.mod b/cmd/otelcorecol/go.mod index 02eaec2fbe8..64f4fa099b4 100644 --- a/cmd/otelcorecol/go.mod +++ b/cmd/otelcorecol/go.mod @@ -93,8 +93,8 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect go.opentelemetry.io/collector/config/configtls v1.21.0 // indirect go.opentelemetry.io/collector/config/internal v0.115.0 // indirect - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 // indirect go.opentelemetry.io/collector/connector/connectortest v0.115.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.115.0 // indirect go.opentelemetry.io/collector/consumer v1.21.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.0.0-00010101000000-000000000000 // indirect @@ -215,7 +215,7 @@ replace go.opentelemetry.io/collector/connector => ../../connector replace go.opentelemetry.io/collector/connector/connectortest => ../../connector/connectortest -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../../connector/connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../../connector/xconnector replace go.opentelemetry.io/collector/connector/forwardconnector => ../../connector/forwardconnector diff --git a/connector/connectorprofiles/connector.go b/connector/connectorprofiles/connector.go index 064c0509c32..99437fe5336 100644 --- a/connector/connectorprofiles/connector.go +++ b/connector/connectorprofiles/connector.go @@ -1,41 +1,13 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +// Deprecated: [0.116.0] Use go.opentelemetry.io/collector/connector/xconnector instead. package connectorprofiles // import "go.opentelemetry.io/collector/connector/connectorprofiles" -import ( - "context" +import "go.opentelemetry.io/collector/connector/xconnector" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/internal" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/xconsumer" - "go.opentelemetry.io/collector/pipeline" - "go.opentelemetry.io/collector/pipeline/xpipeline" -) - -type Factory interface { - connector.Factory - - CreateTracesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Traces, error) - CreateMetricsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Metrics, error) - CreateLogsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Logs, error) - - TracesToProfilesStability() component.StabilityLevel - MetricsToProfilesStability() component.StabilityLevel - LogsToProfilesStability() component.StabilityLevel - - CreateProfilesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (Profiles, error) - CreateProfilesToTraces(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Traces) (Profiles, error) - CreateProfilesToMetrics(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Metrics) (Profiles, error) - CreateProfilesToLogs(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Logs) (Profiles, error) - - ProfilesToProfilesStability() component.StabilityLevel - ProfilesToTracesStability() component.StabilityLevel - ProfilesToMetricsStability() component.StabilityLevel - ProfilesToLogsStability() component.StabilityLevel -} +// Deprecated: [0.116.0] Use xconnector.Factory instead. +type Factory = xconnector.Factory // A Profiles connector acts as an exporter from a profiles pipeline and a receiver // to one or more traces, metrics, logs, or profiles pipelines. @@ -49,283 +21,106 @@ type Factory interface { // the number of profiles observed. // - Profiles could be analyzed by a logs connector that emits events when particular // criteria are met. -type Profiles interface { - component.Component - xconsumer.Profiles -} +// +// Deprecated: [0.116.0] Use xconnector.Profiles instead. +type Profiles = xconnector.Profiles // CreateTracesToProfilesFunc is the equivalent of Factory.CreateTracesToProfiles(). -type CreateTracesToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Traces, error) - -// CreateTracesToProfiles implements Factory.CreateTracesToProfiles(). -func (f CreateTracesToProfilesFunc) CreateTracesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Traces, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, xpipeline.SignalProfiles) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateTracesToProfilesFunc instead. +type CreateTracesToProfilesFunc = xconnector.CreateTracesToProfilesFunc // CreateMetricsToProfilesFunc is the equivalent of Factory.CreateMetricsToProfiles(). -type CreateMetricsToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Metrics, error) - -// CreateMetricsToProfiles implements Factory.CreateMetricsToProfiles(). -func (f CreateMetricsToProfilesFunc) CreateMetricsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Metrics, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, xpipeline.SignalProfiles) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateMetricsToProfilesFunc instead. +type CreateMetricsToProfilesFunc = xconnector.CreateMetricsToProfilesFunc // CreateLogsToProfilesFunc is the equivalent of Factory.CreateLogsToProfiles(). -type CreateLogsToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Logs, error) - -// CreateLogsToProfiles implements Factory.CreateLogsToProfiles(). -func (f CreateLogsToProfilesFunc) CreateLogsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Logs, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, xpipeline.SignalProfiles) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateLogsToProfilesFunc instead. +type CreateLogsToProfilesFunc = xconnector.CreateLogsToProfilesFunc // CreateProfilesToProfilesFunc is the equivalent of Factory.CreateProfilesToProfiles(). -type CreateProfilesToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (Profiles, error) - -// CreateProfilesToProfiles implements Factory.CreateProfilesToProfiles(). -func (f CreateProfilesToProfilesFunc) CreateProfilesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (Profiles, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, xpipeline.SignalProfiles) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateProfilesToProfilesFunc instead. +type CreateProfilesToProfilesFunc = xconnector.CreateProfilesToProfilesFunc // CreateProfilesToTracesFunc is the equivalent of Factory.CreateProfilesToTraces(). -type CreateProfilesToTracesFunc func(context.Context, connector.Settings, component.Config, consumer.Traces) (Profiles, error) - -// CreateProfilesToTraces implements Factory.CreateProfilesToTraces(). -func (f CreateProfilesToTracesFunc) CreateProfilesToTraces(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Traces) (Profiles, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalTraces) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateProfilesToTracesFunc instead. +type CreateProfilesToTracesFunc = xconnector.CreateProfilesToTracesFunc // CreateProfilesToMetricsFunc is the equivalent of Factory.CreateProfilesToMetrics(). -type CreateProfilesToMetricsFunc func(context.Context, connector.Settings, component.Config, consumer.Metrics) (Profiles, error) - -// CreateProfilesToMetrics implements Factory.CreateProfilesToMetrics(). -func (f CreateProfilesToMetricsFunc) CreateProfilesToMetrics(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Metrics) (Profiles, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalMetrics) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateProfilesToMetricsFunc instead. +type CreateProfilesToMetricsFunc = xconnector.CreateProfilesToMetricsFunc // CreateProfilesToLogsFunc is the equivalent of Factory.CreateProfilesToLogs(). -type CreateProfilesToLogsFunc func(context.Context, connector.Settings, component.Config, consumer.Logs) (Profiles, error) - -// CreateProfilesToLogs implements Factory.CreateProfilesToLogs(). -func (f CreateProfilesToLogsFunc) CreateProfilesToLogs(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Logs) (Profiles, error) { - if f == nil { - return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalLogs) - } - return f(ctx, set, cfg, next) -} +// Deprecated: [0.116.0] Use xconnector.CreateProfilesToLogsFunc instead. +type CreateProfilesToLogsFunc = xconnector.CreateProfilesToLogsFunc // FactoryOption apply changes to ReceiverOptions. -type FactoryOption interface { - // applyOption applies the option. - applyOption(o *factoryOpts) -} - -// factoryOptionFunc is an ReceiverFactoryOption created through a function. -type factoryOptionFunc func(*factoryOpts) - -func (f factoryOptionFunc) applyOption(o *factoryOpts) { - f(o) -} - -type factoryOpts struct { - opts []connector.FactoryOption - - *factory -} +// Deprecated: [0.116.0] Use xconnector.FactoryOption instead. +type FactoryOption = xconnector.FactoryOption // WithTracesToTraces overrides the default "error not supported" implementation for WithTracesToTraces and the default "undefined" stability level. -func WithTracesToTraces(createTracesToTraces connector.CreateTracesToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithTracesToTraces(createTracesToTraces, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithTracesToTraces instead. +var WithTracesToTraces = xconnector.WithTracesToTraces // WithTracesToMetrics overrides the default "error not supported" implementation for WithTracesToMetrics and the default "undefined" stability level. -func WithTracesToMetrics(createTracesToMetrics connector.CreateTracesToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithTracesToMetrics(createTracesToMetrics, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithTracesToMetrics instead. +var WithTracesToMetrics = xconnector.WithTracesToMetrics // WithTracesToLogs overrides the default "error not supported" implementation for WithTracesToLogs and the default "undefined" stability level. -func WithTracesToLogs(createTracesToLogs connector.CreateTracesToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithTracesToLogs(createTracesToLogs, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithTracesToLogs instead. +var WithTracesToLogs = xconnector.WithTracesToLogs // WithMetricsToTraces overrides the default "error not supported" implementation for WithMetricsToTraces and the default "undefined" stability level. -func WithMetricsToTraces(createMetricsToTraces connector.CreateMetricsToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithMetricsToTraces(createMetricsToTraces, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithMetricsToTraces instead. +var WithMetricsToTraces = xconnector.WithMetricsToTraces // WithMetricsToMetrics overrides the default "error not supported" implementation for WithMetricsToMetrics and the default "undefined" stability level. -func WithMetricsToMetrics(createMetricsToMetrics connector.CreateMetricsToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithMetricsToMetrics(createMetricsToMetrics, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithMetricsToMetrics instead. +var WithMetricsToMetrics = xconnector.WithMetricsToMetrics // WithMetricsToLogs overrides the default "error not supported" implementation for WithMetricsToLogs and the default "undefined" stability level. -func WithMetricsToLogs(createMetricsToLogs connector.CreateMetricsToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithMetricsToLogs(createMetricsToLogs, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithMetricsToLogs instead. +var WithMetricsToLogs = xconnector.WithMetricsToLogs // WithLogsToTraces overrides the default "error not supported" implementation for WithLogsToTraces and the default "undefined" stability level. -func WithLogsToTraces(createLogsToTraces connector.CreateLogsToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithLogsToTraces(createLogsToTraces, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithLogsToTraces instead. +var WithLogsToTraces = xconnector.WithLogsToTraces // WithLogsToMetrics overrides the default "error not supported" implementation for WithLogsToMetrics and the default "undefined" stability level. -func WithLogsToMetrics(createLogsToMetrics connector.CreateLogsToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithLogsToMetrics(createLogsToMetrics, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithLogsToMetrics instead. +var WithLogsToMetrics = xconnector.WithLogsToMetrics // WithLogsToLogs overrides the default "error not supported" implementation for WithLogsToLogs and the default "undefined" stability level. -func WithLogsToLogs(createLogsToLogs connector.CreateLogsToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.opts = append(o.opts, connector.WithLogsToLogs(createLogsToLogs, sl)) - }) -} +// Deprecated: [0.116.0] Use xconnector.WithLogsToLogs instead. +var WithLogsToLogs = xconnector.WithLogsToLogs // WithTracesToProfiles overrides the default "error not supported" implementation for WithTracesToProfiles and the default "undefined" stability level. -func WithTracesToProfiles(createTracesToProfiles CreateTracesToProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.tracesToProfilesStabilityLevel = sl - o.CreateTracesToProfilesFunc = createTracesToProfiles - }) -} +// Deprecated: [0.116.0] Use xconnector.WithTracesToProfiles instead. +var WithTracesToProfiles = xconnector.WithTracesToProfiles // WithMetricsToProfiles overrides the default "error not supported" implementation for WithMetricsToProfiles and the default "undefined" stability level. -func WithMetricsToProfiles(createMetricsToProfiles CreateMetricsToProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.metricsToProfilesStabilityLevel = sl - o.CreateMetricsToProfilesFunc = createMetricsToProfiles - }) -} +// Deprecated: [0.116.0] Use xconnector.WithMetricsToProfiles instead. +var WithMetricsToProfiles = xconnector.WithMetricsToProfiles // WithLogsToProfiles overrides the default "error not supported" implementation for WithLogsToProfiles and the default "undefined" stability level. -func WithLogsToProfiles(createLogsToProfiles CreateLogsToProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.logsToProfilesStabilityLevel = sl - o.CreateLogsToProfilesFunc = createLogsToProfiles - }) -} +// Deprecated: [0.116.0] Use xconnector.WithLogsToProfiles instead. +var WithLogsToProfiles = xconnector.WithLogsToProfiles // WithProfilesToProfiles overrides the default "error not supported" implementation for WithProfilesToProfiles and the default "undefined" stability level. -func WithProfilesToProfiles(createProfilesToProfiles CreateProfilesToProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.profilesToProfilesStabilityLevel = sl - o.CreateProfilesToProfilesFunc = createProfilesToProfiles - }) -} +// Deprecated: [0.116.0] Use xconnector.WithProfilesToProfiles instead. +var WithProfilesToProfiles = xconnector.WithProfilesToProfiles // WithProfilesToTraces overrides the default "error not supported" implementation for WithProfilesToTraces and the default "undefined" stability level. -func WithProfilesToTraces(createProfilesToTraces CreateProfilesToTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.profilesToTracesStabilityLevel = sl - o.CreateProfilesToTracesFunc = createProfilesToTraces - }) -} +// Deprecated: [0.116.0] Use xconnector.WithProfilesToTraces instead. +var WithProfilesToTraces = xconnector.WithProfilesToTraces // WithProfilesToMetrics overrides the default "error not supported" implementation for WithProfilesToMetrics and the default "undefined" stability level. -func WithProfilesToMetrics(createProfilesToMetrics CreateProfilesToMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.profilesToMetricsStabilityLevel = sl - o.CreateProfilesToMetricsFunc = createProfilesToMetrics - }) -} +// Deprecated: [0.116.0] Use xconnector.WithProfilesToMetrics instead. +var WithProfilesToMetrics = xconnector.WithProfilesToMetrics // WithProfilesToLogs overrides the default "error not supported" implementation for WithProfilesToLogs and the default "undefined" stability level. -func WithProfilesToLogs(createProfilesToLogs CreateProfilesToLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factoryOpts) { - o.profilesToLogsStabilityLevel = sl - o.CreateProfilesToLogsFunc = createProfilesToLogs - }) -} - -// factory implements the Factory interface. -type factory struct { - connector.Factory - - CreateTracesToProfilesFunc - CreateMetricsToProfilesFunc - CreateLogsToProfilesFunc - - CreateProfilesToProfilesFunc - CreateProfilesToTracesFunc - CreateProfilesToMetricsFunc - CreateProfilesToLogsFunc - - tracesToProfilesStabilityLevel component.StabilityLevel - metricsToProfilesStabilityLevel component.StabilityLevel - logsToProfilesStabilityLevel component.StabilityLevel - - profilesToProfilesStabilityLevel component.StabilityLevel - profilesToTracesStabilityLevel component.StabilityLevel - profilesToMetricsStabilityLevel component.StabilityLevel - profilesToLogsStabilityLevel component.StabilityLevel -} - -func (f *factory) TracesToProfilesStability() component.StabilityLevel { - return f.tracesToProfilesStabilityLevel -} - -func (f *factory) MetricsToProfilesStability() component.StabilityLevel { - return f.metricsToProfilesStabilityLevel -} - -func (f *factory) LogsToProfilesStability() component.StabilityLevel { - return f.logsToProfilesStabilityLevel -} - -func (f *factory) ProfilesToProfilesStability() component.StabilityLevel { - return f.profilesToProfilesStabilityLevel -} - -func (f *factory) ProfilesToTracesStability() component.StabilityLevel { - return f.profilesToTracesStabilityLevel -} - -func (f *factory) ProfilesToMetricsStability() component.StabilityLevel { - return f.profilesToMetricsStabilityLevel -} - -func (f *factory) ProfilesToLogsStability() component.StabilityLevel { - return f.profilesToLogsStabilityLevel -} +// Deprecated: [0.116.0] Use xconnector.WithProfilesToLogs instead. +var WithProfilesToLogs = xconnector.WithProfilesToLogs // NewFactory returns a Factory. -func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - opts := factoryOpts{factory: &factory{}} - for _, opt := range options { - opt.applyOption(&opts) - } - opts.Factory = connector.NewFactory(cfgType, createDefaultConfig, opts.opts...) - return opts.factory -} +// Deprecated: [0.116.0] Use xconnector.NewFactory instead. +var NewFactory = xconnector.NewFactory diff --git a/connector/connectorprofiles/go.mod b/connector/connectorprofiles/go.mod index 52e276d537d..02fa46135eb 100644 --- a/connector/connectorprofiles/go.mod +++ b/connector/connectorprofiles/go.mod @@ -6,10 +6,10 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.0 go.opentelemetry.io/collector/connector v0.115.0 + go.opentelemetry.io/collector/connector/xconnector v0.115.0 go.opentelemetry.io/collector/consumer v1.21.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 - go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 go.opentelemetry.io/collector/pdata/pprofile v0.115.0 go.opentelemetry.io/collector/pdata/testdata v0.115.0 go.opentelemetry.io/collector/pipeline v0.115.0 @@ -24,6 +24,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect + go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 // indirect go.opentelemetry.io/collector/pdata v1.21.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect @@ -41,6 +42,8 @@ require ( replace go.opentelemetry.io/collector/connector => ../ +replace go.opentelemetry.io/collector/connector/xconnector => ../xconnector + replace go.opentelemetry.io/collector/consumer => ../../consumer replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest diff --git a/connector/connectorprofiles/profiles_router.go b/connector/connectorprofiles/profiles_router.go index fb5d8b01970..1349640ac66 100644 --- a/connector/connectorprofiles/profiles_router.go +++ b/connector/connectorprofiles/profiles_router.go @@ -1,36 +1,13 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 +// Deprecated: [0.116.0] Use go.opentelemetry.io/collector/connector/xconnector instead. package connectorprofiles // import "go.opentelemetry.io/collector/connector/connectorprofiles" -import ( - "go.opentelemetry.io/collector/connector/internal" - "go.opentelemetry.io/collector/consumer/xconsumer" - "go.opentelemetry.io/collector/internal/fanoutconsumer" - "go.opentelemetry.io/collector/pipeline" -) +import "go.opentelemetry.io/collector/connector/xconnector" -type ProfilesRouterAndConsumer interface { - xconsumer.Profiles - Consumer(...pipeline.ID) (xconsumer.Profiles, error) - PipelineIDs() []pipeline.ID - privateFunc() -} +// Deprecated: [0.116.0] Use xconnector.ProfilesRouterAndConsumer instead. +type ProfilesRouterAndConsumer = xconnector.ProfilesRouterAndConsumer -type profilesRouter struct { - xconsumer.Profiles - internal.BaseRouter[xconsumer.Profiles] -} - -func NewProfilesRouter(cm map[pipeline.ID]xconsumer.Profiles) ProfilesRouterAndConsumer { - consumers := make([]xconsumer.Profiles, 0, len(cm)) - for _, cons := range cm { - consumers = append(consumers, cons) - } - return &profilesRouter{ - Profiles: fanoutconsumer.NewProfiles(consumers), - BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewProfiles, cm), - } -} - -func (r *profilesRouter) privateFunc() {} +// Deprecated: [0.116.0] Use xconnector.NewProfilesRouter instead. +var NewProfilesRouter = xconnector.NewProfilesRouter diff --git a/connector/connectortest/connector.go b/connector/connectortest/connector.go index 8659063c0d1..fbc8febe94f 100644 --- a/connector/connectortest/connector.go +++ b/connector/connectortest/connector.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" @@ -32,27 +32,27 @@ type nopConfig struct{} // NewNopFactory returns a connector.Factory that constructs nop processors. func NewNopFactory() connector.Factory { - return connectorprofiles.NewFactory( + return xconnector.NewFactory( nopType, func() component.Config { return &nopConfig{} }, - connectorprofiles.WithTracesToTraces(createTracesToTracesConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToLogs(createTracesToLogsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToProfiles(createTracesToProfilesConnector, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToTraces(createMetricsToTracesConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToMetrics(createMetricsToMetricsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToLogs(createMetricsToLogsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToProfiles(createMetricsToProfilesConnector, component.StabilityLevelAlpha), - connectorprofiles.WithLogsToTraces(createLogsToTracesConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToMetrics(createLogsToMetricsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToLogs(createLogsToLogsConnector, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToProfiles(createLogsToProfilesConnector, component.StabilityLevelAlpha), - connectorprofiles.WithProfilesToTraces(createProfilesToTracesConnector, component.StabilityLevelAlpha), - connectorprofiles.WithProfilesToMetrics(createProfilesToMetricsConnector, component.StabilityLevelAlpha), - connectorprofiles.WithProfilesToLogs(createProfilesToLogsConnector, component.StabilityLevelAlpha), - connectorprofiles.WithProfilesToProfiles(createProfilesToProfilesConnector, component.StabilityLevelAlpha), + xconnector.WithTracesToTraces(createTracesToTracesConnector, component.StabilityLevelDevelopment), + xconnector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelDevelopment), + xconnector.WithTracesToLogs(createTracesToLogsConnector, component.StabilityLevelDevelopment), + xconnector.WithTracesToProfiles(createTracesToProfilesConnector, component.StabilityLevelAlpha), + xconnector.WithMetricsToTraces(createMetricsToTracesConnector, component.StabilityLevelDevelopment), + xconnector.WithMetricsToMetrics(createMetricsToMetricsConnector, component.StabilityLevelDevelopment), + xconnector.WithMetricsToLogs(createMetricsToLogsConnector, component.StabilityLevelDevelopment), + xconnector.WithMetricsToProfiles(createMetricsToProfilesConnector, component.StabilityLevelAlpha), + xconnector.WithLogsToTraces(createLogsToTracesConnector, component.StabilityLevelDevelopment), + xconnector.WithLogsToMetrics(createLogsToMetricsConnector, component.StabilityLevelDevelopment), + xconnector.WithLogsToLogs(createLogsToLogsConnector, component.StabilityLevelDevelopment), + xconnector.WithLogsToProfiles(createLogsToProfilesConnector, component.StabilityLevelAlpha), + xconnector.WithProfilesToTraces(createProfilesToTracesConnector, component.StabilityLevelAlpha), + xconnector.WithProfilesToMetrics(createProfilesToMetricsConnector, component.StabilityLevelAlpha), + xconnector.WithProfilesToLogs(createProfilesToLogsConnector, component.StabilityLevelAlpha), + xconnector.WithProfilesToProfiles(createProfilesToProfilesConnector, component.StabilityLevelAlpha), ) } @@ -104,19 +104,19 @@ func createLogsToProfilesConnector(context.Context, connector.Settings, componen return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createProfilesToTracesConnector(context.Context, connector.Settings, component.Config, consumer.Traces) (connectorprofiles.Profiles, error) { +func createProfilesToTracesConnector(context.Context, connector.Settings, component.Config, consumer.Traces) (xconnector.Profiles, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createProfilesToMetricsConnector(context.Context, connector.Settings, component.Config, consumer.Metrics) (connectorprofiles.Profiles, error) { +func createProfilesToMetricsConnector(context.Context, connector.Settings, component.Config, consumer.Metrics) (xconnector.Profiles, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createProfilesToLogsConnector(context.Context, connector.Settings, component.Config, consumer.Logs) (connectorprofiles.Profiles, error) { +func createProfilesToLogsConnector(context.Context, connector.Settings, component.Config, consumer.Logs) (xconnector.Profiles, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createProfilesToProfilesConnector(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connectorprofiles.Profiles, error) { +func createProfilesToProfilesConnector(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (xconnector.Profiles, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } diff --git a/connector/connectortest/connector_test.go b/connector/connectortest/connector_test.go index 37393aef600..ad43fc88342 100644 --- a/connector/connectortest/connector_test.go +++ b/connector/connectortest/connector_test.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -45,7 +45,7 @@ func TestNewNopConnectorFactory(t *testing.T) { assert.NoError(t, tracesToLogs.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, tracesToLogs.Shutdown(context.Background())) - tracesToProfiles, err := factory.(connectorprofiles.Factory).CreateTracesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + tracesToProfiles, err := factory.(xconnector.Factory).CreateTracesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, tracesToProfiles.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, tracesToProfiles.ConsumeTraces(context.Background(), ptrace.NewTraces())) @@ -69,7 +69,7 @@ func TestNewNopConnectorFactory(t *testing.T) { assert.NoError(t, metricsToLogs.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metricsToLogs.Shutdown(context.Background())) - metricsToProfiles, err := factory.(connectorprofiles.Factory).CreateMetricsToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + metricsToProfiles, err := factory.(xconnector.Factory).CreateMetricsToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, metricsToProfiles.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metricsToProfiles.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) @@ -93,31 +93,31 @@ func TestNewNopConnectorFactory(t *testing.T) { assert.NoError(t, logsToLogs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logsToLogs.Shutdown(context.Background())) - logsToProfiles, err := factory.(connectorprofiles.Factory).CreateLogsToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + logsToProfiles, err := factory.(xconnector.Factory).CreateLogsToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, logsToProfiles.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logsToProfiles.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logsToProfiles.Shutdown(context.Background())) - profilesToTraces, err := factory.(connectorprofiles.Factory).CreateProfilesToTraces(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profilesToTraces, err := factory.(xconnector.Factory).CreateProfilesToTraces(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, profilesToTraces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, profilesToTraces.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) assert.NoError(t, profilesToTraces.Shutdown(context.Background())) - profilesToMetrics, err := factory.(connectorprofiles.Factory).CreateProfilesToMetrics(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profilesToMetrics, err := factory.(xconnector.Factory).CreateProfilesToMetrics(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, profilesToMetrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, profilesToMetrics.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) assert.NoError(t, profilesToMetrics.Shutdown(context.Background())) - profilesToLogs, err := factory.(connectorprofiles.Factory).CreateProfilesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profilesToLogs, err := factory.(xconnector.Factory).CreateProfilesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, profilesToLogs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, profilesToLogs.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) assert.NoError(t, profilesToLogs.Shutdown(context.Background())) - profilesToProfiles, err := factory.(connectorprofiles.Factory).CreateProfilesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profilesToProfiles, err := factory.(xconnector.Factory).CreateProfilesToProfiles(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, profilesToProfiles.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, profilesToProfiles.ConsumeProfiles(context.Background(), pprofile.NewProfiles())) diff --git a/connector/connectortest/go.mod b/connector/connectortest/go.mod index 73f0bfa22c9..c10a5117889 100644 --- a/connector/connectortest/go.mod +++ b/connector/connectortest/go.mod @@ -8,7 +8,7 @@ require ( go.opentelemetry.io/collector/component v0.115.0 go.opentelemetry.io/collector/component/componenttest v0.115.0 go.opentelemetry.io/collector/connector v0.115.0 - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 + go.opentelemetry.io/collector/connector/xconnector v0.115.0 go.opentelemetry.io/collector/consumer v1.21.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 @@ -62,7 +62,7 @@ replace go.opentelemetry.io/collector/consumer/xconsumer => ../../consumer/xcons replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../xconnector replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry diff --git a/connector/forwardconnector/go.mod b/connector/forwardconnector/go.mod index e3b2229ffd6..c2789658e9c 100644 --- a/connector/forwardconnector/go.mod +++ b/connector/forwardconnector/go.mod @@ -33,7 +33,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.115.0 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 // indirect go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect @@ -83,7 +83,7 @@ replace go.opentelemetry.io/collector/consumer/xconsumer => ../../consumer/xcons replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../xconnector replace go.opentelemetry.io/collector/pipeline => ../../pipeline diff --git a/connector/xconnector/Makefile b/connector/xconnector/Makefile new file mode 100644 index 00000000000..ded7a36092d --- /dev/null +++ b/connector/xconnector/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/connector/xconnector/connector.go b/connector/xconnector/connector.go new file mode 100644 index 00000000000..d697e06a8e4 --- /dev/null +++ b/connector/xconnector/connector.go @@ -0,0 +1,331 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconnector // import "go.opentelemetry.io/collector/connector/xconnector" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/internal" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +type Factory interface { + connector.Factory + + CreateTracesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Traces, error) + CreateMetricsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Metrics, error) + CreateLogsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Logs, error) + + TracesToProfilesStability() component.StabilityLevel + MetricsToProfilesStability() component.StabilityLevel + LogsToProfilesStability() component.StabilityLevel + + CreateProfilesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (Profiles, error) + CreateProfilesToTraces(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Traces) (Profiles, error) + CreateProfilesToMetrics(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Metrics) (Profiles, error) + CreateProfilesToLogs(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Logs) (Profiles, error) + + ProfilesToProfilesStability() component.StabilityLevel + ProfilesToTracesStability() component.StabilityLevel + ProfilesToMetricsStability() component.StabilityLevel + ProfilesToLogsStability() component.StabilityLevel +} + +// A Profiles connector acts as an exporter from a profiles pipeline and a receiver +// to one or more traces, metrics, logs, or profiles pipelines. +// Profiles feeds a consumer.Traces, consumer.Metrics, consumer.Logs, or xconsumer.Profiles with data. +// +// Examples: +// - Profiles could be collected in one pipeline and routed to another profiles pipeline +// based on criteria such as attributes or other content of the profile. The second +// pipeline can then process and export the profile to the appropriate backend. +// - Profiles could be summarized by a metrics connector that emits statistics describing +// the number of profiles observed. +// - Profiles could be analyzed by a logs connector that emits events when particular +// criteria are met. +type Profiles interface { + component.Component + xconsumer.Profiles +} + +// CreateTracesToProfilesFunc is the equivalent of Factory.CreateTracesToProfiles(). +type CreateTracesToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Traces, error) + +// CreateTracesToProfiles implements Factory.CreateTracesToProfiles(). +func (f CreateTracesToProfilesFunc) CreateTracesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Traces, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalTraces, xpipeline.SignalProfiles) + } + return f(ctx, set, cfg, next) +} + +// CreateMetricsToProfilesFunc is the equivalent of Factory.CreateMetricsToProfiles(). +type CreateMetricsToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Metrics, error) + +// CreateMetricsToProfiles implements Factory.CreateMetricsToProfiles(). +func (f CreateMetricsToProfilesFunc) CreateMetricsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Metrics, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalMetrics, xpipeline.SignalProfiles) + } + return f(ctx, set, cfg, next) +} + +// CreateLogsToProfilesFunc is the equivalent of Factory.CreateLogsToProfiles(). +type CreateLogsToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Logs, error) + +// CreateLogsToProfiles implements Factory.CreateLogsToProfiles(). +func (f CreateLogsToProfilesFunc) CreateLogsToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (connector.Logs, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, pipeline.SignalLogs, xpipeline.SignalProfiles) + } + return f(ctx, set, cfg, next) +} + +// CreateProfilesToProfilesFunc is the equivalent of Factory.CreateProfilesToProfiles(). +type CreateProfilesToProfilesFunc func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (Profiles, error) + +// CreateProfilesToProfiles implements Factory.CreateProfilesToProfiles(). +func (f CreateProfilesToProfilesFunc) CreateProfilesToProfiles(ctx context.Context, set connector.Settings, cfg component.Config, next xconsumer.Profiles) (Profiles, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, xpipeline.SignalProfiles) + } + return f(ctx, set, cfg, next) +} + +// CreateProfilesToTracesFunc is the equivalent of Factory.CreateProfilesToTraces(). +type CreateProfilesToTracesFunc func(context.Context, connector.Settings, component.Config, consumer.Traces) (Profiles, error) + +// CreateProfilesToTraces implements Factory.CreateProfilesToTraces(). +func (f CreateProfilesToTracesFunc) CreateProfilesToTraces(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Traces) (Profiles, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalTraces) + } + return f(ctx, set, cfg, next) +} + +// CreateProfilesToMetricsFunc is the equivalent of Factory.CreateProfilesToMetrics(). +type CreateProfilesToMetricsFunc func(context.Context, connector.Settings, component.Config, consumer.Metrics) (Profiles, error) + +// CreateProfilesToMetrics implements Factory.CreateProfilesToMetrics(). +func (f CreateProfilesToMetricsFunc) CreateProfilesToMetrics(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Metrics) (Profiles, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalMetrics) + } + return f(ctx, set, cfg, next) +} + +// CreateProfilesToLogsFunc is the equivalent of Factory.CreateProfilesToLogs(). +type CreateProfilesToLogsFunc func(context.Context, connector.Settings, component.Config, consumer.Logs) (Profiles, error) + +// CreateProfilesToLogs implements Factory.CreateProfilesToLogs(). +func (f CreateProfilesToLogsFunc) CreateProfilesToLogs(ctx context.Context, set connector.Settings, cfg component.Config, next consumer.Logs) (Profiles, error) { + if f == nil { + return nil, internal.ErrDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalLogs) + } + return f(ctx, set, cfg, next) +} + +// FactoryOption apply changes to ReceiverOptions. +type FactoryOption interface { + // applyOption applies the option. + applyOption(o *factoryOpts) +} + +// factoryOptionFunc is an ReceiverFactoryOption created through a function. +type factoryOptionFunc func(*factoryOpts) + +func (f factoryOptionFunc) applyOption(o *factoryOpts) { + f(o) +} + +type factoryOpts struct { + opts []connector.FactoryOption + + *factory +} + +// WithTracesToTraces overrides the default "error not supported" implementation for WithTracesToTraces and the default "undefined" stability level. +func WithTracesToTraces(createTracesToTraces connector.CreateTracesToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithTracesToTraces(createTracesToTraces, sl)) + }) +} + +// WithTracesToMetrics overrides the default "error not supported" implementation for WithTracesToMetrics and the default "undefined" stability level. +func WithTracesToMetrics(createTracesToMetrics connector.CreateTracesToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithTracesToMetrics(createTracesToMetrics, sl)) + }) +} + +// WithTracesToLogs overrides the default "error not supported" implementation for WithTracesToLogs and the default "undefined" stability level. +func WithTracesToLogs(createTracesToLogs connector.CreateTracesToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithTracesToLogs(createTracesToLogs, sl)) + }) +} + +// WithMetricsToTraces overrides the default "error not supported" implementation for WithMetricsToTraces and the default "undefined" stability level. +func WithMetricsToTraces(createMetricsToTraces connector.CreateMetricsToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithMetricsToTraces(createMetricsToTraces, sl)) + }) +} + +// WithMetricsToMetrics overrides the default "error not supported" implementation for WithMetricsToMetrics and the default "undefined" stability level. +func WithMetricsToMetrics(createMetricsToMetrics connector.CreateMetricsToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithMetricsToMetrics(createMetricsToMetrics, sl)) + }) +} + +// WithMetricsToLogs overrides the default "error not supported" implementation for WithMetricsToLogs and the default "undefined" stability level. +func WithMetricsToLogs(createMetricsToLogs connector.CreateMetricsToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithMetricsToLogs(createMetricsToLogs, sl)) + }) +} + +// WithLogsToTraces overrides the default "error not supported" implementation for WithLogsToTraces and the default "undefined" stability level. +func WithLogsToTraces(createLogsToTraces connector.CreateLogsToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithLogsToTraces(createLogsToTraces, sl)) + }) +} + +// WithLogsToMetrics overrides the default "error not supported" implementation for WithLogsToMetrics and the default "undefined" stability level. +func WithLogsToMetrics(createLogsToMetrics connector.CreateLogsToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithLogsToMetrics(createLogsToMetrics, sl)) + }) +} + +// WithLogsToLogs overrides the default "error not supported" implementation for WithLogsToLogs and the default "undefined" stability level. +func WithLogsToLogs(createLogsToLogs connector.CreateLogsToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, connector.WithLogsToLogs(createLogsToLogs, sl)) + }) +} + +// WithTracesToProfiles overrides the default "error not supported" implementation for WithTracesToProfiles and the default "undefined" stability level. +func WithTracesToProfiles(createTracesToProfiles CreateTracesToProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.tracesToProfilesStabilityLevel = sl + o.CreateTracesToProfilesFunc = createTracesToProfiles + }) +} + +// WithMetricsToProfiles overrides the default "error not supported" implementation for WithMetricsToProfiles and the default "undefined" stability level. +func WithMetricsToProfiles(createMetricsToProfiles CreateMetricsToProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.metricsToProfilesStabilityLevel = sl + o.CreateMetricsToProfilesFunc = createMetricsToProfiles + }) +} + +// WithLogsToProfiles overrides the default "error not supported" implementation for WithLogsToProfiles and the default "undefined" stability level. +func WithLogsToProfiles(createLogsToProfiles CreateLogsToProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.logsToProfilesStabilityLevel = sl + o.CreateLogsToProfilesFunc = createLogsToProfiles + }) +} + +// WithProfilesToProfiles overrides the default "error not supported" implementation for WithProfilesToProfiles and the default "undefined" stability level. +func WithProfilesToProfiles(createProfilesToProfiles CreateProfilesToProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesToProfilesStabilityLevel = sl + o.CreateProfilesToProfilesFunc = createProfilesToProfiles + }) +} + +// WithProfilesToTraces overrides the default "error not supported" implementation for WithProfilesToTraces and the default "undefined" stability level. +func WithProfilesToTraces(createProfilesToTraces CreateProfilesToTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesToTracesStabilityLevel = sl + o.CreateProfilesToTracesFunc = createProfilesToTraces + }) +} + +// WithProfilesToMetrics overrides the default "error not supported" implementation for WithProfilesToMetrics and the default "undefined" stability level. +func WithProfilesToMetrics(createProfilesToMetrics CreateProfilesToMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesToMetricsStabilityLevel = sl + o.CreateProfilesToMetricsFunc = createProfilesToMetrics + }) +} + +// WithProfilesToLogs overrides the default "error not supported" implementation for WithProfilesToLogs and the default "undefined" stability level. +func WithProfilesToLogs(createProfilesToLogs CreateProfilesToLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesToLogsStabilityLevel = sl + o.CreateProfilesToLogsFunc = createProfilesToLogs + }) +} + +// factory implements the Factory interface. +type factory struct { + connector.Factory + + CreateTracesToProfilesFunc + CreateMetricsToProfilesFunc + CreateLogsToProfilesFunc + + CreateProfilesToProfilesFunc + CreateProfilesToTracesFunc + CreateProfilesToMetricsFunc + CreateProfilesToLogsFunc + + tracesToProfilesStabilityLevel component.StabilityLevel + metricsToProfilesStabilityLevel component.StabilityLevel + logsToProfilesStabilityLevel component.StabilityLevel + + profilesToProfilesStabilityLevel component.StabilityLevel + profilesToTracesStabilityLevel component.StabilityLevel + profilesToMetricsStabilityLevel component.StabilityLevel + profilesToLogsStabilityLevel component.StabilityLevel +} + +func (f *factory) TracesToProfilesStability() component.StabilityLevel { + return f.tracesToProfilesStabilityLevel +} + +func (f *factory) MetricsToProfilesStability() component.StabilityLevel { + return f.metricsToProfilesStabilityLevel +} + +func (f *factory) LogsToProfilesStability() component.StabilityLevel { + return f.logsToProfilesStabilityLevel +} + +func (f *factory) ProfilesToProfilesStability() component.StabilityLevel { + return f.profilesToProfilesStabilityLevel +} + +func (f *factory) ProfilesToTracesStability() component.StabilityLevel { + return f.profilesToTracesStabilityLevel +} + +func (f *factory) ProfilesToMetricsStability() component.StabilityLevel { + return f.profilesToMetricsStabilityLevel +} + +func (f *factory) ProfilesToLogsStability() component.StabilityLevel { + return f.profilesToLogsStabilityLevel +} + +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + opts := factoryOpts{factory: &factory{}} + for _, opt := range options { + opt.applyOption(&opts) + } + opts.Factory = connector.NewFactory(cfgType, createDefaultConfig, opts.opts...) + return opts.factory +} diff --git a/connector/xconnector/connector_test.go b/connector/xconnector/connector_test.go new file mode 100644 index 00000000000..636e76450ed --- /dev/null +++ b/connector/xconnector/connector_test.go @@ -0,0 +1,148 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconnector // import "go.opentelemetry.io/collector/connector/xconnector" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/internal" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +var ( + testType = component.MustNewType("test") + testID = component.MustNewIDWithName("type", "name") +) + +func TestNewFactoryNoOptions(t *testing.T) { + defaultCfg := struct{}{} + factory := NewFactory(testType, func() component.Config { return &defaultCfg }) + assert.EqualValues(t, testType, factory.Type()) + assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) + + _, err := factory.CreateTracesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalTraces, xpipeline.SignalProfiles)) + _, err = factory.CreateMetricsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalMetrics, xpipeline.SignalProfiles)) + _, err = factory.CreateLogsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, pipeline.SignalLogs, xpipeline.SignalProfiles)) + + _, err = factory.CreateProfilesToTraces(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalTraces)) + _, err = factory.CreateProfilesToMetrics(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) + _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) +} + +func TestNewFactoryWithSameTypes(t *testing.T) { + defaultCfg := struct{}{} + factory := NewFactory(testType, func() component.Config { return &defaultCfg }, + WithProfilesToProfiles(createProfilesToProfiles, component.StabilityLevelAlpha), + ) + assert.EqualValues(t, testType, factory.Type()) + assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) + + assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesToProfilesStability()) + _, err := factory.CreateProfilesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + _, err = factory.CreateProfilesToTraces(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalTraces)) + _, err = factory.CreateProfilesToMetrics(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalMetrics)) + _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, pipeline.SignalLogs)) +} + +func TestNewFactoryWithTranslateTypes(t *testing.T) { + defaultCfg := struct{}{} + factory := NewFactory(testType, func() component.Config { return &defaultCfg }, + WithTracesToProfiles(createTracesToProfiles, component.StabilityLevelBeta), + WithMetricsToProfiles(createMetricsToProfiles, component.StabilityLevelDevelopment), + WithLogsToProfiles(createLogsToProfiles, component.StabilityLevelAlpha), + + WithProfilesToTraces(createProfilesToTraces, component.StabilityLevelBeta), + WithProfilesToMetrics(createProfilesToMetrics, component.StabilityLevelDevelopment), + WithProfilesToLogs(createProfilesToLogs, component.StabilityLevelAlpha), + ) + assert.EqualValues(t, testType, factory.Type()) + assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) + + _, err := factory.CreateProfilesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, err, internal.ErrDataTypes(testID, xpipeline.SignalProfiles, xpipeline.SignalProfiles)) + + assert.Equal(t, component.StabilityLevelBeta, factory.TracesToProfilesStability()) + _, err = factory.CreateTracesToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + assert.Equal(t, component.StabilityLevelDevelopment, factory.MetricsToProfilesStability()) + _, err = factory.CreateMetricsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + assert.Equal(t, component.StabilityLevelAlpha, factory.LogsToProfilesStability()) + _, err = factory.CreateLogsToProfiles(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + assert.Equal(t, component.StabilityLevelBeta, factory.ProfilesToTracesStability()) + _, err = factory.CreateProfilesToTraces(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + assert.Equal(t, component.StabilityLevelDevelopment, factory.ProfilesToMetricsStability()) + _, err = factory.CreateProfilesToMetrics(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + require.NoError(t, err) + + assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesToLogsStability()) + _, err = factory.CreateProfilesToLogs(context.Background(), connector.Settings{ID: testID}, &defaultCfg, consumertest.NewNop()) + assert.NoError(t, err) +} + +var nopInstance = &nopConnector{ + Consumer: consumertest.NewNop(), +} + +// nopConnector stores consumed traces and metrics for testing purposes. +type nopConnector struct { + component.StartFunc + component.ShutdownFunc + consumertest.Consumer +} + +func createTracesToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Traces, error) { + return nopInstance, nil +} + +func createMetricsToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Metrics, error) { + return nopInstance, nil +} + +func createLogsToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Logs, error) { + return nopInstance, nil +} + +func createProfilesToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (Profiles, error) { + return nopInstance, nil +} + +func createProfilesToTraces(context.Context, connector.Settings, component.Config, consumer.Traces) (Profiles, error) { + return nopInstance, nil +} + +func createProfilesToMetrics(context.Context, connector.Settings, component.Config, consumer.Metrics) (Profiles, error) { + return nopInstance, nil +} + +func createProfilesToLogs(context.Context, connector.Settings, component.Config, consumer.Logs) (Profiles, error) { + return nopInstance, nil +} diff --git a/connector/xconnector/go.mod b/connector/xconnector/go.mod new file mode 100644 index 00000000000..72283301f51 --- /dev/null +++ b/connector/xconnector/go.mod @@ -0,0 +1,64 @@ +module go.opentelemetry.io/collector/connector/xconnector + +go 1.22.0 + +require ( + github.com/stretchr/testify v1.10.0 + go.opentelemetry.io/collector/component v0.115.0 + go.opentelemetry.io/collector/connector v0.115.0 + go.opentelemetry.io/collector/consumer v1.21.0 + go.opentelemetry.io/collector/consumer/consumertest v0.115.0 + go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 + go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.0 + go.opentelemetry.io/collector/pdata/pprofile v0.115.0 + go.opentelemetry.io/collector/pdata/testdata v0.115.0 + go.opentelemetry.io/collector/pipeline v0.115.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.115.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect + go.opentelemetry.io/collector/pdata v1.21.0 // indirect + go.opentelemetry.io/otel v1.32.0 // indirect + go.opentelemetry.io/otel/metric v1.32.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.68.1 // indirect + google.golang.org/protobuf v1.35.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector/connector => ../ + +replace go.opentelemetry.io/collector/consumer => ../../consumer + +replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest + +replace go.opentelemetry.io/collector/pdata/pprofile => ../../pdata/pprofile + +replace go.opentelemetry.io/collector/consumer/xconsumer => ../../consumer/xconsumer + +replace go.opentelemetry.io/collector/component => ../../component + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry + +replace go.opentelemetry.io/collector/pdata/testdata => ../../pdata/testdata + +replace go.opentelemetry.io/collector/pipeline => ../../pipeline + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline + +replace go.opentelemetry.io/collector/internal/fanoutconsumer => ../../internal/fanoutconsumer diff --git a/connector/xconnector/go.sum b/connector/xconnector/go.sum new file mode 100644 index 00000000000..cf2cf2af26f --- /dev/null +++ b/connector/xconnector/go.sum @@ -0,0 +1,91 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= +google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/connector/xconnector/profiles_router.go b/connector/xconnector/profiles_router.go new file mode 100644 index 00000000000..085305ee557 --- /dev/null +++ b/connector/xconnector/profiles_router.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconnector // import "go.opentelemetry.io/collector/connector/xconnector" + +import ( + "go.opentelemetry.io/collector/connector/internal" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/internal/fanoutconsumer" + "go.opentelemetry.io/collector/pipeline" +) + +type ProfilesRouterAndConsumer interface { + xconsumer.Profiles + Consumer(...pipeline.ID) (xconsumer.Profiles, error) + PipelineIDs() []pipeline.ID + privateFunc() +} + +type profilesRouter struct { + xconsumer.Profiles + internal.BaseRouter[xconsumer.Profiles] +} + +func NewProfilesRouter(cm map[pipeline.ID]xconsumer.Profiles) ProfilesRouterAndConsumer { + consumers := make([]xconsumer.Profiles, 0, len(cm)) + for _, cons := range cm { + consumers = append(consumers, cons) + } + return &profilesRouter{ + Profiles: fanoutconsumer.NewProfiles(consumers), + BaseRouter: internal.NewBaseRouter(fanoutconsumer.NewProfiles, cm), + } +} + +func (r *profilesRouter) privateFunc() {} diff --git a/connector/xconnector/profiles_router_test.go b/connector/xconnector/profiles_router_test.go new file mode 100644 index 00000000000..ba2fe196483 --- /dev/null +++ b/connector/xconnector/profiles_router_test.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xconnector + +import ( + "context" + "fmt" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" +) + +type mutatingProfilesSink struct { + *consumertest.ProfilesSink +} + +func (mts *mutatingProfilesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +func TestProfilesRouterMultiplexing(t *testing.T) { + num := 20 + for numIDs := 1; numIDs < num; numIDs++ { + for numCons := 1; numCons < num; numCons++ { + for numProfiles := 1; numProfiles < num; numProfiles++ { + t.Run( + fmt.Sprintf("%d-ids/%d-cons/%d-logs", numIDs, numCons, numProfiles), + fuzzProfiles(numIDs, numCons, numProfiles), + ) + } + } + } +} + +func fuzzProfiles(numIDs, numCons, numProfiles int) func(*testing.T) { + return func(t *testing.T) { + allIDs := make([]pipeline.ID, 0, numCons) + allCons := make([]xconsumer.Profiles, 0, numCons) + allConsMap := make(map[pipeline.ID]xconsumer.Profiles) + + // If any consumer is mutating, the router must report mutating + for i := 0; i < numCons; i++ { + allIDs = append(allIDs, pipeline.NewIDWithName(xpipeline.SignalProfiles, "sink_"+strconv.Itoa(numCons))) + // Random chance for each consumer to be mutating + if (numCons+numProfiles+i)%4 == 0 { + allCons = append(allCons, &mutatingProfilesSink{ProfilesSink: new(consumertest.ProfilesSink)}) + } else { + allCons = append(allCons, new(consumertest.ProfilesSink)) + } + allConsMap[allIDs[i]] = allCons[i] + } + + r := NewProfilesRouter(allConsMap) + td := testdata.GenerateProfiles(1) + + // Keep track of how many logs each consumer should receive. + // This will be validated after every call to RouteProfiles. + expected := make(map[pipeline.ID]int, numCons) + + for i := 0; i < numProfiles; i++ { + // Build a random set of ids (no duplicates) + randCons := make(map[pipeline.ID]bool, numIDs) + for j := 0; j < numIDs; j++ { + // This number should be pretty random and less than numCons + conNum := (numCons + numIDs + i + j) % numCons + randCons[allIDs[conNum]] = true + } + + // Convert to slice, update expectations + conIDs := make([]pipeline.ID, 0, len(randCons)) + for id := range randCons { + conIDs = append(conIDs, id) + expected[id]++ + } + + // Route to list of consumers + fanout, err := r.Consumer(conIDs...) + assert.NoError(t, err) + assert.NoError(t, fanout.ConsumeProfiles(context.Background(), td)) + + // Validate expectations for all consumers + for id := range expected { + profiles := []pprofile.Profiles{} + switch con := allConsMap[id].(type) { + case *consumertest.ProfilesSink: + profiles = con.AllProfiles() + case *mutatingProfilesSink: + profiles = con.AllProfiles() + } + assert.Len(t, profiles, expected[id]) + for n := 0; n < len(profiles); n++ { + assert.EqualValues(t, td, profiles[n]) + } + } + } + } +} + +func TestProfilessRouterConsumer(t *testing.T) { + ctx := context.Background() + td := testdata.GenerateProfiles(1) + + fooID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "foo") + barID := pipeline.NewIDWithName(xpipeline.SignalProfiles, "bar") + + foo := new(consumertest.ProfilesSink) + bar := new(consumertest.ProfilesSink) + r := NewProfilesRouter(map[pipeline.ID]xconsumer.Profiles{fooID: foo, barID: bar}) + + rcs := r.PipelineIDs() + assert.Len(t, rcs, 2) + assert.ElementsMatch(t, []pipeline.ID{fooID, barID}, rcs) + + assert.Empty(t, foo.AllProfiles()) + assert.Empty(t, bar.AllProfiles()) + + both, err := r.Consumer(fooID, barID) + assert.NotNil(t, both) + assert.NoError(t, err) + + assert.NoError(t, both.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 1) + assert.Len(t, bar.AllProfiles(), 1) + + fooOnly, err := r.Consumer(fooID) + assert.NotNil(t, fooOnly) + assert.NoError(t, err) + + assert.NoError(t, fooOnly.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 2) + assert.Len(t, bar.AllProfiles(), 1) + + barOnly, err := r.Consumer(barID) + assert.NotNil(t, barOnly) + assert.NoError(t, err) + + assert.NoError(t, barOnly.ConsumeProfiles(ctx, td)) + assert.Len(t, foo.AllProfiles(), 2) + assert.Len(t, bar.AllProfiles(), 2) + + none, err := r.Consumer() + assert.Nil(t, none) + require.Error(t, err) + + fake, err := r.Consumer(pipeline.NewIDWithName(xpipeline.SignalProfiles, "fake")) + assert.Nil(t, fake) + assert.Error(t, err) +} diff --git a/internal/e2e/go.mod b/internal/e2e/go.mod index b639c7dfce2..d40a2c4ffe1 100644 --- a/internal/e2e/go.mod +++ b/internal/e2e/go.mod @@ -82,7 +82,7 @@ require ( go.opentelemetry.io/collector/config/configcompression v1.21.0 // indirect go.opentelemetry.io/collector/config/confignet v1.21.0 // indirect go.opentelemetry.io/collector/config/internal v0.115.0 // indirect - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.0.0-00010101000000-000000000000 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 // indirect @@ -218,7 +218,7 @@ replace go.opentelemetry.io/collector/receiver/receivertest => ../../receiver/re replace go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../../connector/connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../../connector/xconnector replace go.opentelemetry.io/collector/exporter/xexporter => ../../exporter/xexporter diff --git a/otelcol/go.mod b/otelcol/go.mod index 6adff91d533..e4b3a21899f 100644 --- a/otelcol/go.mod +++ b/otelcol/go.mod @@ -70,7 +70,7 @@ require ( github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/collector/component/componenttest v0.115.0 // indirect - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.115.0 // indirect go.opentelemetry.io/collector/consumer v1.21.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumertest v0.115.0 // indirect @@ -185,7 +185,7 @@ replace go.opentelemetry.io/collector/receiver/receivertest => ../receiver/recei replace go.opentelemetry.io/collector/processor/xprocessor => ../processor/xprocessor -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../connector/connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../connector/xconnector replace go.opentelemetry.io/collector/exporter/xexporter => ../exporter/xexporter diff --git a/otelcol/otelcoltest/go.mod b/otelcol/otelcoltest/go.mod index b5603a5c3f9..ed991db586d 100644 --- a/otelcol/otelcoltest/go.mod +++ b/otelcol/otelcoltest/go.mod @@ -67,7 +67,7 @@ require ( go.opentelemetry.io/collector/component/componentstatus v0.115.0 // indirect go.opentelemetry.io/collector/component/componenttest v0.115.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 // indirect + go.opentelemetry.io/collector/connector/xconnector v0.115.0 // indirect go.opentelemetry.io/collector/consumer v1.21.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect go.opentelemetry.io/collector/consumer/consumertest v0.115.0 // indirect @@ -180,7 +180,7 @@ replace go.opentelemetry.io/collector/receiver/receivertest => ../../receiver/re replace go.opentelemetry.io/collector/processor/xprocessor => ../../processor/xprocessor -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../../connector/connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../../connector/xconnector replace go.opentelemetry.io/collector/exporter/xexporter => ../../exporter/xexporter diff --git a/service/go.mod b/service/go.mod index 5f6dc414f5e..85c35394edf 100644 --- a/service/go.mod +++ b/service/go.mod @@ -17,8 +17,8 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.115.0 go.opentelemetry.io/collector/confmap v1.21.0 go.opentelemetry.io/collector/connector v0.115.0 - go.opentelemetry.io/collector/connector/connectorprofiles v0.115.0 go.opentelemetry.io/collector/connector/connectortest v0.115.0 + go.opentelemetry.io/collector/connector/xconnector v0.115.0 go.opentelemetry.io/collector/consumer v1.21.0 go.opentelemetry.io/collector/consumer/consumertest v0.115.0 go.opentelemetry.io/collector/consumer/xconsumer v0.115.1-0.20241213185000-4593ba7de234 @@ -207,7 +207,7 @@ replace go.opentelemetry.io/collector/exporter/exportertest => ../exporter/expor replace go.opentelemetry.io/collector/consumer/consumererror => ../consumer/consumererror -replace go.opentelemetry.io/collector/connector/connectorprofiles => ../connector/connectorprofiles +replace go.opentelemetry.io/collector/connector/xconnector => ../connector/xconnector replace go.opentelemetry.io/collector/internal/fanoutconsumer => ../internal/fanoutconsumer diff --git a/service/internal/builders/connector.go b/service/internal/builders/connector.go index c813bf4a620..b157d2dd64a 100644 --- a/service/internal/builders/connector.go +++ b/service/internal/builders/connector.go @@ -9,8 +9,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" @@ -104,7 +104,7 @@ func (b *ConnectorBuilder) CreateTracesToProfiles(ctx context.Context, set conne return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, pipeline.SignalTraces, xpipeline.SignalProfiles) } @@ -185,7 +185,7 @@ func (b *ConnectorBuilder) CreateMetricsToProfiles(ctx context.Context, set conn return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, pipeline.SignalMetrics, xpipeline.SignalProfiles) } @@ -266,7 +266,7 @@ func (b *ConnectorBuilder) CreateLogsToProfiles(ctx context.Context, set connect return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, pipeline.SignalLogs, xpipeline.SignalProfiles) } @@ -276,7 +276,7 @@ func (b *ConnectorBuilder) CreateLogsToProfiles(ctx context.Context, set connect } // CreateProfilesToTraces creates a Profiles connector based on the settings and config. -func (b *ConnectorBuilder) CreateProfilesToTraces(ctx context.Context, set connector.Settings, next consumer.Traces) (connectorprofiles.Profiles, error) { +func (b *ConnectorBuilder) CreateProfilesToTraces(ctx context.Context, set connector.Settings, next consumer.Traces) (xconnector.Profiles, error) { if next == nil { return nil, errNilNextConsumer } @@ -290,7 +290,7 @@ func (b *ConnectorBuilder) CreateProfilesToTraces(ctx context.Context, set conne return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalTraces) } @@ -300,7 +300,7 @@ func (b *ConnectorBuilder) CreateProfilesToTraces(ctx context.Context, set conne } // CreateProfilesToMetrics creates a Profiles connector based on the settings and config. -func (b *ConnectorBuilder) CreateProfilesToMetrics(ctx context.Context, set connector.Settings, next consumer.Metrics) (connectorprofiles.Profiles, error) { +func (b *ConnectorBuilder) CreateProfilesToMetrics(ctx context.Context, set connector.Settings, next consumer.Metrics) (xconnector.Profiles, error) { if next == nil { return nil, errNilNextConsumer } @@ -314,7 +314,7 @@ func (b *ConnectorBuilder) CreateProfilesToMetrics(ctx context.Context, set conn return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalMetrics) } @@ -324,7 +324,7 @@ func (b *ConnectorBuilder) CreateProfilesToMetrics(ctx context.Context, set conn } // CreateProfilesToLogs creates a Profiles connector based on the settings and config. -func (b *ConnectorBuilder) CreateProfilesToLogs(ctx context.Context, set connector.Settings, next consumer.Logs) (connectorprofiles.Profiles, error) { +func (b *ConnectorBuilder) CreateProfilesToLogs(ctx context.Context, set connector.Settings, next consumer.Logs) (xconnector.Profiles, error) { if next == nil { return nil, errNilNextConsumer } @@ -338,7 +338,7 @@ func (b *ConnectorBuilder) CreateProfilesToLogs(ctx context.Context, set connect return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, pipeline.SignalLogs) } @@ -348,7 +348,7 @@ func (b *ConnectorBuilder) CreateProfilesToLogs(ctx context.Context, set connect } // CreateProfilesToProfiles creates a Profiles connector based on the settings and config. -func (b *ConnectorBuilder) CreateProfilesToProfiles(ctx context.Context, set connector.Settings, next xconsumer.Profiles) (connectorprofiles.Profiles, error) { +func (b *ConnectorBuilder) CreateProfilesToProfiles(ctx context.Context, set connector.Settings, next xconsumer.Profiles) (xconnector.Profiles, error) { if next == nil { return nil, errNilNextConsumer } @@ -362,7 +362,7 @@ func (b *ConnectorBuilder) CreateProfilesToProfiles(ctx context.Context, set con return nil, fmt.Errorf("connector factory not available for: %q", set.ID) } - f, ok := connFact.(connectorprofiles.Factory) + f, ok := connFact.(xconnector.Factory) if !ok { return nil, errDataTypes(set.ID, xpipeline.SignalProfiles, xpipeline.SignalProfiles) } diff --git a/service/internal/builders/connector_test.go b/service/internal/builders/connector_test.go index cd7b1fad893..a1d60dbf816 100644 --- a/service/internal/builders/connector_test.go +++ b/service/internal/builders/connector_test.go @@ -14,8 +14,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" @@ -27,25 +27,25 @@ func TestConnectorBuilder(t *testing.T) { defaultCfg := struct{}{} factories, err := connector.MakeFactoryMap([]connector.Factory{ connector.NewFactory(component.MustNewType("err"), nil), - connectorprofiles.NewFactory( + xconnector.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - connectorprofiles.WithTracesToTraces(createConnectorTracesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToMetrics(createConnectorTracesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToLogs(createConnectorTracesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToProfiles(createConnectorTracesToProfiles, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToTraces(createConnectorMetricsToTraces, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToMetrics(createConnectorMetricsToMetrics, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToLogs(createConnectorMetricsToLogs, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToProfiles(createConnectorMetricsToProfiles, component.StabilityLevelAlpha), - connectorprofiles.WithLogsToTraces(createConnectorLogsToTraces, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToMetrics(createConnectorLogsToMetrics, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToLogs(createConnectorLogsToLogs, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToProfiles(createConnectorLogsToProfiles, component.StabilityLevelDeprecated), - connectorprofiles.WithProfilesToTraces(createConnectorProfilesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToMetrics(createConnectorProfilesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToLogs(createConnectorProfilesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToProfiles(createConnectorProfilesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithTracesToTraces(createConnectorTracesToTraces, component.StabilityLevelDevelopment), + xconnector.WithTracesToMetrics(createConnectorTracesToMetrics, component.StabilityLevelDevelopment), + xconnector.WithTracesToLogs(createConnectorTracesToLogs, component.StabilityLevelDevelopment), + xconnector.WithTracesToProfiles(createConnectorTracesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithMetricsToTraces(createConnectorMetricsToTraces, component.StabilityLevelAlpha), + xconnector.WithMetricsToMetrics(createConnectorMetricsToMetrics, component.StabilityLevelAlpha), + xconnector.WithMetricsToLogs(createConnectorMetricsToLogs, component.StabilityLevelAlpha), + xconnector.WithMetricsToProfiles(createConnectorMetricsToProfiles, component.StabilityLevelAlpha), + xconnector.WithLogsToTraces(createConnectorLogsToTraces, component.StabilityLevelDeprecated), + xconnector.WithLogsToMetrics(createConnectorLogsToMetrics, component.StabilityLevelDeprecated), + xconnector.WithLogsToLogs(createConnectorLogsToLogs, component.StabilityLevelDeprecated), + xconnector.WithLogsToProfiles(createConnectorLogsToProfiles, component.StabilityLevelDeprecated), + xconnector.WithProfilesToTraces(createxconnectorToTraces, component.StabilityLevelDevelopment), + xconnector.WithProfilesToMetrics(createxconnectorToMetrics, component.StabilityLevelDevelopment), + xconnector.WithProfilesToLogs(createxconnectorToLogs, component.StabilityLevelDevelopment), + xconnector.WithProfilesToProfiles(createxconnectorToProfiles, component.StabilityLevelDevelopment), ), }...) require.NoError(t, err) @@ -263,25 +263,25 @@ func TestConnectorBuilder(t *testing.T) { func TestConnectorBuilderMissingConfig(t *testing.T) { defaultCfg := struct{}{} factories, err := connector.MakeFactoryMap([]connector.Factory{ - connectorprofiles.NewFactory( + xconnector.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - connectorprofiles.WithTracesToTraces(createConnectorTracesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToMetrics(createConnectorTracesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToLogs(createConnectorTracesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToProfiles(createConnectorTracesToProfiles, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToTraces(createConnectorMetricsToTraces, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToMetrics(createConnectorMetricsToMetrics, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToLogs(createConnectorMetricsToLogs, component.StabilityLevelAlpha), - connectorprofiles.WithMetricsToProfiles(createConnectorMetricsToProfiles, component.StabilityLevelAlpha), - connectorprofiles.WithLogsToTraces(createConnectorLogsToTraces, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToMetrics(createConnectorLogsToMetrics, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToLogs(createConnectorLogsToLogs, component.StabilityLevelDeprecated), - connectorprofiles.WithLogsToProfiles(createConnectorLogsToProfiles, component.StabilityLevelDeprecated), - connectorprofiles.WithProfilesToTraces(createConnectorProfilesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToMetrics(createConnectorProfilesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToLogs(createConnectorProfilesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToProfiles(createConnectorProfilesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithTracesToTraces(createConnectorTracesToTraces, component.StabilityLevelDevelopment), + xconnector.WithTracesToMetrics(createConnectorTracesToMetrics, component.StabilityLevelDevelopment), + xconnector.WithTracesToLogs(createConnectorTracesToLogs, component.StabilityLevelDevelopment), + xconnector.WithTracesToProfiles(createConnectorTracesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithMetricsToTraces(createConnectorMetricsToTraces, component.StabilityLevelAlpha), + xconnector.WithMetricsToMetrics(createConnectorMetricsToMetrics, component.StabilityLevelAlpha), + xconnector.WithMetricsToLogs(createConnectorMetricsToLogs, component.StabilityLevelAlpha), + xconnector.WithMetricsToProfiles(createConnectorMetricsToProfiles, component.StabilityLevelAlpha), + xconnector.WithLogsToTraces(createConnectorLogsToTraces, component.StabilityLevelDeprecated), + xconnector.WithLogsToMetrics(createConnectorLogsToMetrics, component.StabilityLevelDeprecated), + xconnector.WithLogsToLogs(createConnectorLogsToLogs, component.StabilityLevelDeprecated), + xconnector.WithLogsToProfiles(createConnectorLogsToProfiles, component.StabilityLevelDeprecated), + xconnector.WithProfilesToTraces(createxconnectorToTraces, component.StabilityLevelDevelopment), + xconnector.WithProfilesToMetrics(createxconnectorToMetrics, component.StabilityLevelDevelopment), + xconnector.WithProfilesToLogs(createxconnectorToLogs, component.StabilityLevelDevelopment), + xconnector.WithProfilesToProfiles(createxconnectorToProfiles, component.StabilityLevelDevelopment), ), }...) @@ -397,7 +397,7 @@ func TestNewNopConnectorConfigsAndFactories(t *testing.T) { require.NoError(t, err) assert.IsType(t, tracesToLogs, bTracesToLogs) - tracesToProfiles, err := factory.(connectorprofiles.Factory).CreateTracesToProfiles(context.Background(), set, cfg, consumertest.NewNop()) + tracesToProfiles, err := factory.(xconnector.Factory).CreateTracesToProfiles(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bTracesToProfiles, err := builder.CreateTracesToProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) @@ -421,7 +421,7 @@ func TestNewNopConnectorConfigsAndFactories(t *testing.T) { require.NoError(t, err) assert.IsType(t, metricsToLogs, bMetricsToLogs) - metricsToProfiles, err := factory.(connectorprofiles.Factory).CreateMetricsToProfiles(context.Background(), set, cfg, consumertest.NewNop()) + metricsToProfiles, err := factory.(xconnector.Factory).CreateMetricsToProfiles(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bMetricsToProfiles, err := builder.CreateMetricsToProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) @@ -445,31 +445,31 @@ func TestNewNopConnectorConfigsAndFactories(t *testing.T) { require.NoError(t, err) assert.IsType(t, logsToLogs, bLogsToLogs) - logsToProfiles, err := factory.(connectorprofiles.Factory).CreateLogsToProfiles(context.Background(), set, cfg, consumertest.NewNop()) + logsToProfiles, err := factory.(xconnector.Factory).CreateLogsToProfiles(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bLogsToProfiles, err := builder.CreateLogsToProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, logsToProfiles, bLogsToProfiles) - profilesToTraces, err := factory.(connectorprofiles.Factory).CreateProfilesToTraces(context.Background(), set, cfg, consumertest.NewNop()) + profilesToTraces, err := factory.(xconnector.Factory).CreateProfilesToTraces(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfilesToTraces, err := builder.CreateProfilesToTraces(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profilesToTraces, bProfilesToTraces) - profilesToMetrics, err := factory.(connectorprofiles.Factory).CreateProfilesToMetrics(context.Background(), set, cfg, consumertest.NewNop()) + profilesToMetrics, err := factory.(xconnector.Factory).CreateProfilesToMetrics(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfilesToMetrics, err := builder.CreateProfilesToMetrics(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profilesToMetrics, bProfilesToMetrics) - profilesToLogs, err := factory.(connectorprofiles.Factory).CreateProfilesToLogs(context.Background(), set, cfg, consumertest.NewNop()) + profilesToLogs, err := factory.(xconnector.Factory).CreateProfilesToLogs(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfilesToLogs, err := builder.CreateProfilesToLogs(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profilesToLogs, bProfilesToLogs) - profilesToProfiles, err := factory.(connectorprofiles.Factory).CreateProfilesToProfiles(context.Background(), set, cfg, consumertest.NewNop()) + profilesToProfiles, err := factory.(xconnector.Factory).CreateProfilesToProfiles(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfilesToProfiles, err := builder.CreateProfilesToProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) @@ -535,19 +535,19 @@ func createConnectorLogsToProfiles(context.Context, connector.Settings, componen return nopConnectorInstance, nil } -func createConnectorProfilesToTraces(context.Context, connector.Settings, component.Config, consumer.Traces) (connectorprofiles.Profiles, error) { +func createxconnectorToTraces(context.Context, connector.Settings, component.Config, consumer.Traces) (xconnector.Profiles, error) { return nopConnectorInstance, nil } -func createConnectorProfilesToMetrics(context.Context, connector.Settings, component.Config, consumer.Metrics) (connectorprofiles.Profiles, error) { +func createxconnectorToMetrics(context.Context, connector.Settings, component.Config, consumer.Metrics) (xconnector.Profiles, error) { return nopConnectorInstance, nil } -func createConnectorProfilesToLogs(context.Context, connector.Settings, component.Config, consumer.Logs) (connectorprofiles.Profiles, error) { +func createxconnectorToLogs(context.Context, connector.Settings, component.Config, consumer.Logs) (xconnector.Profiles, error) { return nopConnectorInstance, nil } -func createConnectorProfilesToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connectorprofiles.Profiles, error) { +func createxconnectorToProfiles(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (xconnector.Profiles, error) { return nopConnectorInstance, nil } diff --git a/service/internal/graph/connector.go b/service/internal/graph/connector.go index b2313860d87..1f654454ee6 100644 --- a/service/internal/graph/connector.go +++ b/service/internal/graph/connector.go @@ -8,7 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pipeline" @@ -182,12 +182,12 @@ func (n *connectorNode) buildProfiles( for _, next := range nexts { consumers[next.(*capabilitiesNode).pipelineID] = next.(xconsumer.Profiles) } - next := connectorprofiles.NewProfilesRouter(consumers) + next := xconnector.NewProfilesRouter(consumers) var err error switch n.exprPipelineType { case xpipeline.SignalProfiles: - var conn connectorprofiles.Profiles + var conn xconnector.Profiles conn, err = builder.CreateProfilesToProfiles(ctx, set, next) if err != nil { return err diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 33fccd30b42..2bc4e163d7e 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/fanoutconsumer" @@ -548,7 +548,7 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c case pipeline.SignalLogs: return f.TracesToLogsStability() case xpipeline.SignalProfiles: - fprof, ok := f.(connectorprofiles.Factory) + fprof, ok := f.(xconnector.Factory) if !ok { return component.StabilityLevelUndefined } @@ -563,7 +563,7 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c case pipeline.SignalLogs: return f.MetricsToLogsStability() case xpipeline.SignalProfiles: - fprof, ok := f.(connectorprofiles.Factory) + fprof, ok := f.(xconnector.Factory) if !ok { return component.StabilityLevelUndefined } @@ -578,14 +578,14 @@ func connectorStability(f connector.Factory, expType, recType pipeline.Signal) c case pipeline.SignalLogs: return f.LogsToLogsStability() case xpipeline.SignalProfiles: - fprof, ok := f.(connectorprofiles.Factory) + fprof, ok := f.(xconnector.Factory) if !ok { return component.StabilityLevelUndefined } return fprof.LogsToProfilesStability() } case xpipeline.SignalProfiles: - fprof, ok := f.(connectorprofiles.Factory) + fprof, ok := f.(xconnector.Factory) if !ok { return component.StabilityLevelUndefined } diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go index 389a351d781..60be0a3d57a 100644 --- a/service/internal/graph/util_test.go +++ b/service/internal/graph/util_test.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" @@ -164,7 +164,7 @@ func unwrapExampleConnector(c *connectorNode) *testcomponents.ExampleConnector { return ct.(*testcomponents.ExampleConnector) case componentProfiles: // consumes profiles, emits profiles return ct.Component.(*testcomponents.ExampleConnector) - case connectorprofiles.Profiles: // consumes profiles, emits something else + case xconnector.Profiles: // consumes profiles, emits something else return ct.(*testcomponents.ExampleConnector) } return nil @@ -249,58 +249,58 @@ func newErrExporterFactory() exporter.Factory { } func newErrConnectorFactory() connector.Factory { - return connectorprofiles.NewFactory(component.MustNewType("err"), func() component.Config { + return xconnector.NewFactory(component.MustNewType("err"), func() component.Config { return &struct{}{} }, - connectorprofiles.WithTracesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Traces, error) { + xconnector.WithTracesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Traces, error) { + xconnector.WithTracesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Traces, error) { + xconnector.WithTracesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Traces, error) { + xconnector.WithTracesToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Metrics, error) { + xconnector.WithMetricsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Metrics, error) { + xconnector.WithMetricsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Metrics, error) { + xconnector.WithMetricsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Metrics, error) { + xconnector.WithMetricsToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Logs, error) { + xconnector.WithLogsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Logs, error) { + xconnector.WithLogsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Logs, error) { + xconnector.WithLogsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Logs, error) { + xconnector.WithLogsToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connector.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connectorprofiles.Profiles, error) { + xconnector.WithProfilesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (xconnector.Profiles, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connectorprofiles.Profiles, error) { + xconnector.WithProfilesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (xconnector.Profiles, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connectorprofiles.Profiles, error) { + xconnector.WithProfilesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (xconnector.Profiles, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (connectorprofiles.Profiles, error) { + xconnector.WithProfilesToProfiles(func(context.Context, connector.Settings, component.Config, xconsumer.Profiles) (xconnector.Profiles, error) { return &errComponent{}, nil }, component.StabilityLevelUnmaintained), ) diff --git a/service/internal/testcomponents/example_connector.go b/service/internal/testcomponents/example_connector.go index 88e0cf28e0a..3bf9196dd54 100644 --- a/service/internal/testcomponents/example_connector.go +++ b/service/internal/testcomponents/example_connector.go @@ -8,7 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/plog" @@ -21,38 +21,38 @@ import ( var connType = component.MustNewType("exampleconnector") // ExampleConnectorFactory is factory for ExampleConnector. -var ExampleConnectorFactory = connectorprofiles.NewFactory( +var ExampleConnectorFactory = xconnector.NewFactory( connType, createExampleConnectorDefaultConfig, - connectorprofiles.WithTracesToTraces(createExampleTracesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToMetrics(createExampleTracesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToLogs(createExampleTracesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithTracesToProfiles(createExampleTracesToProfiles, component.StabilityLevelDevelopment), - - connectorprofiles.WithMetricsToTraces(createExampleMetricsToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToMetrics(createExampleMetricsToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToLogs(createExampleMetricsToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToProfiles(createExampleMetricsToProfiles, component.StabilityLevelDevelopment), - - connectorprofiles.WithLogsToTraces(createExampleLogsToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToMetrics(createExampleLogsToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToProfiles(createExampleLogsToProfiles, component.StabilityLevelDevelopment), - - connectorprofiles.WithProfilesToTraces(createExampleProfilesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToMetrics(createExampleProfilesToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToLogs(createExampleProfilesToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToProfiles(createExampleProfilesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithTracesToTraces(createExampleTracesToTraces, component.StabilityLevelDevelopment), + xconnector.WithTracesToMetrics(createExampleTracesToMetrics, component.StabilityLevelDevelopment), + xconnector.WithTracesToLogs(createExampleTracesToLogs, component.StabilityLevelDevelopment), + xconnector.WithTracesToProfiles(createExampleTracesToProfiles, component.StabilityLevelDevelopment), + + xconnector.WithMetricsToTraces(createExampleMetricsToTraces, component.StabilityLevelDevelopment), + xconnector.WithMetricsToMetrics(createExampleMetricsToMetrics, component.StabilityLevelDevelopment), + xconnector.WithMetricsToLogs(createExampleMetricsToLogs, component.StabilityLevelDevelopment), + xconnector.WithMetricsToProfiles(createExampleMetricsToProfiles, component.StabilityLevelDevelopment), + + xconnector.WithLogsToTraces(createExampleLogsToTraces, component.StabilityLevelDevelopment), + xconnector.WithLogsToMetrics(createExampleLogsToMetrics, component.StabilityLevelDevelopment), + xconnector.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), + xconnector.WithLogsToProfiles(createExampleLogsToProfiles, component.StabilityLevelDevelopment), + + xconnector.WithProfilesToTraces(createExampleProfilesToTraces, component.StabilityLevelDevelopment), + xconnector.WithProfilesToMetrics(createExampleProfilesToMetrics, component.StabilityLevelDevelopment), + xconnector.WithProfilesToLogs(createExampleProfilesToLogs, component.StabilityLevelDevelopment), + xconnector.WithProfilesToProfiles(createExampleProfilesToProfiles, component.StabilityLevelDevelopment), ) -var MockForwardConnectorFactory = connectorprofiles.NewFactory( +var MockForwardConnectorFactory = xconnector.NewFactory( component.MustNewType("mockforward"), createExampleConnectorDefaultConfig, - connectorprofiles.WithTracesToTraces(createExampleTracesToTraces, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToMetrics(createExampleMetricsToMetrics, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToProfiles(createExampleProfilesToProfiles, component.StabilityLevelDevelopment), + xconnector.WithTracesToTraces(createExampleTracesToTraces, component.StabilityLevelDevelopment), + xconnector.WithMetricsToMetrics(createExampleMetricsToMetrics, component.StabilityLevelDevelopment), + xconnector.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), + xconnector.WithProfilesToProfiles(createExampleProfilesToProfiles, component.StabilityLevelDevelopment), ) func createExampleConnectorDefaultConfig() component.Config { @@ -161,7 +161,7 @@ func createExampleLogsToProfiles(_ context.Context, set connector.Settings, _ co }, nil } -func createExampleProfilesToTraces(_ context.Context, set connector.Settings, _ component.Config, traces consumer.Traces) (connectorprofiles.Profiles, error) { +func createExampleProfilesToTraces(_ context.Context, set connector.Settings, _ component.Config, traces consumer.Traces) (xconnector.Profiles, error) { return &ExampleConnector{ ConsumeProfilesFunc: func(ctx context.Context, _ pprofile.Profiles) error { return traces.ConsumeTraces(ctx, testdata.GenerateTraces(1)) @@ -170,7 +170,7 @@ func createExampleProfilesToTraces(_ context.Context, set connector.Settings, _ }, nil } -func createExampleProfilesToMetrics(_ context.Context, set connector.Settings, _ component.Config, metrics consumer.Metrics) (connectorprofiles.Profiles, error) { +func createExampleProfilesToMetrics(_ context.Context, set connector.Settings, _ component.Config, metrics consumer.Metrics) (xconnector.Profiles, error) { return &ExampleConnector{ ConsumeProfilesFunc: func(ctx context.Context, _ pprofile.Profiles) error { return metrics.ConsumeMetrics(ctx, testdata.GenerateMetrics(1)) @@ -179,7 +179,7 @@ func createExampleProfilesToMetrics(_ context.Context, set connector.Settings, _ }, nil } -func createExampleProfilesToLogs(_ context.Context, set connector.Settings, _ component.Config, logs consumer.Logs) (connectorprofiles.Profiles, error) { +func createExampleProfilesToLogs(_ context.Context, set connector.Settings, _ component.Config, logs consumer.Logs) (xconnector.Profiles, error) { return &ExampleConnector{ ConsumeProfilesFunc: func(ctx context.Context, _ pprofile.Profiles) error { return logs.ConsumeLogs(ctx, testdata.GenerateLogs(1)) @@ -188,7 +188,7 @@ func createExampleProfilesToLogs(_ context.Context, set connector.Settings, _ co }, nil } -func createExampleProfilesToProfiles(_ context.Context, set connector.Settings, _ component.Config, profiles xconsumer.Profiles) (connectorprofiles.Profiles, error) { +func createExampleProfilesToProfiles(_ context.Context, set connector.Settings, _ component.Config, profiles xconsumer.Profiles) (xconnector.Profiles, error) { return &ExampleConnector{ ConsumeProfilesFunc: profiles.ConsumeProfiles, mutatesData: set.ID.Name() == "mutate", diff --git a/service/internal/testcomponents/example_router.go b/service/internal/testcomponents/example_router.go index c2ad20b41a1..067f6b554ca 100644 --- a/service/internal/testcomponents/example_router.go +++ b/service/internal/testcomponents/example_router.go @@ -8,7 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/plog" @@ -21,13 +21,13 @@ import ( var routerType = component.MustNewType("examplerouter") // ExampleRouterFactory is factory for ExampleRouter. -var ExampleRouterFactory = connectorprofiles.NewFactory( +var ExampleRouterFactory = xconnector.NewFactory( routerType, createExampleRouterDefaultConfig, - connectorprofiles.WithTracesToTraces(createExampleTracesRouter, component.StabilityLevelDevelopment), - connectorprofiles.WithMetricsToMetrics(createExampleMetricsRouter, component.StabilityLevelDevelopment), - connectorprofiles.WithLogsToLogs(createExampleLogsRouter, component.StabilityLevelDevelopment), - connectorprofiles.WithProfilesToProfiles(createExampleProfilesRouter, component.StabilityLevelDevelopment), + xconnector.WithTracesToTraces(createExampleTracesRouter, component.StabilityLevelDevelopment), + xconnector.WithMetricsToMetrics(createExampleMetricsRouter, component.StabilityLevelDevelopment), + xconnector.WithLogsToLogs(createExampleLogsRouter, component.StabilityLevelDevelopment), + xconnector.WithProfilesToProfiles(createExampleProfilesRouter, component.StabilityLevelDevelopment), ) type LeftRightConfig struct { @@ -79,9 +79,9 @@ func createExampleLogsRouter(_ context.Context, _ connector.Settings, cfg compon }, nil } -func createExampleProfilesRouter(_ context.Context, _ connector.Settings, cfg component.Config, profiles xconsumer.Profiles) (connectorprofiles.Profiles, error) { +func createExampleProfilesRouter(_ context.Context, _ connector.Settings, cfg component.Config, profiles xconsumer.Profiles) (xconnector.Profiles, error) { c := cfg.(ExampleRouterConfig) - r := profiles.(connectorprofiles.ProfilesRouterAndConsumer) + r := profiles.(xconnector.ProfilesRouterAndConsumer) left, _ := r.Consumer(c.Profiles.Left) right, _ := r.Consumer(c.Profiles.Right) return &ExampleRouter{ diff --git a/service/internal/testcomponents/example_router_test.go b/service/internal/testcomponents/example_router_test.go index 3688ed6aff6..ca59df10cf5 100644 --- a/service/internal/testcomponents/example_router_test.go +++ b/service/internal/testcomponents/example_router_test.go @@ -12,8 +12,8 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/connector" - "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/connector/xconnector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" @@ -160,7 +160,7 @@ func TestProfilesRouter(t *testing.T) { // The service will build a router to give to every connector. // Many connectors will just call router.ConsumeProfiles, // but some implementation will call RouteProfiles instead. - router := connectorprofiles.NewProfilesRouter( + router := xconnector.NewProfilesRouter( map[pipeline.ID]xconsumer.Profiles{ leftID: sinkLeft, rightID: sinkRight, diff --git a/versions.yaml b/versions.yaml index d5fe8d325a2..7b15b05bb53 100644 --- a/versions.yaml +++ b/versions.yaml @@ -42,6 +42,7 @@ module-sets: - go.opentelemetry.io/collector/connector/connectortest - go.opentelemetry.io/collector/connector/connectorprofiles - go.opentelemetry.io/collector/connector/forwardconnector + - go.opentelemetry.io/collector/connector/xconnector - go.opentelemetry.io/collector/consumer/xconsumer - go.opentelemetry.io/collector/consumer/consumererror - go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles