Skip to content

Commit

Permalink
Deprecate service::telemetry::metrics::address
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Sep 17, 2024
1 parent e9a9046 commit d30111e
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 167 deletions.
38 changes: 9 additions & 29 deletions internal/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,20 @@ type portpair struct {
// provided that there is no race by some other code to grab the same port
// immediately.
func GetAvailableLocalAddress(t testing.TB) string {
return findAvailable(t, "tcp4")
}

// GetAvailableLocalIPv6Address is IPv6 version of GetAvailableLocalAddress.
func GetAvailableLocalIPv6Address(t testing.TB) string {
return findAvailable(t, "tcp6")
}

func findAvailable(t testing.TB, network string) string {
// Retry has been added for windows as net.Listen can return a port that is not actually available. Details can be
// found in https://github.com/docker/for-win/issues/3171 but to summarize Hyper-V will reserve ranges of ports
// which do not show up under the "netstat -ano" but can only be found by
// "netsh interface ipv4 show excludedportrange protocol=tcp". We'll use []exclusions to hold those ranges and
// retry if the port returned by GetAvailableLocalAddress falls in one of those them.
network := "tcp4"
var exclusions []portpair
portFound := false
if runtime.GOOS == "windows" {
exclusions = getExclusionsList(network, t)
}

var endpoint string
for !portFound {
endpoint = findAvailableAddress(network, t)
_, port, err := net.SplitHostPort(endpoint)
require.NoError(t, err)
portFound = true
if runtime.GOOS == "windows" {
for _, pair := range exclusions {
if port >= pair.first && port <= pair.last {
portFound = false
break
}
}
}
}

return endpoint
}

// GetAvailableLocalIPv6Address is IPv6 version of GetAvailableLocalAddress.
func GetAvailableLocalIPv6Address(t testing.TB) string {
network := "tcp6"
var exclusions []portpair
portFound := false
if runtime.GOOS == "windows" {
Expand Down
18 changes: 16 additions & 2 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/contrib/config"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -274,8 +275,17 @@ func generateConfig() *Config {
InitialFields: map[string]any{"fieldKey": "filed-value"},
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8080",
Level: configtelemetry.LevelNormal,
Readers: []config.MetricReader{
{
Pull: &config.PullMetricReader{Exporter: config.MetricExporter{
Prometheus: &config.Prometheus{
Host: newPtr("localhost"),
Port: newPtr(8080),
},
}},
},
},
},
},
Extensions: []component.ID{component.MustNewID("nop")},
Expand All @@ -289,3 +299,7 @@ func generateConfig() *Config {
},
}
}

func newPtr[T int | string](str T) *T {
return &str
}
12 changes: 9 additions & 3 deletions service/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/contrib/config"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestConfigValidate(t *testing.T) {
cfgFn: func() *Config {
cfg := generateConfig()
cfg.Telemetry.Metrics.Level = configtelemetry.LevelBasic
cfg.Telemetry.Metrics.Address = ""
cfg.Telemetry.Metrics.Readers = nil
return cfg
},
expected: nil,
Expand Down Expand Up @@ -95,8 +96,13 @@ func generateConfig() *Config {
InitialFields: map[string]any{"fieldKey": "filed-value"},
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelNormal,
Address: ":8080",
Level: configtelemetry.LevelNormal,
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{
Host: newPtr("localhost"),
Port: newPtr(8080),
}}}},
},
},
},
Extensions: extensions.Config{component.MustNewID("nop")},
Expand Down
86 changes: 11 additions & 75 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,78 +256,6 @@ func TestServiceTelemetryCleanupOnError(t *testing.T) {
}

func TestServiceTelemetry(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run(fmt.Sprintf("ipv4_%s", tc.name), func(t *testing.T) {
testCollectorStartHelper(t, tc, "tcp4")
})
t.Run(fmt.Sprintf("ipv6_%s", tc.name), func(t *testing.T) {
testCollectorStartHelper(t, tc, "tcp6")
})
}
}

func testCollectorStartHelper(t *testing.T, tc ownMetricsTestCase, network string) {
var once sync.Once
loggingHookCalled := false
hook := func(zapcore.Entry) error {
once.Do(func() {
loggingHookCalled = true
})
return nil
}

var (
metricsAddr string
zpagesAddr string
)
switch network {
case "tcp", "tcp4":
metricsAddr = testutil.GetAvailableLocalAddress(t)
zpagesAddr = testutil.GetAvailableLocalAddress(t)
case "tcp6":
metricsAddr = testutil.GetAvailableLocalIPv6Address(t)
zpagesAddr = testutil.GetAvailableLocalIPv6Address(t)
}
require.NotZero(t, metricsAddr, "network must be either of tcp, tcp4 or tcp6")
require.NotZero(t, zpagesAddr, "network must be either of tcp, tcp4 or tcp6")

set := newNopSettings()
set.BuildInfo = component.BuildInfo{Version: "test version", Command: otelCommand}
set.ExtensionsConfigs = map[component.ID]component.Config{
component.MustNewID("zpages"): &zpagesextension.Config{
ServerConfig: confighttp.ServerConfig{Endpoint: zpagesAddr},
},
}
set.ExtensionsFactories = map[component.Type]extension.Factory{component.MustNewType("zpages"): zpagesextension.NewFactory()}
set.LoggingOptions = []zap.Option{zap.Hooks(hook)}

cfg := newNopConfig()
cfg.Extensions = []component.ID{component.MustNewID("zpages")}
cfg.Telemetry.Metrics.Address = metricsAddr
cfg.Telemetry.Resource = make(map[string]*string)
// Include resource attributes under the service::telemetry::resource key.
for k, v := range tc.userDefinedResource {
cfg.Telemetry.Resource[k] = v
}

// Create a service, check for metrics, shutdown and repeat to ensure that telemetry can be started/shutdown and started again.
for i := 0; i < 2; i++ {
srv, err := New(context.Background(), set, cfg)
require.NoError(t, err)

require.NoError(t, srv.Start(context.Background()))
// Sleep for 1 second to ensure the http server is started.
time.Sleep(1 * time.Second)
assert.True(t, loggingHookCalled)

assertResourceLabels(t, srv.telemetrySettings.Resource, tc.expectedLabels)
assertMetrics(t, metricsAddr, tc.expectedLabels)
assertZPages(t, zpagesAddr)
require.NoError(t, srv.Shutdown(context.Background()))
}
}

func TestServiceTelemetryWithReaders(t *testing.T) {
for _, tc := range ownMetricsTestCases() {
t.Run(fmt.Sprintf("ipv4_%s", tc.name), func(t *testing.T) {
testCollectorStartHelperWithReaders(t, tc, "tcp4")
Expand Down Expand Up @@ -375,7 +303,6 @@ func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, ne

cfg := newNopConfig()
cfg.Extensions = []component.ID{component.MustNewID("zpages")}
cfg.Telemetry.Metrics.Address = ""
cfg.Telemetry.Metrics.Readers = []config.MetricReader{
{
Pull: &config.PullMetricReader{
Expand Down Expand Up @@ -711,8 +638,13 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config {
InitialFields: map[string]any(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "localhost:8888",
Level: configtelemetry.LevelBasic,
Readers: []config.MetricReader{{
Pull: &config.PullMetricReader{Exporter: config.MetricExporter{Prometheus: &config.Prometheus{
Host: newPtr("localhost"),
Port: newPtr(8080),
}}}},
},
},
},
}
Expand Down Expand Up @@ -744,3 +676,7 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
component.StabilityLevelDevelopment,
)
}

func newPtr[T int | string](str T) *T {
return &str
}
57 changes: 53 additions & 4 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"fmt"
"net"
"strconv"
"time"

"go.opentelemetry.io/contrib/config"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/featuregate"
)

var _ confmap.Unmarshaler = (*Config)(nil)

var disableAddressFieldForInternalTelemetryFeatureGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.disableAddressFieldForInternalTelemetry",
featuregate.StageAlpha,
featuregate.WithRegisterFromVersion("v0.110.0"),
featuregate.WithRegisterToVersion("v0.110.0"),
featuregate.WithRegisterDescription("controls whether the deprecated address field for internal telemetry is still supported"))

// Config defines the configurable settings for service telemetry.
type Config struct {
Logs LogsConfig `mapstructure:"logs"`
Expand Down Expand Up @@ -119,7 +132,7 @@ type MetricsConfig struct {
// - "detailed" adds dimensions and views to the previous levels.
Level configtelemetry.Level `mapstructure:"level"`

// Address is the [address]:port that metrics exposition should be bound to.
// Deprecated: [v0.110.0] use readers configuration.
Address string `mapstructure:"address"`

// Readers allow configuration of metric readers to emit metrics to
Expand All @@ -143,11 +156,47 @@ type TracesConfig struct {
Processors []config.SpanProcessor `mapstructure:"processors"`
}

func (c *Config) Unmarshal(conf *confmap.Conf) error {
if err := conf.Unmarshal(c); err != nil {
return err
}
if disableAddressFieldForInternalTelemetryFeatureGate.IsEnabled() {
if len(c.Metrics.Address) != 0 {
host, port, err := net.SplitHostPort(c.Metrics.Address)
if err != nil {
return fmt.Errorf("failing to parse metrics address %q: %w", c.Metrics.Address, err)
}
portInt, err := strconv.Atoi(port)
if err != nil {
return fmt.Errorf("failing to extract the port from the metrics address %q: %w", c.Metrics.Address, err)
}

// User did not overwrite readers, so we will remove the default configured reader.
if !conf.IsSet("metrics::readers") {
c.Metrics.Readers = nil
}

c.Metrics.Readers = append(c.Metrics.Readers, config.MetricReader{
Pull: &config.PullMetricReader{
Exporter: config.MetricExporter{
Prometheus: &config.Prometheus{
Host: &host,
Port: &portInt,
},
},
},
})
}

}
return nil
}

// Validate checks whether the current configuration is valid
func (c *Config) Validate() error {
// Check when service telemetry metric level is not none, the metrics address should not be empty
if c.Metrics.Level != configtelemetry.LevelNone && c.Metrics.Address == "" && len(c.Metrics.Readers) == 0 {
return fmt.Errorf("collector telemetry metric address or reader should exist when metric level is not none")
// Check when service telemetry metric level is not none, the metrics readers should not be empty
if c.Metrics.Level != configtelemetry.LevelNone && len(c.Metrics.Readers) == 0 {
return fmt.Errorf("collector telemetry metrics reader should exist when metric level is not none")
}

return nil
Expand Down
Loading

0 comments on commit d30111e

Please sign in to comment.