From d30111ec84bfd3718ed56460edea890ed84bf58c Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 17 Sep 2024 16:13:51 -0700 Subject: [PATCH] Deprecate service::telemetry::metrics::address Signed-off-by: Bogdan Drutu --- internal/testutil/testutil.go | 38 ++------ otelcol/config_test.go | 18 +++- service/config_test.go | 12 ++- service/service_test.go | 86 +++---------------- service/telemetry/config.go | 57 +++++++++++- service/telemetry/config_test.go | 70 +++++++++++---- service/telemetry/factory.go | 14 ++- service/telemetry/metrics.go | 29 +------ service/telemetry/metrics_test.go | 8 +- service/telemetry/telemetry_test.go | 19 +++- .../testdata/config_deprecated_address.yaml | 5 ++ ...config_deprecated_address_and_readers.yaml | 11 +++ .../config_invalid_deprecated_address.yaml | 5 ++ 13 files changed, 205 insertions(+), 167 deletions(-) create mode 100644 service/telemetry/testdata/config_deprecated_address.yaml create mode 100644 service/telemetry/testdata/config_deprecated_address_and_readers.yaml create mode 100644 service/telemetry/testdata/config_invalid_deprecated_address.yaml diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 60980e840e7..8d0882d04e6 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -25,40 +25,20 @@ type portpair struct { // provided that there is no race by some other code to grab the same port // immediately. func GetAvailableLocalAddress(t testing.TB) string { + return findAvailable(t, "tcp4") +} + +// GetAvailableLocalIPv6Address is IPv6 version of GetAvailableLocalAddress. +func GetAvailableLocalIPv6Address(t testing.TB) string { + return findAvailable(t, "tcp6") +} + +func findAvailable(t testing.TB, network string) string { // Retry has been added for windows as net.Listen can return a port that is not actually available. Details can be // found in https://github.com/docker/for-win/issues/3171 but to summarize Hyper-V will reserve ranges of ports // which do not show up under the "netstat -ano" but can only be found by // "netsh interface ipv4 show excludedportrange protocol=tcp". We'll use []exclusions to hold those ranges and // retry if the port returned by GetAvailableLocalAddress falls in one of those them. - network := "tcp4" - var exclusions []portpair - portFound := false - if runtime.GOOS == "windows" { - exclusions = getExclusionsList(network, t) - } - - var endpoint string - for !portFound { - endpoint = findAvailableAddress(network, t) - _, port, err := net.SplitHostPort(endpoint) - require.NoError(t, err) - portFound = true - if runtime.GOOS == "windows" { - for _, pair := range exclusions { - if port >= pair.first && port <= pair.last { - portFound = false - break - } - } - } - } - - return endpoint -} - -// GetAvailableLocalIPv6Address is IPv6 version of GetAvailableLocalAddress. -func GetAvailableLocalIPv6Address(t testing.TB) string { - network := "tcp6" var exclusions []portpair portFound := false if runtime.GOOS == "windows" { diff --git a/otelcol/config_test.go b/otelcol/config_test.go index c1677c07220..0ea54c54485 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/contrib/config" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" @@ -274,8 +275,17 @@ func generateConfig() *Config { InitialFields: map[string]any{"fieldKey": "filed-value"}, }, Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelNormal, - Address: ":8080", + Level: configtelemetry.LevelNormal, + Readers: []config.MetricReader{ + { + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{ + Prometheus: &config.Prometheus{ + Host: newPtr("localhost"), + Port: newPtr(8080), + }, + }}, + }, + }, }, }, Extensions: []component.ID{component.MustNewID("nop")}, @@ -289,3 +299,7 @@ func generateConfig() *Config { }, } } + +func newPtr[T int | string](str T) *T { + return &str +} diff --git a/service/config_test.go b/service/config_test.go index 6ac9d6e625a..a3cd78295b9 100644 --- a/service/config_test.go +++ b/service/config_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/contrib/config" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" @@ -66,7 +67,7 @@ func TestConfigValidate(t *testing.T) { cfgFn: func() *Config { cfg := generateConfig() cfg.Telemetry.Metrics.Level = configtelemetry.LevelBasic - cfg.Telemetry.Metrics.Address = "" + cfg.Telemetry.Metrics.Readers = nil return cfg }, expected: nil, @@ -95,8 +96,13 @@ func generateConfig() *Config { InitialFields: map[string]any{"fieldKey": "filed-value"}, }, Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelNormal, - Address: ":8080", + Level: configtelemetry.LevelNormal, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr("localhost"), + Port: newPtr(8080), + }}}}, + }, }, }, Extensions: extensions.Config{component.MustNewID("nop")}, diff --git a/service/service_test.go b/service/service_test.go index bff18959569..cd67c25cce9 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -256,78 +256,6 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) { } func TestServiceTelemetry(t *testing.T) { - for _, tc := range ownMetricsTestCases() { - t.Run(fmt.Sprintf("ipv4_%s", tc.name), func(t *testing.T) { - testCollectorStartHelper(t, tc, "tcp4") - }) - t.Run(fmt.Sprintf("ipv6_%s", tc.name), func(t *testing.T) { - testCollectorStartHelper(t, tc, "tcp6") - }) - } -} - -func testCollectorStartHelper(t *testing.T, tc ownMetricsTestCase, network string) { - var once sync.Once - loggingHookCalled := false - hook := func(zapcore.Entry) error { - once.Do(func() { - loggingHookCalled = true - }) - return nil - } - - var ( - metricsAddr string - zpagesAddr string - ) - switch network { - case "tcp", "tcp4": - metricsAddr = testutil.GetAvailableLocalAddress(t) - zpagesAddr = testutil.GetAvailableLocalAddress(t) - case "tcp6": - metricsAddr = testutil.GetAvailableLocalIPv6Address(t) - zpagesAddr = testutil.GetAvailableLocalIPv6Address(t) - } - require.NotZero(t, metricsAddr, "network must be either of tcp, tcp4 or tcp6") - require.NotZero(t, zpagesAddr, "network must be either of tcp, tcp4 or tcp6") - - set := newNopSettings() - set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand} - set.ExtensionsConfigs = map[component.ID]component.Config{ - component.MustNewID("zpages"): &zpagesextension.Config{ - ServerConfig: confighttp.ServerConfig{Endpoint: zpagesAddr}, - }, - } - set.ExtensionsFactories = map[component.Type]extension.Factory{component.MustNewType("zpages"): zpagesextension.NewFactory()} - set.LoggingOptions = []zap.Option{zap.Hooks(hook)} - - cfg := newNopConfig() - cfg.Extensions = []component.ID{component.MustNewID("zpages")} - cfg.Telemetry.Metrics.Address = metricsAddr - cfg.Telemetry.Resource = make(map[string]*string) - // Include resource attributes under the service::telemetry::resource key. - for k, v := range tc.userDefinedResource { - cfg.Telemetry.Resource[k] = v - } - - // Create a service, check for metrics, shutdown and repeat to ensure that telemetry can be started/shutdown and started again. - for i := 0; i < 2; i++ { - srv, err := New(context.Background(), set, cfg) - require.NoError(t, err) - - require.NoError(t, srv.Start(context.Background())) - // Sleep for 1 second to ensure the http server is started. - time.Sleep(1 * time.Second) - assert.True(t, loggingHookCalled) - - assertResourceLabels(t, srv.telemetrySettings.Resource, tc.expectedLabels) - assertMetrics(t, metricsAddr, tc.expectedLabels) - assertZPages(t, zpagesAddr) - require.NoError(t, srv.Shutdown(context.Background())) - } -} - -func TestServiceTelemetryWithReaders(t *testing.T) { for _, tc := range ownMetricsTestCases() { t.Run(fmt.Sprintf("ipv4_%s", tc.name), func(t *testing.T) { testCollectorStartHelperWithReaders(t, tc, "tcp4") @@ -375,7 +303,6 @@ func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, ne cfg := newNopConfig() cfg.Extensions = []component.ID{component.MustNewID("zpages")} - cfg.Telemetry.Metrics.Address = "" cfg.Telemetry.Metrics.Readers = []config.MetricReader{ { Pull: &config.PullMetricReader{ @@ -711,8 +638,13 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config { InitialFields: map[string]any(nil), }, Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "localhost:8888", + Level: configtelemetry.LevelBasic, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr("localhost"), + Port: newPtr(8080), + }}}}, + }, }, }, } @@ -744,3 +676,7 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory { component.StabilityLevelDevelopment, ) } + +func newPtr[T int | string](str T) *T { + return &str +} diff --git a/service/telemetry/config.go b/service/telemetry/config.go index 197adf491aa..6f7ba0cccb3 100644 --- a/service/telemetry/config.go +++ b/service/telemetry/config.go @@ -5,14 +5,27 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "fmt" + "net" + "strconv" "time" "go.opentelemetry.io/contrib/config" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/featuregate" ) +var _ confmap.Unmarshaler = (*Config)(nil) + +var disableAddressFieldForInternalTelemetryFeatureGate = featuregate.GlobalRegistry().MustRegister( + "telemetry.disableAddressFieldForInternalTelemetry", + featuregate.StageAlpha, + featuregate.WithRegisterFromVersion("v0.110.0"), + featuregate.WithRegisterToVersion("v0.110.0"), + featuregate.WithRegisterDescription("controls whether the deprecated address field for internal telemetry is still supported")) + // Config defines the configurable settings for service telemetry. type Config struct { Logs LogsConfig `mapstructure:"logs"` @@ -119,7 +132,7 @@ type MetricsConfig struct { // - "detailed" adds dimensions and views to the previous levels. Level configtelemetry.Level `mapstructure:"level"` - // Address is the [address]:port that metrics exposition should be bound to. + // Deprecated: [v0.110.0] use readers configuration. Address string `mapstructure:"address"` // Readers allow configuration of metric readers to emit metrics to @@ -143,11 +156,47 @@ type TracesConfig struct { Processors []config.SpanProcessor `mapstructure:"processors"` } +func (c *Config) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(c); err != nil { + return err + } + if disableAddressFieldForInternalTelemetryFeatureGate.IsEnabled() { + if len(c.Metrics.Address) != 0 { + host, port, err := net.SplitHostPort(c.Metrics.Address) + if err != nil { + return fmt.Errorf("failing to parse metrics address %q: %w", c.Metrics.Address, err) + } + portInt, err := strconv.Atoi(port) + if err != nil { + return fmt.Errorf("failing to extract the port from the metrics address %q: %w", c.Metrics.Address, err) + } + + // User did not overwrite readers, so we will remove the default configured reader. + if !conf.IsSet("metrics::readers") { + c.Metrics.Readers = nil + } + + c.Metrics.Readers = append(c.Metrics.Readers, config.MetricReader{ + Pull: &config.PullMetricReader{ + Exporter: config.MetricExporter{ + Prometheus: &config.Prometheus{ + Host: &host, + Port: &portInt, + }, + }, + }, + }) + } + + } + return nil +} + // Validate checks whether the current configuration is valid func (c *Config) Validate() error { - // Check when service telemetry metric level is not none, the metrics address should not be empty - if c.Metrics.Level != configtelemetry.LevelNone && c.Metrics.Address == "" && len(c.Metrics.Readers) == 0 { - return fmt.Errorf("collector telemetry metric address or reader should exist when metric level is not none") + // Check when service telemetry metric level is not none, the metrics readers should not be empty + if c.Metrics.Level != configtelemetry.LevelNone && len(c.Metrics.Readers) == 0 { + return fmt.Errorf("collector telemetry metrics reader should exist when metric level is not none") } return nil diff --git a/service/telemetry/config_test.go b/service/telemetry/config_test.go index 5417347c772..915b46171b1 100644 --- a/service/telemetry/config_test.go +++ b/service/telemetry/config_test.go @@ -4,15 +4,60 @@ package telemetry import ( + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/contrib/config" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/confmaptest" ) -func TestLoadConfig(t *testing.T) { +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(NewFactory().CreateDefaultConfig())) +} + +func TestUnmarshalDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, confmap.New().Unmarshal(&cfg)) + assert.Equal(t, factory.CreateDefaultConfig(), cfg) +} + +func TestUnmarshalConfigDeprecatedAddress(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_deprecated_address.yaml")) + require.NoError(t, err) + cfg := NewFactory().CreateDefaultConfig() + require.NoError(t, cm.Unmarshal(&cfg)) + require.Len(t, cfg.(*Config).Metrics.Readers, 1) + assert.Equal(t, "", *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Host) + assert.Equal(t, 6666, *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Port) +} + +func TestUnmarshalConfigInvalidDeprecatedAddress(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_invalid_deprecated_address.yaml")) + require.NoError(t, err) + cfg := NewFactory().CreateDefaultConfig() + require.Error(t, cm.Unmarshal(&cfg)) +} + +func TestUnmarshalConfigDeprecatedAddressAndReaders(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config_deprecated_address_and_readers.yaml")) + require.NoError(t, err) + cfg := NewFactory().CreateDefaultConfig() + assert.NoError(t, cm.Unmarshal(&cfg)) + require.Len(t, cfg.(*Config).Metrics.Readers, 2) + assert.Equal(t, "localhost", *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Host) + assert.Equal(t, 9999, *cfg.(*Config).Metrics.Readers[0].Pull.Exporter.Prometheus.Port) + assert.Equal(t, "localhost", *cfg.(*Config).Metrics.Readers[1].Pull.Exporter.Prometheus.Host) + assert.Equal(t, 6666, *cfg.(*Config).Metrics.Readers[1].Pull.Exporter.Prometheus.Port) +} + +func TestConfigValidate(t *testing.T) { tests := []struct { name string cfg *Config @@ -22,8 +67,13 @@ func TestLoadConfig(t *testing.T) { name: "basic metric telemetry", cfg: &Config{ Metrics: MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "127.0.0.1:3333", + Level: configtelemetry.LevelBasic, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr("127.0.0.1"), + Port: newPtr(3333), + }}}}, + }, }, }, success: true, @@ -33,23 +83,11 @@ func TestLoadConfig(t *testing.T) { cfg: &Config{ Metrics: MetricsConfig{ Level: configtelemetry.LevelBasic, - Address: "", + Readers: nil, }, }, success: false, }, - { - name: "valid metric telemetry with metric readers", - cfg: &Config{ - Metrics: MetricsConfig{ - Level: configtelemetry.LevelBasic, - Readers: []config.MetricReader{ - {Pull: &config.PullMetricReader{}}, - }, - }, - }, - success: true, - }, } for _, tt := range tests { diff --git a/service/telemetry/factory.go b/service/telemetry/factory.go index 826c9cc8f74..72dda72860a 100644 --- a/service/telemetry/factory.go +++ b/service/telemetry/factory.go @@ -7,6 +7,7 @@ import ( "context" "time" + "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -46,8 +47,13 @@ func createDefaultConfig() component.Config { InitialFields: map[string]any(nil), }, Metrics: MetricsConfig{ - Level: configtelemetry.LevelNormal, - Address: ":8888", + Level: configtelemetry.LevelNormal, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr(""), + Port: newPtr(8888), + }}}}, + }, }, } } @@ -80,3 +86,7 @@ func NewFactory() Factory { }), ) } + +func newPtr[T int | string](str T) *T { + return &str +} diff --git a/service/telemetry/metrics.go b/service/telemetry/metrics.go index 76b0a758b5a..a3be9169c7c 100644 --- a/service/telemetry/metrics.go +++ b/service/telemetry/metrics.go @@ -5,12 +5,9 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "context" - "net" "net/http" - "strconv" "sync" - "go.opentelemetry.io/contrib/config" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" sdkmetric "go.opentelemetry.io/otel/sdk/metric" @@ -40,34 +37,10 @@ type meterProviderSettings struct { } func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (metric.MeterProvider, error) { - if set.cfg.Level == configtelemetry.LevelNone || (set.cfg.Address == "" && len(set.cfg.Readers) == 0) { + if set.cfg.Level == configtelemetry.LevelNone || len(set.cfg.Readers) == 0 { return noop.NewMeterProvider(), nil } - if len(set.cfg.Address) != 0 { - host, port, err := net.SplitHostPort(set.cfg.Address) - if err != nil { - return nil, err - } - portInt, err := strconv.Atoi(port) - if err != nil { - return nil, err - } - if set.cfg.Readers == nil { - set.cfg.Readers = []config.MetricReader{} - } - set.cfg.Readers = append(set.cfg.Readers, config.MetricReader{ - Pull: &config.PullMetricReader{ - Exporter: config.MetricExporter{ - Prometheus: &config.Prometheus{ - Host: &host, - Port: &portInt, - }, - }, - }, - }) - } - mp := &meterProvider{} var opts []sdkmetric.Option for _, reader := range set.cfg.Readers { diff --git a/service/telemetry/metrics_test.go b/service/telemetry/metrics_test.go index b4e0c24be70..aa30de898ba 100644 --- a/service/telemetry/metrics_test.go +++ b/service/telemetry/metrics_test.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/promtest" "go.opentelemetry.io/collector/service/internal/resource" @@ -208,8 +207,10 @@ func TestTelemetryInit(t *testing.T) { semconv.AttributeServiceInstanceID: &testInstanceID, }, Metrics: MetricsConfig{ - Level: configtelemetry.LevelDetailed, - Address: testutil.GetAvailableLocalAddress(t), + Level: configtelemetry.LevelDetailed, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: promtest.GetAvailableLocalAddressPrometheus(t)}}, + }}, }, } } @@ -276,5 +277,4 @@ func getMetricsFromPrometheus(t *testing.T, handler http.Handler) map[string]*io require.NoError(t, err) return parsed - } diff --git a/service/telemetry/telemetry_test.go b/service/telemetry/telemetry_test.go index 25c42d01830..0d6507e9dcd 100644 --- a/service/telemetry/telemetry_test.go +++ b/service/telemetry/telemetry_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/contrib/config" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -29,8 +30,13 @@ func TestTelemetryConfiguration(t *testing.T) { Encoding: "console", }, Metrics: MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "127.0.0.1:3333", + Level: configtelemetry.LevelBasic, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr("127.0.0.1"), + Port: newPtr(3333), + }}}}, + }, }, }, success: true, @@ -42,8 +48,13 @@ func TestTelemetryConfiguration(t *testing.T) { Level: zapcore.DebugLevel, }, Metrics: MetricsConfig{ - Level: configtelemetry.LevelBasic, - Address: "127.0.0.1:3333", + Level: configtelemetry.LevelBasic, + Readers: []config.MetricReader{{ + Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{ + Host: newPtr("127.0.0.1"), + Port: newPtr(3333), + }}}}, + }, }, }, success: false, diff --git a/service/telemetry/testdata/config_deprecated_address.yaml b/service/telemetry/testdata/config_deprecated_address.yaml new file mode 100644 index 00000000000..c52f2dfac28 --- /dev/null +++ b/service/telemetry/testdata/config_deprecated_address.yaml @@ -0,0 +1,5 @@ +logs: + level: "info" +metrics: + level: "basic" + address: ":6666" diff --git a/service/telemetry/testdata/config_deprecated_address_and_readers.yaml b/service/telemetry/testdata/config_deprecated_address_and_readers.yaml new file mode 100644 index 00000000000..b6a8651e1ef --- /dev/null +++ b/service/telemetry/testdata/config_deprecated_address_and_readers.yaml @@ -0,0 +1,11 @@ +logs: + level: "info" +metrics: + level: "basic" + address: "localhost:6666" + readers: + - pull: + exporter: + prometheus: + host: "localhost" + port: 9999 diff --git a/service/telemetry/testdata/config_invalid_deprecated_address.yaml b/service/telemetry/testdata/config_invalid_deprecated_address.yaml new file mode 100644 index 00000000000..fd81b91ad19 --- /dev/null +++ b/service/telemetry/testdata/config_invalid_deprecated_address.yaml @@ -0,0 +1,5 @@ +logs: + level: "info" +metrics: + level: "basic" + address: "1212:212121:2121"