From 2052bd10f37194569d12577bb433f98ee4dab779 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 25 Nov 2024 14:45:52 -0300 Subject: [PATCH 1/5] Add test to show data race in batchTimeSeries() Signed-off-by: Arthur Silva Sens --- .../exporter_concurrency_test.go | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 exporter/prometheusremotewriteexporter/exporter_concurrency_test.go diff --git a/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go new file mode 100644 index 000000000000..689cbcf7b9fc --- /dev/null +++ b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewriteexporter + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +// Test everything works when there is more than one goroutine calling PushMetrics. +// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes. +func Test_PushMetricsConcurrent(t *testing.T) { + n := 1000 + ms := make([]pmetric.Metrics, n) + testIDKey := "test_id" + for i := 0; i < n; i++ { + m := testdata.GenerateMetricsOneMetric() + dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints() + for j := 0; j < dps.Len(); j++ { + dp := dps.At(j) + dp.Attributes().PutInt(testIDKey, int64(i)) + } + ms[i] = m + } + received := make(map[int]prompb.TimeSeries) + var mu sync.Mutex + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, body) + // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) + assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) + var unzipped []byte + + dest, err := snappy.Decode(unzipped, body) + assert.NoError(t, err) + + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(dest, wr) + assert.NoError(t, ok) + assert.Len(t, wr.Timeseries, 2) + ts := wr.Timeseries[0] + foundLabel := false + for _, label := range ts.Labels { + if label.Name == testIDKey { + id, err := strconv.Atoi(label.Value) + assert.NoError(t, err) + mu.Lock() + _, ok := received[id] + assert.False(t, ok) // fail if we already saw it + received[id] = ts + mu.Unlock() + foundLabel = true + break + } + } + assert.True(t, foundLabel) + w.WriteHeader(http.StatusOK) + })) + + defer server.Close() + + // Adjusted retry settings for faster testing + retrySettings := configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = server.URL + clientConfig.ReadBufferSize = 0 + clientConfig.WriteBufferSize = 512 * 1024 + cfg := &Config{ + Namespace: "", + ClientConfig: clientConfig, + MaxBatchSizeBytes: 3000000, + RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1}, + TargetInfo: &TargetInfo{ + Enabled: true, + }, + CreatedMetric: &CreatedMetric{ + Enabled: false, + }, + BackOffConfig: retrySettings, + } + + assert.NotNil(t, cfg) + set := exportertest.NewNopSettings() + set.MetricsLevel = configtelemetry.LevelBasic + + prwe, nErr := newPRWExporter(cfg, set) + + require.NoError(t, nErr) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, prwe.Shutdown(ctx)) + }() + + var wg sync.WaitGroup + wg.Add(n) + for _, m := range ms { + go func() { + err := prwe.PushMetrics(ctx, m) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() + assert.Len(t, received, n) +} From bdeb2543df9480a2422f1b673569f175c5ccf0a8 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 29 Nov 2024 09:07:50 -0300 Subject: [PATCH 2/5] Add benchmark for PushMetrics() Signed-off-by: Arthur Silva Sens --- .../exporter_test.go | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 5fbe3ef237ab..74aa59c26492 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "net/http/httptest" "net/url" @@ -1274,3 +1275,74 @@ func benchmarkExecute(b *testing.B, numSample int) { require.NoError(b, err) } } + +func BenchmarkPushMetrics(b *testing.B) { + for _, numMetrics := range []int{10, 100, 1000, 10000} { + b.Run(fmt.Sprintf("numMetrics=%d", numMetrics), func(b *testing.B) { + benchmarkPushMetrics(b, numMetrics, 1) + }) + } +} + +func BenchmarkPushMetricsVaryingMetrics(b *testing.B) { + benchmarkPushMetrics(b, -1, 1) +} + +// benchmarkPushMetrics benchmarks the PushMetrics method with a given number of metrics. +// If numMetrics is -1, it will benchmark with varying number of metrics, from 10 up to 10000. +func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer mockServer.Close() + endpointURL, err := url.Parse(mockServer.URL) + require.NoError(b, err) + + tel := setupTestTelemetry() + set := tel.NewSettings() + // Adjusted retry settings for faster testing + retrySettings := configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = endpointURL.String() + clientConfig.ReadBufferSize = 0 + clientConfig.WriteBufferSize = 512 * 1024 + cfg := &Config{ + Namespace: "", + ClientConfig: clientConfig, + MaxBatchSizeBytes: 3000000, + RemoteWriteQueue: RemoteWriteQueue{NumConsumers: numConsumers}, + BackOffConfig: retrySettings, + TargetInfo: &TargetInfo{Enabled: true}, + CreatedMetric: &CreatedMetric{Enabled: false}, + } + exporter, err := newPRWExporter(cfg, set) + require.NoError(b, err) + + var metrics []pmetric.Metrics + for n := 0; n < b.N; n++ { + actualNumMetrics := numMetrics + if numMetrics == -1 { + actualNumMetrics = int(math.Pow(10, float64(n%4+1))) + } + m := testdata.GenerateMetricsManyMetricsSameResource(actualNumMetrics) + metrics = append(metrics, m) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(b, exporter.Shutdown(ctx)) + }() + b.ReportAllocs() + b.ResetTimer() + for _, m := range metrics { + err := exporter.PushMetrics(ctx, m) + require.NoError(b, err) + } +} From 014c81200da6ee5992871643a3b83f82d9fc7be3 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 28 Nov 2024 16:51:15 -0300 Subject: [PATCH 3/5] Remove data race by removing the state altogether Signed-off-by: Arthur Silva Sens --- .../prometheusremotewriteexporter/exporter.go | 4 +- .../prometheusremotewriteexporter/helper.go | 52 ++++++------------- .../helper_test.go | 22 ++------ 3 files changed, 20 insertions(+), 58 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 74e1cff42b79..0bddf12ae0fa 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -84,7 +84,6 @@ type prwExporter struct { wal *prweWAL exporterSettings prometheusremotewrite.Settings telemetry prwTelemetry - batchTimeSeriesState batchTimeSeriesState } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -140,7 +139,6 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { SendMetadata: cfg.SendMetadata, }, telemetry: prwTelemetry, - batchTimeSeriesState: newBatchTimeSericesState(), } if prwe.exporterSettings.ExportCreatedMetric { @@ -229,7 +227,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro } // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index 5819aefc4fe8..bbd7fb6a029e 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -5,55 +5,37 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent import ( "errors" - "math" "sort" "github.com/prometheus/prometheus/prompb" ) -type batchTimeSeriesState struct { - // Track batch sizes sent to avoid over allocating huge buffers. - // This helps in the case where large batches are sent to avoid allocating too much unused memory - nextTimeSeriesBufferSize int - nextMetricMetadataBufferSize int - nextRequestBufferSize int -} - -func newBatchTimeSericesState() batchTimeSeriesState { - return batchTimeSeriesState{ - nextTimeSeriesBufferSize: math.MaxInt, - nextMetricMetadataBufferSize: math.MaxInt, - nextRequestBufferSize: 0, - } -} - -// batchTimeSeries splits series into multiple batch write requests. -func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) { +// batchTimeSeries splits series into multiple write requests if they exceed the maxBatchByteSize. +func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) { if len(tsMap) == 0 { return nil, errors.New("invalid tsMap: cannot be empty map") } - // Allocate a buffer size of at least 10, or twice the last # of requests we sent - requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize)) + // Allocate a buffer size of at least 10. + requests := make([]*prompb.WriteRequest, 0, 10) - // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller - tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap))) + // Allocate a time series buffer with the length of the input. + tsArray := make([]prompb.TimeSeries, 0, len(tsMap)) sizeOfCurrentBatch := 0 i := 0 - for _, v := range tsMap { - sizeOfSeries := v.Size() + for _, series := range tsMap { + sizeOfSeries := series.Size() if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize { - state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray)) wrapped := convertTimeseriesToRequest(tsArray) requests = append(requests, wrapped) - tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i)) + tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i) sizeOfCurrentBatch = 0 } - tsArray = append(tsArray, *v) + tsArray = append(tsArray, *series) sizeOfCurrentBatch += sizeOfSeries i++ } @@ -63,23 +45,22 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, requests = append(requests, wrapped) } - // Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller - mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m))) + // Allocate a metric metadata with the length of the input. + mArray := make([]prompb.MetricMetadata, 0, len(m)) sizeOfCurrentBatch = 0 i = 0 - for _, v := range m { - sizeOfM := v.Size() + for _, metadata := range m { + sizeOfM := metadata.Size() if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize { - state.nextMetricMetadataBufferSize = max(10, 2*len(mArray)) wrapped := convertMetadataToRequest(mArray) requests = append(requests, wrapped) - mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i)) + mArray = make([]prompb.MetricMetadata, 0, len(m)-i) sizeOfCurrentBatch = 0 } - mArray = append(mArray, *v) + mArray = append(mArray, *metadata) sizeOfCurrentBatch += sizeOfM i++ } @@ -89,7 +70,6 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, requests = append(requests, wrapped) } - state.nextRequestBufferSize = 2 * len(requests) return requests, nil } diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index f464c25071b0..b63b2069f59f 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -4,7 +4,6 @@ package prometheusremotewriteexporter import ( - "math" "testing" "github.com/prometheus/prometheus/prompb" @@ -58,8 +57,7 @@ func Test_batchTimeSeries(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state) + requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil) if tt.returnErr { assert.Error(t, err) return @@ -67,15 +65,6 @@ func Test_batchTimeSeries(t *testing.T) { assert.NoError(t, err) assert.Len(t, requests, tt.numExpectedRequests) - if tt.numExpectedRequests <= 1 { - assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) - } else { - assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) - } }) } } @@ -96,14 +85,10 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) { tsMap1 := getTimeseriesMap(tsArray) - state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil) assert.NoError(t, err) assert.Len(t, requests, 18) - assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 36, state.nextRequestBufferSize) } // Benchmark_batchTimeSeries checks batchTimeSeries @@ -129,10 +114,9 @@ func Benchmark_batchTimeSeries(b *testing.B) { b.ReportAllocs() b.ResetTimer() - state := newBatchTimeSericesState() // Run batchTimeSeries 100 times with a 1mb max request size for i := 0; i < b.N; i++ { - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil) assert.NoError(b, err) assert.Len(b, requests, 18) } From 2dc18701841ecbac350ffc9115d3e1a12558399e Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 2 Dec 2024 11:32:11 -0300 Subject: [PATCH 4/5] Increase data used in benchmark Signed-off-by: Arthur Silva Sens --- exporter/prometheusremotewriteexporter/exporter_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 74aa59c26492..04f1dc22ca4d 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -1289,7 +1289,7 @@ func BenchmarkPushMetricsVaryingMetrics(b *testing.B) { } // benchmarkPushMetrics benchmarks the PushMetrics method with a given number of metrics. -// If numMetrics is -1, it will benchmark with varying number of metrics, from 10 up to 10000. +// If numMetrics is -1, it will benchmark with varying number of metrics, from 100 up to 1000000. func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) @@ -1327,9 +1327,12 @@ func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { for n := 0; n < b.N; n++ { actualNumMetrics := numMetrics if numMetrics == -1 { - actualNumMetrics = int(math.Pow(10, float64(n%4+1))) + actualNumMetrics = int(math.Pow(100, float64(n%3+1))) } m := testdata.GenerateMetricsManyMetricsSameResource(actualNumMetrics) + for i := 0; i < m.MetricCount(); i++ { + m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty().SetIntValue(int64(i)) + } metrics = append(metrics, m) } From b5a9c1dc45ad80eaedac24906e1c45e004706cbe Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 13 Dec 2024 16:11:29 -0300 Subject: [PATCH 5/5] Update benchmark to create more diverse timeseries Signed-off-by: Arthur Silva Sens --- .../prometheusremotewriteexporter/exporter_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 04f1dc22ca4d..a21e3ac4b6e4 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -1289,7 +1289,7 @@ func BenchmarkPushMetricsVaryingMetrics(b *testing.B) { } // benchmarkPushMetrics benchmarks the PushMetrics method with a given number of metrics. -// If numMetrics is -1, it will benchmark with varying number of metrics, from 100 up to 1000000. +// If numMetrics is -1, it will benchmark with varying number of metrics, from 10 up to 10000. func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK) @@ -1314,7 +1314,7 @@ func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { cfg := &Config{ Namespace: "", ClientConfig: clientConfig, - MaxBatchSizeBytes: 3000000, + MaxBatchSizeBytes: 3000, RemoteWriteQueue: RemoteWriteQueue{NumConsumers: numConsumers}, BackOffConfig: retrySettings, TargetInfo: &TargetInfo{Enabled: true}, @@ -1327,11 +1327,14 @@ func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { for n := 0; n < b.N; n++ { actualNumMetrics := numMetrics if numMetrics == -1 { - actualNumMetrics = int(math.Pow(100, float64(n%3+1))) + actualNumMetrics = int(math.Pow(10, float64(n%4+1))) } m := testdata.GenerateMetricsManyMetricsSameResource(actualNumMetrics) for i := 0; i < m.MetricCount(); i++ { - m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty().SetIntValue(int64(i)) + dp := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + // We add a random key to the attributes to ensure that we create a new time series during translation for each metric. + dp.Attributes().PutInt("random_key", int64(i)) } metrics = append(metrics, m) }