diff --git a/.chloggen/prwexporter-syncpool-batchstate.yaml b/.chloggen/prwexporter-syncpool-batchstate.yaml new file mode 100644 index 000000000000..2f3fc6b03c45 --- /dev/null +++ b/.chloggen/prwexporter-syncpool-batchstate.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: In preparation to re-introducing multiple workers, we're removing a data-race when batching timeseries. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36601] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 74e1cff42b79..cb3b88986930 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -70,21 +70,25 @@ var bufferPool = sync.Pool{ // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. type prwExporter struct { - endpointURL *url.URL - client *http.Client - wg *sync.WaitGroup - closeChan chan struct{} - concurrency int - userAgentHeader string - maxBatchSizeBytes int - clientSettings *confighttp.ClientConfig - settings component.TelemetrySettings - retrySettings configretry.BackOffConfig - retryOnHTTP429 bool - wal *prweWAL - exporterSettings prometheusremotewrite.Settings - telemetry prwTelemetry - batchTimeSeriesState batchTimeSeriesState + endpointURL *url.URL + client *http.Client + wg *sync.WaitGroup + closeChan chan struct{} + concurrency int + userAgentHeader string + maxBatchSizeBytes int + clientSettings *confighttp.ClientConfig + settings component.TelemetrySettings + retrySettings configretry.BackOffConfig + retryOnHTTP429 bool + wal *prweWAL + exporterSettings prometheusremotewrite.Settings + telemetry prwTelemetry + + // When concurrency is enabled, concurrent goroutines would potentially + // fight over the same batchState object. To avoid this, we use a pool + // to provide each goroutine with its own state. + batchStatePool sync.Pool } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -139,8 +143,8 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { AddMetricSuffixes: cfg.AddMetricSuffixes, SendMetadata: cfg.SendMetadata, }, - telemetry: prwTelemetry, - batchTimeSeriesState: newBatchTimeSericesState(), + telemetry: prwTelemetry, + batchStatePool: sync.Pool{New: func() any { return newBatchTimeSericesState() }}, } if prwe.exporterSettings.ExportCreatedMetric { @@ -228,8 +232,10 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro return nil } + state := prwe.batchStatePool.Get().(*batchTimeSeriesState) + defer prwe.batchStatePool.Put(state) // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState) + requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, state) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go new file mode 100644 index 000000000000..689cbcf7b9fc --- /dev/null +++ b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewriteexporter + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +// Test everything works when there is more than one goroutine calling PushMetrics. +// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes. +func Test_PushMetricsConcurrent(t *testing.T) { + n := 1000 + ms := make([]pmetric.Metrics, n) + testIDKey := "test_id" + for i := 0; i < n; i++ { + m := testdata.GenerateMetricsOneMetric() + dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints() + for j := 0; j < dps.Len(); j++ { + dp := dps.At(j) + dp.Attributes().PutInt(testIDKey, int64(i)) + } + ms[i] = m + } + received := make(map[int]prompb.TimeSeries) + var mu sync.Mutex + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, body) + // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) + assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) + var unzipped []byte + + dest, err := snappy.Decode(unzipped, body) + assert.NoError(t, err) + + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(dest, wr) + assert.NoError(t, ok) + assert.Len(t, wr.Timeseries, 2) + ts := wr.Timeseries[0] + foundLabel := false + for _, label := range ts.Labels { + if label.Name == testIDKey { + id, err := strconv.Atoi(label.Value) + assert.NoError(t, err) + mu.Lock() + _, ok := received[id] + assert.False(t, ok) // fail if we already saw it + received[id] = ts + mu.Unlock() + foundLabel = true + break + } + } + assert.True(t, foundLabel) + w.WriteHeader(http.StatusOK) + })) + + defer server.Close() + + // Adjusted retry settings for faster testing + retrySettings := configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = server.URL + clientConfig.ReadBufferSize = 0 + clientConfig.WriteBufferSize = 512 * 1024 + cfg := &Config{ + Namespace: "", + ClientConfig: clientConfig, + MaxBatchSizeBytes: 3000000, + RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1}, + TargetInfo: &TargetInfo{ + Enabled: true, + }, + CreatedMetric: &CreatedMetric{ + Enabled: false, + }, + BackOffConfig: retrySettings, + } + + assert.NotNil(t, cfg) + set := exportertest.NewNopSettings() + set.MetricsLevel = configtelemetry.LevelBasic + + prwe, nErr := newPRWExporter(cfg, set) + + require.NoError(t, nErr) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, prwe.Shutdown(ctx)) + }() + + var wg sync.WaitGroup + wg.Add(n) + for _, m := range ms { + go func() { + err := prwe.PushMetrics(ctx, m) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() + assert.Len(t, received, n) +} diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 5fbe3ef237ab..a21e3ac4b6e4 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "net/http" "net/http/httptest" "net/url" @@ -1274,3 +1275,80 @@ func benchmarkExecute(b *testing.B, numSample int) { require.NoError(b, err) } } + +func BenchmarkPushMetrics(b *testing.B) { + for _, numMetrics := range []int{10, 100, 1000, 10000} { + b.Run(fmt.Sprintf("numMetrics=%d", numMetrics), func(b *testing.B) { + benchmarkPushMetrics(b, numMetrics, 1) + }) + } +} + +func BenchmarkPushMetricsVaryingMetrics(b *testing.B) { + benchmarkPushMetrics(b, -1, 1) +} + +// benchmarkPushMetrics benchmarks the PushMetrics method with a given number of metrics. +// If numMetrics is -1, it will benchmark with varying number of metrics, from 10 up to 10000. +func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer mockServer.Close() + endpointURL, err := url.Parse(mockServer.URL) + require.NoError(b, err) + + tel := setupTestTelemetry() + set := tel.NewSettings() + // Adjusted retry settings for faster testing + retrySettings := configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = endpointURL.String() + clientConfig.ReadBufferSize = 0 + clientConfig.WriteBufferSize = 512 * 1024 + cfg := &Config{ + Namespace: "", + ClientConfig: clientConfig, + MaxBatchSizeBytes: 3000, + RemoteWriteQueue: RemoteWriteQueue{NumConsumers: numConsumers}, + BackOffConfig: retrySettings, + TargetInfo: &TargetInfo{Enabled: true}, + CreatedMetric: &CreatedMetric{Enabled: false}, + } + exporter, err := newPRWExporter(cfg, set) + require.NoError(b, err) + + var metrics []pmetric.Metrics + for n := 0; n < b.N; n++ { + actualNumMetrics := numMetrics + if numMetrics == -1 { + actualNumMetrics = int(math.Pow(10, float64(n%4+1))) + } + m := testdata.GenerateMetricsManyMetricsSameResource(actualNumMetrics) + for i := 0; i < m.MetricCount(); i++ { + dp := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty() + dp.SetIntValue(int64(i)) + // We add a random key to the attributes to ensure that we create a new time series during translation for each metric. + dp.Attributes().PutInt("random_key", int64(i)) + } + metrics = append(metrics, m) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(b, exporter.Shutdown(ctx)) + }() + b.ReportAllocs() + b.ResetTimer() + for _, m := range metrics { + err := exporter.PushMetrics(ctx, m) + require.NoError(b, err) + } +} diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index 5819aefc4fe8..26def2570eff 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -19,8 +19,8 @@ type batchTimeSeriesState struct { nextRequestBufferSize int } -func newBatchTimeSericesState() batchTimeSeriesState { - return batchTimeSeriesState{ +func newBatchTimeSericesState() *batchTimeSeriesState { + return &batchTimeSeriesState{ nextTimeSeriesBufferSize: math.MaxInt, nextMetricMetadataBufferSize: math.MaxInt, nextRequestBufferSize: 0, diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index f464c25071b0..aa7714ca297d 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -59,7 +59,7 @@ func Test_batchTimeSeries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state) + requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, state) if tt.returnErr { assert.Error(t, err) return @@ -97,7 +97,7 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) { tsMap1 := getTimeseriesMap(tsArray) state := newBatchTimeSericesState() - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) assert.NoError(t, err) assert.Len(t, requests, 18) @@ -132,7 +132,7 @@ func Benchmark_batchTimeSeries(b *testing.B) { state := newBatchTimeSericesState() // Run batchTimeSeries 100 times with a 1mb max request size for i := 0; i < b.N; i++ { - requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state) + requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) assert.NoError(b, err) assert.Len(b, requests, 18) }