From c239e73bbb83aec650866e15dcae3792d9b7279d Mon Sep 17 00:00:00 2001 From: Dmitrii Anoshin Date: Tue, 23 Jul 2024 14:02:54 -0700 Subject: [PATCH 1/2] [exporterhelper] Add data_type attribute to internal queue metrics (#10593) Add `data_type` attribute to the internal otelcol_exporter_queue_size metric to report the type of data being processed. All other metrics have the data type reported as part of their names. We could've done the same for queue metrics, but that would introduce a significant breaking change. We want to avoid that until we have all the metrics standardized with OpenTelemetry semantic conventions. Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/9943 --- .chloggen/componenttest-extra-attributes.yaml | 18 +++++++ ...per-report-data-type-in-queue-metrics.yaml | 20 +++++++ component/componenttest/obsreporttest.go | 7 ++- .../componenttest/otelprometheuschecker.go | 6 +-- exporter/exporterhelper/common.go | 6 +-- exporter/exporterhelper/obsexporter.go | 4 +- exporter/exporterhelper/obsreport_test.go | 1 - exporter/exporterhelper/queue_sender.go | 25 ++++----- exporter/exporterhelper/queue_sender_test.go | 53 +++++++++++-------- .../obsmetrics/obs_exporter.go | 3 ++ 10 files changed, 98 insertions(+), 45 deletions(-) create mode 100644 .chloggen/componenttest-extra-attributes.yaml create mode 100644 .chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml diff --git a/.chloggen/componenttest-extra-attributes.yaml b/.chloggen/componenttest-extra-attributes.yaml new file mode 100644 index 00000000000..3683ca634b2 --- /dev/null +++ b/.chloggen/componenttest-extra-attributes.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component/componenttest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional ...attribute.KeyValue argument to TestTelemetry.CheckExporterMetricGauge. + +# One or more tracking issues or pull requests related to the change +issues: [10593] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml b/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml new file mode 100644 index 00000000000..e4065d6e4bc --- /dev/null +++ b/.chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed. + +# One or more tracking issues or pull requests related to the change +issues: [9943] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index ba076c65905..652db62529f 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/attribute" otelprom "go.opentelemetry.io/otel/exporters/prometheus" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" @@ -72,8 +73,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords) } -func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error { - return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val) +func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, extraAttrs ...attribute.KeyValue) error { + attrs := attributesForExporterMetrics(tts.id) + attrs = append(attrs, extraAttrs...) + return tts.prometheusChecker.checkGauge(metric, val, attrs) } // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. diff --git a/component/componenttest/otelprometheuschecker.go b/component/componenttest/otelprometheuschecker.go index f7a0e548dfe..6a63617c206 100644 --- a/component/componenttest/otelprometheuschecker.go +++ b/component/componenttest/otelprometheuschecker.go @@ -100,10 +100,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs) } -func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error { - exporterAttrs := attributesForExporterMetrics(exporter) - - ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs) +func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error { + ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs) if err != nil { return err } diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 4016c14e7c9..8d68c06bed7 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option { NumConsumers: config.NumConsumers, QueueSize: config.QueueSize, }) - o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) + o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep) return nil } } @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto DataType: o.signal, ExporterSettings: o.set, } - o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder) + o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep) return nil } } @@ -250,7 +250,7 @@ type baseExporter struct { } func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}) + obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal}) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/obsexporter.go b/exporter/exporterhelper/obsexporter.go index d5d366ccabe..70cc0642c47 100644 --- a/exporter/exporterhelper/obsexporter.go +++ b/exporter/exporterhelper/obsexporter.go @@ -26,6 +26,7 @@ type ObsReport struct { level configtelemetry.Level spanNamePrefix string tracer trace.Tracer + dataType component.DataType otelAttrs []attribute.KeyValue telemetryBuilder *metadata.TelemetryBuilder @@ -38,6 +39,7 @@ type ObsReport struct { type ObsReportSettings struct { ExporterID component.ID ExporterCreateSettings exporter.Settings + DataType component.DataType } // NewObsReport creates a new Exporter. @@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) { level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel, spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(), tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()), - + dataType: cfg.DataType, otelAttrs: []attribute.KeyValue{ attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()), }, diff --git a/exporter/exporterhelper/obsreport_test.go b/exporter/exporterhelper/obsreport_test.go index daafd422e61..d7c4795615b 100644 --- a/exporter/exporterhelper/obsreport_test.go +++ b/exporter/exporterhelper/obsreport_test.go @@ -15,7 +15,6 @@ import ( ) func TestExportEnqueueFailure(t *testing.T) { - exporterID := component.MustNewID("fakeExporter") tt, err := componenttest.SetupTelemetry(exporterID) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 88d17a7c03f..d7166335557 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal/queue" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" @@ -74,18 +73,18 @@ type queueSender struct { traceAttribute attribute.KeyValue consumers *queue.Consumers[Request] - telemetryBuilder *metadata.TelemetryBuilder - exporterID component.ID + obsrep *ObsReport + exporterID component.ID } func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int, - exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender { + exportFailureMessage string, obsrep *ObsReport) *queueSender { qs := &queueSender{ - queue: q, - numConsumers: numConsumers, - traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), - telemetryBuilder: telemetryBuilder, - exporterID: set.ID, + queue: q, + numConsumers: numConsumers, + traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()), + obsrep: obsrep, + exporterID: set.ID, } consumeFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) @@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error { return err } - opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String()))) + dataTypeAttr := attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String()) return multierr.Append( - qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts), - qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts), + qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))), + qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, + metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))), ) } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index ff9aa7c5f74..bd783512d3b 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" @@ -18,10 +19,10 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal/queue" + "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { @@ -202,28 +203,33 @@ func TestQueuedRetryHappyPath(t *testing.T) { }) } } -func TestQueuedRetry_QueueMetricsReported(t *testing.T) { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - qCfg := NewDefaultQueueSettings() - qCfg.NumConsumers = 0 // to make every request go straight to the queue - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultDataType, newObservabilityConsumerSender, - withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) +func TestQueuedRetry_QueueMetricsReported(t *testing.T) { + dataTypes := []component.DataType{component.DataTypeLogs, component.DataTypeTraces, component.DataTypeMetrics} + for _, dataType := range dataTypes { + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + + qCfg := NewDefaultQueueSettings() + qCfg.NumConsumers = 0 // to make every request go straight to the queue + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, dataType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_capacity", int64(defaultQueueSize))) + + for i := 0; i < 7; i++ { + require.NoError(t, be.send(context.Background(), newErrorRequest())) + } + require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7), + attribute.String(obsmetrics.DataTypeKey, dataType.String()))) - for i := 0; i < 7; i++ { - require.NoError(t, be.send(context.Background(), newErrorRequest())) + assert.NoError(t, be.Shutdown(context.Background())) } - require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7))) - - assert.NoError(t, be.Shutdown(context.Background())) } func TestNoCancellationContext(t *testing.T) { @@ -426,9 +432,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { func TestQueueSenderNoStartShutdown(t *testing.T) { queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{}) set := exportertest.NewNopSettings() - builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + obsrep, err := NewObsReport(ObsReportSettings{ + ExporterID: exporterID, + ExporterCreateSettings: exportertest.NewNopSettings(), + }) assert.NoError(t, err) - qs := newQueueSender(queue, set, 1, "", builder) + qs := newQueueSender(queue, set, 1, "", obsrep) assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/internal/obsreportconfig/obsmetrics/obs_exporter.go b/internal/obsreportconfig/obsmetrics/obs_exporter.go index b87270f5e5a..ffaa33a54a4 100644 --- a/internal/obsreportconfig/obsmetrics/obs_exporter.go +++ b/internal/obsreportconfig/obsmetrics/obs_exporter.go @@ -7,6 +7,9 @@ const ( // ExporterKey used to identify exporters in metrics and traces. ExporterKey = "exporter" + // DataTypeKey used to identify the data type in the queue size metric. + DataTypeKey = "data_type" + // SentSpansKey used to track spans sent by exporters. SentSpansKey = "sent_spans" // FailedToSendSpansKey used to track spans that failed to be sent by exporters. From 4e44e32280e44bc4ff7f4057bcfa9ec7280502af Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Tue, 23 Jul 2024 15:59:53 -0600 Subject: [PATCH 2/2] [service] Remove getBallastSize from service (#10696) #### Description This PR removes all ballast logic from service. This effectively deprecates the ballastextension as including the extension with this service would do nothing. Related to https://github.com/open-telemetry/opentelemetry-collector/pull/10671 #### Link to tracking issue Closes https://github.com/open-telemetry/opentelemetry-collector/issues/8342 #### Testing Unit tests. --- .chloggen/service-remove-ballast-deps-2.yaml | 25 +++++ .chloggen/service-remove-ballast-deps-3.yaml | 25 +++++ .chloggen/service-remove-ballast-deps.yaml | 25 +++++ internal/memorylimiter/memorylimiter.go | 23 +--- internal/memorylimiter/memorylimiter_test.go | 54 --------- .../memorylimiter_test.go | 105 +----------------- .../proctelemetry/process_telemetry.go | 10 +- .../process_telemetry_linux_test.go | 2 +- .../proctelemetry/process_telemetry_test.go | 2 +- service/service.go | 11 +- 10 files changed, 87 insertions(+), 195 deletions(-) create mode 100644 .chloggen/service-remove-ballast-deps-2.yaml create mode 100644 .chloggen/service-remove-ballast-deps-3.yaml create mode 100644 .chloggen/service-remove-ballast-deps.yaml diff --git a/.chloggen/service-remove-ballast-deps-2.yaml b/.chloggen/service-remove-ballast-deps-2.yaml new file mode 100644 index 00000000000..e6377b387de --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter processor will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps-3.yaml b/.chloggen/service-remove-ballast-deps-3.yaml new file mode 100644 index 00000000000..c07f90439f6 --- /dev/null +++ b/.chloggen/service-remove-ballast-deps-3.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension/memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The memory limiter extension will no longer account for ballast size. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/service-remove-ballast-deps.yaml b/.chloggen/service-remove-ballast-deps.yaml new file mode 100644 index 00000000000..31ffa90c3ac --- /dev/null +++ b/.chloggen/service-remove-ballast-deps.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: The service will no longer be able to get a ballast size from the deprecated ballast extension. + +# One or more tracking issues or pull requests related to the change +issues: [10696] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: If you are already using GOMEMLIMIT instead of the ballast extension this does not affect you. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/memorylimiter/memorylimiter.go b/internal/memorylimiter/memorylimiter.go index a010cba5427..db93795127a 100644 --- a/internal/memorylimiter/memorylimiter.go +++ b/internal/memorylimiter/memorylimiter.go @@ -44,7 +44,6 @@ type MemoryLimiter struct { usageChecker memUsageChecker memCheckWait time.Duration - ballastSize uint64 // mustRefuse is used to indicate when data should be refused. mustRefuse *atomic.Bool @@ -58,8 +57,7 @@ type MemoryLimiter struct { readMemStatsFn func(m *runtime.MemStats) // Fields used for logging. - logger *zap.Logger - configMismatchedLogged bool + logger *zap.Logger refCounterLock sync.Mutex refCounter int @@ -114,14 +112,7 @@ func (ml *MemoryLimiter) startMonitoring() { } } -func (ml *MemoryLimiter) Start(_ context.Context, host component.Host) error { - extensions := host.GetExtensions() - for _, extension := range extensions { - if ext, ok := extension.(interface{ GetBallastSize() uint64 }); ok { - ml.ballastSize = ext.GetBallastSize() - break - } - } +func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error { ml.startMonitoring() return nil } @@ -168,16 +159,6 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro func (ml *MemoryLimiter) readMemStats() *runtime.MemStats { ms := &runtime.MemStats{} ml.readMemStatsFn(ms) - // If proper configured ms.Alloc should be at least ml.ballastSize but since - // a misconfiguration is possible check for that here. - if ms.Alloc >= ml.ballastSize { - ms.Alloc -= ml.ballastSize - } else if !ml.configMismatchedLogged { - // This indicates misconfiguration. Log it once. - ml.configMismatchedLogged = true - ml.logger.Warn(`"size_mib" in ballast extension is likely incorrectly configured.`) - } - return ms } diff --git a/internal/memorylimiter/memorylimiter_test.go b/internal/memorylimiter/memorylimiter_test.go index e9e92a33f70..6919ce50968 100644 --- a/internal/memorylimiter/memorylimiter_test.go +++ b/internal/memorylimiter/memorylimiter_test.go @@ -4,17 +4,14 @@ package memorylimiter import ( - "context" "runtime" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/internal/iruntime" ) @@ -43,22 +40,6 @@ func TestMemoryPressureResponse(t *testing.T) { ml.CheckMemLimits() assert.True(t, ml.MustRefuse()) - // Check ballast effect - ml.ballastSize = 1000 - - // Below memAllocLimit accounting for ballast. - currentMemAlloc = 800 + ml.ballastSize - ml.CheckMemLimits() - assert.False(t, ml.MustRefuse()) - - // Above memAllocLimit even accounting for ballast. - currentMemAlloc = 1800 + ml.ballastSize - ml.CheckMemLimits() - assert.True(t, ml.MustRefuse()) - - // Restore ballast to default. - ml.ballastSize = 0 - // Check spike limit ml.usageChecker.memSpikeLimit = 512 @@ -151,38 +132,3 @@ func TestRefuseDecision(t *testing.T) { }) } } - -func TestBallastSize(t *testing.T) { - cfg := &Config{ - CheckInterval: 10 * time.Second, - MemoryLimitMiB: 1024, - } - got, err := NewMemoryLimiter(cfg, zap.NewNop()) - require.NoError(t, err) - - got.startMonitoring() - require.NoError(t, got.Start(context.Background(), &host{ballastSize: 113})) - assert.Equal(t, uint64(113), got.ballastSize) - require.NoError(t, got.Shutdown(context.Background())) -} - -type host struct { - ballastSize uint64 - component.Host -} - -func (h *host) GetExtensions() map[component.ID]component.Component { - ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} - return ret -} - -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 1ce63d794f3..6172fa39889 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -122,7 +122,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -133,7 +132,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -144,29 +142,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -177,7 +152,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -188,7 +162,6 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -197,7 +170,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -213,7 +186,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, mp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, mp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = mp.ConsumeMetrics(ctx, md) if tt.expectError { @@ -239,7 +212,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -250,7 +222,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -261,29 +232,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -294,7 +242,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -305,7 +252,6 @@ func TestTraceMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -314,7 +260,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -330,7 +276,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeTraces(ctx, td) if tt.expectError { @@ -356,7 +302,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { tests := []struct { name string mlCfg *Config - ballastSize uint64 memAlloc uint64 expectError bool }{ @@ -367,7 +312,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -378,29 +322,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 1, }, - ballastSize: 0, - memAlloc: 1800, - expectError: true, - }, - { - name: "Below memAllocLimit accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, - memAlloc: 800, - expectError: false, - }, - { - name: "Above memAllocLimit even accounting for ballast", - mlCfg: &Config{ - CheckInterval: time.Second, - MemoryLimitPercentage: 50, - MemorySpikePercentage: 1, - }, - ballastSize: 1000, memAlloc: 1800, expectError: true, }, @@ -411,7 +332,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 10, }, - ballastSize: 0, memAlloc: 800, expectError: false, }, @@ -422,7 +342,6 @@ func TestLogMemoryPressureResponse(t *testing.T) { MemoryLimitPercentage: 50, MemorySpikePercentage: 11, }, - ballastSize: 0, memAlloc: 800, expectError: true, }, @@ -431,7 +350,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { memorylimiter.GetMemoryFn = totalMemory memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { - ms.Alloc = tt.memAlloc + tt.ballastSize + ms.Alloc = tt.memAlloc } ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) @@ -447,7 +366,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { processorhelper.WithShutdown(ml.shutdown)) require.NoError(t, err) - assert.NoError(t, tp.Start(ctx, &host{ballastSize: tt.ballastSize})) + assert.NoError(t, tp.Start(ctx, &host{})) ml.memlimiter.CheckMemLimits() err = tp.ConsumeLogs(ctx, ld) if tt.expectError { @@ -465,26 +384,14 @@ func TestLogMemoryPressureResponse(t *testing.T) { } type host struct { - ballastSize uint64 component.Host } func (h *host) GetExtensions() map[component.ID]component.Component { ret := make(map[component.ID]component.Component) - ret[component.MustNewID("ballast")] = &ballastExtension{ballastSize: h.ballastSize} return ret } -type ballastExtension struct { - ballastSize uint64 - component.StartFunc - component.ShutdownFunc -} - -func (be *ballastExtension) GetBallastSize() uint64 { - return be.ballastSize -} - func totalMemory() (uint64, error) { return uint64(2048), nil } diff --git a/service/internal/proctelemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go index 0b978758104..e7a0cc1454a 100644 --- a/service/internal/proctelemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -21,7 +21,6 @@ import ( // processMetrics is a struct that contains views related to process metrics (cpu, mem, etc) type processMetrics struct { startTimeUnixNano int64 - ballastSizeBytes uint64 proc *process.Process context context.Context @@ -54,7 +53,7 @@ func WithHostProc(hostProc string) RegisterOption { // RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure // basic information about this process. -func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeBytes uint64, opts ...RegisterOption) error { +func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, opts ...RegisterOption) error { set := registerOption{} for _, opt := range opts { opt.apply(&set) @@ -62,7 +61,6 @@ func RegisterProcessMetrics(cfg servicetelemetry.TelemetrySettings, ballastSizeB var err error pm := &processMetrics{ startTimeUnixNano: time.Now().UnixNano(), - ballastSizeBytes: ballastSizeBytes, ms: &runtime.MemStats{}, } @@ -139,10 +137,4 @@ func (pm *processMetrics) readMemStatsIfNeeded() { } pm.lastMsRead = now runtime.ReadMemStats(pm.ms) - if pm.ballastSizeBytes > 0 { - pm.ms.Alloc -= pm.ballastSizeBytes - pm.ms.HeapAlloc -= pm.ballastSizeBytes - pm.ms.HeapSys -= pm.ballastSizeBytes - pm.ms.HeapInuse -= pm.ballastSizeBytes - } } diff --git a/service/internal/proctelemetry/process_telemetry_linux_test.go b/service/internal/proctelemetry/process_telemetry_linux_test.go index 73605c0ae8e..99471b4eca2 100644 --- a/service/internal/proctelemetry/process_telemetry_linux_test.go +++ b/service/internal/proctelemetry/process_telemetry_linux_test.go @@ -21,7 +21,7 @@ func TestProcessTelemetryWithHostProc(t *testing.T) { // Make the sure the environment variable value is not used. t.Setenv("HOST_PROC", "foo/bar") - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0, WithHostProc("/proc"))) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, WithHostProc("/proc"))) // Check that the metrics are actually filled. time.Sleep(200 * time.Millisecond) diff --git a/service/internal/proctelemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go index cccc5ad8e30..fb750fcf664 100644 --- a/service/internal/proctelemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -78,7 +78,7 @@ func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_cli func TestProcessTelemetry(t *testing.T) { tel := setupTelemetry(t) - require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings, 0)) + require.NoError(t, RegisterProcessMetrics(tel.TelemetrySettings)) mp, err := fetchPrometheusMetrics(tel.promHandler) require.NoError(t, err) diff --git a/service/service.go b/service/service.go index 99747db9ac6..8b57bd12cad 100644 --- a/service/service.go +++ b/service/service.go @@ -159,7 +159,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings); err != nil { return nil, fmt.Errorf("failed to register process metrics: %w", err) } } @@ -316,15 +316,6 @@ func (srv *Service) Logger() *zap.Logger { return srv.telemetrySettings.Logger } -func getBallastSize(host component.Host) uint64 { - for _, ext := range host.GetExtensions() { - if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok { - return bExt.GetBallastSize() - } - } - return 0 -} - func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource { // pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests. // Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only