Skip to content

Commit

Permalink
[chore] Move metrics initialization in service/telemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Sep 17, 2024
1 parent 1339c01 commit ae7b9cb
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 73 deletions.
37 changes: 37 additions & 0 deletions service/internal/promtest/server_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package promtest // import "go.opentelemetry.io/collector/service/internal/promtest"

import (
"net"
"strconv"
"testing"

"go.opentelemetry.io/contrib/config"

"go.opentelemetry.io/collector/internal/testutil"
)

func GetAvailableLocalIPv6AddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalIPv6Address(t))
}

func GetAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalAddress(t))
}

func addrToPrometheus(address string) *config.Prometheus {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil

Check warning on line 27 in service/internal/promtest/server_util.go

View check run for this annotation

Codecov / codecov/patch

service/internal/promtest/server_util.go#L27

Added line #L27 was not covered by tests
}
portInt, err := strconv.Atoi(port)
if err != nil {
return nil

Check warning on line 31 in service/internal/promtest/server_util.go

View check run for this annotation

Codecov / codecov/patch

service/internal/promtest/server_util.go#L31

Added line #L31 was not covered by tests
}
return &config.Prometheus{
Host: &host,
Port: &portInt,
}
}
25 changes: 2 additions & 23 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ var useOtelWithSDKConfigurationForInternalTelemetryFeatureGate = featuregate.Glo
featuregate.WithRegisterDescription("controls whether the collector supports extended OpenTelemetry"+
"configuration for internal telemetry"))

// disableHighCardinalityMetricsfeatureGate is the feature gate that controls whether the collector should enable
// potentially high cardinality metrics. The gate will be removed when the collector allows for view configuration.
var disableHighCardinalityMetricsfeatureGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.disableHighCardinalityMetrics",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("controls whether the collector should enable potentially high"+
"cardinality metrics. The gate will be removed when the collector allows for view configuration."))

// Settings holds configuration for building a new Service.
type Settings struct {
// BuildInfo provides collector start information.
Expand Down Expand Up @@ -104,8 +96,6 @@ type Service struct {

// New creates a new Service, its telemetry, and Components.
func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
disableHighCard := disableHighCardinalityMetricsfeatureGate.IsEnabled()

srv := &Service{
buildInfo: set.BuildInfo,
host: &graph.Host{
Expand Down Expand Up @@ -144,14 +134,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {

logger.Info("Setting up own telemetry...")

mp, err := newMeterProvider(
meterProviderSettings{
res: res,
cfg: cfg.Telemetry.Metrics,
asyncErrorChannel: set.AsyncErrorChannel,
},
disableHighCard,
)
mp, err := telFactory.CreateMeterProvider(ctx, telset, &cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to create metric provider: %w", err)
}
Expand Down Expand Up @@ -198,11 +181,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {

func logsAboutMeterProvider(logger *zap.Logger, cfg telemetry.MetricsConfig, mp metric.MeterProvider) {
if cfg.Level == configtelemetry.LevelNone || (cfg.Address == "" && len(cfg.Readers) == 0) {
logger.Info(
"Skipped telemetry setup.",
zap.String(zapKeyTelemetryAddress, cfg.Address),
zap.Stringer(zapKeyTelemetryLevel, cfg.Level),
)
logger.Info("Skipped telemetry setup.")

Check warning on line 184 in service/service.go

View check run for this annotation

Codecov / codecov/patch

service/service.go#L184

Added line #L184 was not covered by tests
return
}

Expand Down
30 changes: 3 additions & 27 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"sync"
"testing"
Expand All @@ -35,6 +33,7 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/pipelines"
"go.opentelemetry.io/collector/service/telemetry"
)
Expand Down Expand Up @@ -355,10 +354,10 @@ func testCollectorStartHelperWithReaders(t *testing.T, tc ownMetricsTestCase, ne
)
switch network {
case "tcp", "tcp4":
metricsAddr = getAvailableLocalAddressPrometheus(t)
metricsAddr = promtest.GetAvailableLocalAddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalAddress(t)
case "tcp6":
metricsAddr = getAvailableLocalIPv6AddressPrometheus(t)
metricsAddr = promtest.GetAvailableLocalIPv6AddressPrometheus(t)
zpagesAddr = testutil.GetAvailableLocalIPv6Address(t)
}
require.NotZero(t, metricsAddr, "network must be either of tcp, tcp4 or tcp6")
Expand Down Expand Up @@ -745,26 +744,3 @@ func newConfigWatcherExtensionFactory(name component.Type) extension.Factory {
component.StabilityLevelDevelopment,
)
}

func getAvailableLocalIPv6AddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalIPv6Address(t))
}

func getAvailableLocalAddressPrometheus(t testing.TB) *config.Prometheus {
return addrToPrometheus(testutil.GetAvailableLocalAddress(t))
}

func addrToPrometheus(address string) *config.Prometheus {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil
}
portInt, err := strconv.Atoi(port)
if err != nil {
return nil
}
return &config.Prometheus{
Host: &host,
Port: &portInt,
}
}
23 changes: 23 additions & 0 deletions service/telemetry/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,26 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry/internal"
)

// disableHighCardinalityMetricsfeatureGate is the feature gate that controls whether the collector should enable
// potentially high cardinality metrics. The gate will be removed when the collector allows for view configuration.
var disableHighCardinalityMetricsfeatureGate = featuregate.GlobalRegistry().MustRegister(
"telemetry.disableHighCardinalityMetrics",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("controls whether the collector should enable potentially high"+
"cardinality metrics. The gate will be removed when the collector allows for view configuration."))

func createDefaultConfig() component.Config {
return &Config{
Logs: LogsConfig{
Expand Down Expand Up @@ -55,5 +66,17 @@ func NewFactory() Factory {
c := *cfg.(*Config)
return newTracerProvider(ctx, set, c)
}),
internal.WithMeterProvider(func(_ context.Context, set Settings, cfg component.Config) (metric.MeterProvider, error) {
c := *cfg.(*Config)
disableHighCard := disableHighCardinalityMetricsfeatureGate.IsEnabled()
return newMeterProvider(
meterProviderSettings{
res: resource.New(set.BuildInfo, c.Resource),
cfg: c.Metrics,
asyncErrorChannel: set.AsyncErrorChannel,
},
disableHighCard,
)
}),
)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"
package otelinit // import "go.opentelemetry.io/collector/service/telemetry/internal/otelinit"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package proctelemetry
package otelinit

import (
"context"
Expand Down
13 changes: 6 additions & 7 deletions service/telemetry.go → service/telemetry/metrics.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service // import "go.opentelemetry.io/collector/service"
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"
Expand All @@ -19,8 +19,7 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/internal/otelinit"
)

const (
Expand All @@ -36,7 +35,7 @@ type meterProvider struct {

type meterProviderSettings struct {
res *resource.Resource
cfg telemetry.MetricsConfig
cfg MetricsConfig
asyncErrorChannel chan error
}

Expand Down Expand Up @@ -73,7 +72,7 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
var opts []sdkmetric.Option
for _, reader := range set.cfg.Readers {
// https://github.com/open-telemetry/opentelemetry-collector/issues/8045
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
r, server, err := otelinit.InitMetricReader(context.Background(), reader, set.asyncErrorChannel, &mp.serverWG)
if err != nil {
return nil, err
}
Expand All @@ -85,15 +84,15 @@ func newMeterProvider(set meterProviderSettings, disableHighCardinality bool) (m
}

var err error
mp.MeterProvider, err = proctelemetry.InitOpenTelemetry(set.res, opts, disableHighCardinality)
mp.MeterProvider, err = otelinit.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
return nil, err
}
return mp, nil
}

// LogAboutServers logs about the servers that are serving metrics.
func (mp *meterProvider) LogAboutServers(logger *zap.Logger, cfg telemetry.MetricsConfig) {
func (mp *meterProvider) LogAboutServers(logger *zap.Logger, cfg MetricsConfig) {
for _, server := range mp.servers {
logger.Info(
"Serving metrics",
Expand Down
30 changes: 16 additions & 14 deletions service/telemetry_test.go → service/telemetry/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package service
package telemetry

import (
"context"
Expand All @@ -19,9 +19,9 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/testutil"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/promtest"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
"go.opentelemetry.io/collector/service/telemetry/internal/otelinit"
)

const (
Expand All @@ -32,6 +32,8 @@ const (
counterName = "test_counter"
)

var testInstanceID = "test_instance_id"

func TestTelemetryInit(t *testing.T) {
type metricValue struct {
value float64
Expand All @@ -43,7 +45,7 @@ func TestTelemetryInit(t *testing.T) {
disableHighCard bool
expectedMetrics map[string]metricValue
extendedConfig bool
cfg *telemetry.Config
cfg *Config
}{
{
name: "UseOpenTelemetryForInternalMetrics",
Expand Down Expand Up @@ -128,11 +130,11 @@ func TestTelemetryInit(t *testing.T) {
{
name: "UseOTelWithSDKConfiguration",
extendedConfig: true,
cfg: &telemetry.Config{
Metrics: telemetry.MetricsConfig{
cfg: &Config{
Metrics: MetricsConfig{
Level: configtelemetry.LevelDetailed,
},
Traces: telemetry.TracesConfig{
Traces: TracesConfig{
Processors: []config.SpanProcessor{
{
Batch: &config.BatchSpanProcessor{
Expand Down Expand Up @@ -194,18 +196,18 @@ func TestTelemetryInit(t *testing.T) {
{
Pull: &config.PullMetricReader{
Exporter: config.MetricExporter{
Prometheus: getAvailableLocalAddressPrometheus(t),
Prometheus: promtest.GetAvailableLocalAddressPrometheus(t),
},
},
},
}
}
if tc.cfg == nil {
tc.cfg = &telemetry.Config{
tc.cfg = &Config{
Resource: map[string]*string{
semconv.AttributeServiceInstanceID: &testInstanceID,
},
Metrics: telemetry.MetricsConfig{
Metrics: MetricsConfig{
Level: configtelemetry.LevelDetailed,
Address: testutil.GetAvailableLocalAddress(t),
},
Expand Down Expand Up @@ -253,13 +255,13 @@ func createTestMetrics(t *testing.T, mp metric.MeterProvider) {
require.NoError(t, err)
counter.Add(context.Background(), 13)

grpcExampleCounter, err := mp.Meter(proctelemetry.GRPCInstrumentation).Int64Counter(metricPrefix + grpcPrefix + counterName)
grpcExampleCounter, err := mp.Meter(otelinit.GRPCInstrumentation).Int64Counter(metricPrefix + grpcPrefix + counterName)
require.NoError(t, err)
grpcExampleCounter.Add(context.Background(), 11, metric.WithAttributes(proctelemetry.GRPCUnacceptableKeyValues...))
grpcExampleCounter.Add(context.Background(), 11, metric.WithAttributes(otelinit.GRPCUnacceptableKeyValues...))

httpExampleCounter, err := mp.Meter(proctelemetry.HTTPInstrumentation).Int64Counter(metricPrefix + httpPrefix + counterName)
httpExampleCounter, err := mp.Meter(otelinit.HTTPInstrumentation).Int64Counter(metricPrefix + httpPrefix + counterName)
require.NoError(t, err)
httpExampleCounter.Add(context.Background(), 10, metric.WithAttributes(proctelemetry.HTTPUnacceptableKeyValues...))
httpExampleCounter.Add(context.Background(), 10, metric.WithAttributes(otelinit.HTTPUnacceptableKeyValues...))
}

func getMetricsFromPrometheus(t *testing.T, handler http.Handler) map[string]*io_prometheus_client.MetricFamily {
Expand Down

0 comments on commit ae7b9cb

Please sign in to comment.