From e41c6a23d4f7e319e5f90dd797285b52f806f152 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 25 Nov 2024 15:40:22 -0300 Subject: [PATCH] bugfix: Fix data race in batchTimeSeries state Signed-off-by: Arthur Silva Sens --- ...porter-batchseries-concurrency-bugfix.yaml | 26 +++++++++++++ .../prometheusremotewriteexporter/exporter.go | 5 +-- .../prometheusremotewriteexporter/helper.go | 37 +++++++++++-------- .../helper_test.go | 24 ++++++------ 4 files changed, 61 insertions(+), 31 deletions(-) create mode 100644 .chloggen/prwexporter-batchseries-concurrency-bugfix.yaml diff --git a/.chloggen/prwexporter-batchseries-concurrency-bugfix.yaml b/.chloggen/prwexporter-batchseries-concurrency-bugfix.yaml new file mode 100644 index 000000000000..f9ce55398325 --- /dev/null +++ b/.chloggen/prwexporter-batchseries-concurrency-bugfix.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Fix data race in batch series state." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36524] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: "" +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 74e1cff42b79..72cb1bf5cd6c 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -84,7 +84,7 @@ type prwExporter struct { wal *prweWAL exporterSettings prometheusremotewrite.Settings telemetry prwTelemetry - batchTimeSeriesState batchTimeSeriesState + batchTimeSeriesState *batchTimeSeriesState } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -191,7 +191,6 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er case <-prwe.closeChan: return errors.New("shutdown has been called") default: - tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { prwe.telemetry.recordTranslationFailure(ctx) @@ -229,7 +228,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro } // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, prwe.batchTimeSeriesState) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index 5819aefc4fe8..89d14a336fb4 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -7,6 +7,7 @@ import ( "errors" "math" "sort" + "sync/atomic" "github.com/prometheus/prometheus/prompb" ) @@ -14,17 +15,21 @@ import ( type batchTimeSeriesState struct { // Track batch sizes sent to avoid over allocating huge buffers. // This helps in the case where large batches are sent to avoid allocating too much unused memory - nextTimeSeriesBufferSize int - nextMetricMetadataBufferSize int - nextRequestBufferSize int + nextTimeSeriesBufferSize atomic.Int64 + nextMetricMetadataBufferSize atomic.Int64 + nextRequestBufferSize atomic.Int64 } -func newBatchTimeSericesState() batchTimeSeriesState { - return batchTimeSeriesState{ - nextTimeSeriesBufferSize: math.MaxInt, - nextMetricMetadataBufferSize: math.MaxInt, - nextRequestBufferSize: 0, +func newBatchTimeSericesState() *batchTimeSeriesState { + state := &batchTimeSeriesState{ + nextTimeSeriesBufferSize: atomic.Int64{}, + nextMetricMetadataBufferSize: atomic.Int64{}, + nextRequestBufferSize: atomic.Int64{}, } + state.nextTimeSeriesBufferSize.Store(math.MaxInt64) + state.nextMetricMetadataBufferSize.Store(math.MaxInt64) + state.nextRequestBufferSize.Store(0) + return state } // batchTimeSeries splits series into multiple batch write requests. @@ -34,10 +39,10 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, } // Allocate a buffer size of at least 10, or twice the last # of requests we sent - requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize)) + requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize.Load())) // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller - tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap))) + tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize.Load(), int64(len(tsMap)))) sizeOfCurrentBatch := 0 i := 0 @@ -45,11 +50,11 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, sizeOfSeries := v.Size() if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize { - state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray)) + state.nextTimeSeriesBufferSize.Store(int64(max(10, 2*len(tsArray)))) wrapped := convertTimeseriesToRequest(tsArray) requests = append(requests, wrapped) - tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i)) + tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize.Load(), int64(len(tsMap)-i))) sizeOfCurrentBatch = 0 } @@ -64,18 +69,18 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, } // Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller - mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m))) + mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize.Load(), int64(len(m)))) sizeOfCurrentBatch = 0 i = 0 for _, v := range m { sizeOfM := v.Size() if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize { - state.nextMetricMetadataBufferSize = max(10, 2*len(mArray)) + state.nextMetricMetadataBufferSize.Store(int64(max(10, 2*len(mArray)))) wrapped := convertMetadataToRequest(mArray) requests = append(requests, wrapped) - mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i)) + mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize.Load(), int64(len(m)-i))) sizeOfCurrentBatch = 0 } @@ -89,7 +94,7 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, requests = append(requests, wrapped) } - state.nextRequestBufferSize = 2 * len(requests) + state.nextRequestBufferSize.Store(int64(2 * len(requests))) return requests, nil } diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index f464c25071b0..4c773e11ea86 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -59,7 +59,7 @@ func Test_batchTimeSeries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state) + requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, state) if tt.returnErr { assert.Error(t, err) return @@ -68,13 +68,13 @@ func Test_batchTimeSeries(t *testing.T) { assert.NoError(t, err) assert.Len(t, requests, tt.numExpectedRequests) if tt.numExpectedRequests <= 1 { - assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) + assert.Equal(t, int64(math.MaxInt64), state.nextTimeSeriesBufferSize.Load()) + assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load()) + assert.Equal(t, int64(2*len(requests)), state.nextRequestBufferSize.Load()) } else { - assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 2*len(requests), state.nextRequestBufferSize) + assert.Equal(t, int64(max(10, len(requests[len(requests)-2].Timeseries)*2)), state.nextTimeSeriesBufferSize.Load()) + assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load()) + assert.Equal(t, int64(2*len(requests)), state.nextRequestBufferSize.Load()) } }) } @@ -97,13 +97,13 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) { tsMap1 := getTimeseriesMap(tsArray) state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) assert.NoError(t, err) assert.Len(t, requests, 18) - assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize) - assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize) - assert.Equal(t, 36, state.nextRequestBufferSize) + assert.Equal(t, int64(len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize.Load()) + assert.Equal(t, int64(math.MaxInt64), state.nextMetricMetadataBufferSize.Load()) + assert.Equal(t, int64(36), state.nextRequestBufferSize.Load()) } // Benchmark_batchTimeSeries checks batchTimeSeries @@ -132,7 +132,7 @@ func Benchmark_batchTimeSeries(b *testing.B) { state := newBatchTimeSericesState() // Run batchTimeSeries 100 times with a 1mb max request size for i := 0; i < b.N; i++ { - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) assert.NoError(b, err) assert.Len(b, requests, 18) }