Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/prometheusremotewrite] Fix data race for batch state by removing the state altogether #36600

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type prwExporter struct {
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -140,7 +139,6 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
}

if prwe.exporterSettings.ExportCreatedMetric {
Expand Down Expand Up @@ -229,7 +227,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
if err != nil {
return err
}
Expand Down
138 changes: 138 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test_PushMetricsConcurrent verifies that PushMetrics works when more than one
// goroutine calls it at the same time. Today only 1 worker per exporter is used,
// but this test future-proofs the exporter in case that changes.
func Test_PushMetricsConcurrent(t *testing.T) {
	n := 1000
	ms := make([]pmetric.Metrics, n)
	testIDKey := "test_id"
	// Tag every data point of every payload with a unique test ID so the
	// server can detect lost or duplicated batches.
	for i := 0; i < n; i++ {
		m := testdata.GenerateMetricsOneMetric()
		dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
		for j := 0; j < dps.Len(); j++ {
			dp := dps.At(j)
			dp.Attributes().PutInt(testIDKey, int64(i))
		}
		ms[i] = m
	}

	// received tracks which test IDs the server has seen; guarded by mu because
	// the HTTP server handles requests concurrently.
	received := make(map[int]prompb.TimeSeries)
	var mu sync.Mutex

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// This handler runs on a server goroutine, not the test goroutine, so
		// t.Fatal/FailNow must not be used here (see testing.T.FailNow docs).
		// Use assert + early return instead so a failure cannot cascade into a
		// panic on the lines below.
		body, err := io.ReadAll(r.Body)
		if !assert.NoError(t, err) {
			return
		}
		assert.NotNil(t, body)
		// Check the remote-write protocol headers, then unzip, unmarshal, and
		// extract the TimeSeries from the request body.
		assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
		assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
		var unzipped []byte

		dest, err := snappy.Decode(unzipped, body)
		if !assert.NoError(t, err) {
			return
		}

		wr := &prompb.WriteRequest{}
		// proto.Unmarshal returns an error, not a bool — bind it as err.
		if !assert.NoError(t, proto.Unmarshal(dest, wr)) {
			return
		}
		if !assert.Len(t, wr.Timeseries, 2) {
			return
		}
		ts := wr.Timeseries[0]
		foundLabel := false
		for _, label := range ts.Labels {
			if label.Name != testIDKey {
				continue
			}
			id, err := strconv.Atoi(label.Value)
			assert.NoError(t, err)
			mu.Lock()
			_, duplicate := received[id]
			assert.False(t, duplicate) // fail if we already saw this ID
			received[id] = ts
			mu.Unlock()
			foundLabel = true
			break
		}
		assert.True(t, foundLabel)
		w.WriteHeader(http.StatusOK)
	}))

	defer server.Close()

	// Adjusted retry settings for faster testing.
	retrySettings := configretry.BackOffConfig{
		Enabled:         true,
		InitialInterval: 100 * time.Millisecond, // Shorter initial interval
		MaxInterval:     1 * time.Second,        // Shorter max interval
		MaxElapsedTime:  2 * time.Second,        // Shorter max elapsed time
	}
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = server.URL
	clientConfig.ReadBufferSize = 0
	clientConfig.WriteBufferSize = 512 * 1024
	cfg := &Config{
		Namespace:         "",
		ClientConfig:      clientConfig,
		MaxBatchSizeBytes: 3000000,
		RemoteWriteQueue:  RemoteWriteQueue{NumConsumers: 1},
		TargetInfo: &TargetInfo{
			Enabled: true,
		},
		CreatedMetric: &CreatedMetric{
			Enabled: false,
		},
		BackOffConfig: retrySettings,
	}

	assert.NotNil(t, cfg)
	set := exportertest.NewNopSettings()
	set.MetricsLevel = configtelemetry.LevelBasic

	prwe, nErr := newPRWExporter(cfg, set)

	require.NoError(t, nErr)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
	defer func() {
		require.NoError(t, prwe.Shutdown(ctx))
	}()

	// Push all payloads concurrently; assert.NoError (not require) because
	// these also run off the test goroutine.
	var wg sync.WaitGroup
	wg.Add(n)
	for _, m := range ms {
		go func() {
			defer wg.Done()
			assert.NoError(t, prwe.PushMetrics(ctx, m))
		}()
	}
	wg.Wait()
	// Every unique test ID must have arrived exactly once.
	assert.Len(t, received, n)
}
78 changes: 78 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1274,3 +1275,80 @@ func benchmarkExecute(b *testing.B, numSample int) {
require.NoError(b, err)
}
}

// BenchmarkPushMetrics measures PushMetrics throughput for a range of fixed
// batch sizes, each as its own sub-benchmark, using a single consumer.
func BenchmarkPushMetrics(b *testing.B) {
	sizes := []int{10, 100, 1000, 10000}
	for _, size := range sizes {
		name := fmt.Sprintf("numMetrics=%d", size)
		b.Run(name, func(b *testing.B) {
			benchmarkPushMetrics(b, size, 1)
		})
	}
}

// BenchmarkPushMetricsVaryingMetrics benchmarks PushMetrics with a payload
// whose metric count varies per iteration (numMetrics = -1 makes
// benchmarkPushMetrics cycle the count through 10, 100, 1000, 10000).
func BenchmarkPushMetricsVaryingMetrics(b *testing.B) {
	benchmarkPushMetrics(b, -1, 1)
}

// benchmarkPushMetrics benchmarks the PushMetrics method with a given number of
// metrics per payload, served by numConsumers queue consumers.
// If numMetrics is -1, the metric count varies per iteration, cycling from 10
// up to 10000.
func benchmarkPushMetrics(b *testing.B, numMetrics, numConsumers int) {
	// A trivial server that accepts every write; we only measure the client side.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer server.Close()
	endpointURL, err := url.Parse(server.URL)
	require.NoError(b, err)

	tel := setupTestTelemetry()
	set := tel.NewSettings()
	// Retry settings tuned short so a failing benchmark errors out quickly.
	retrySettings := configretry.BackOffConfig{
		Enabled:         true,
		InitialInterval: 100 * time.Millisecond,
		MaxInterval:     1 * time.Second,
		MaxElapsedTime:  2 * time.Second,
	}
	clientConfig := confighttp.NewDefaultClientConfig()
	clientConfig.Endpoint = endpointURL.String()
	clientConfig.ReadBufferSize = 0
	clientConfig.WriteBufferSize = 512 * 1024
	cfg := &Config{
		Namespace:         "",
		ClientConfig:      clientConfig,
		MaxBatchSizeBytes: 3000,
		RemoteWriteQueue:  RemoteWriteQueue{NumConsumers: numConsumers},
		BackOffConfig:     retrySettings,
		TargetInfo:        &TargetInfo{Enabled: true},
		CreatedMetric:     &CreatedMetric{Enabled: false},
	}
	prwe, err := newPRWExporter(cfg, set)
	require.NoError(b, err)

	// Pre-generate one payload per iteration so generation cost stays outside
	// the timed region.
	metrics := make([]pmetric.Metrics, 0, b.N)
	for n := 0; n < b.N; n++ {
		count := numMetrics
		if numMetrics == -1 {
			count = int(math.Pow(10, float64(n%4+1)))
		}
		m := testdata.GenerateMetricsManyMetricsSameResource(count)
		for i := 0; i < m.MetricCount(); i++ {
			dp := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(i).Sum().DataPoints().AppendEmpty()
			dp.SetIntValue(int64(i))
			// A distinct attribute per metric forces a new time series to be
			// created during translation for each metric.
			dp.Attributes().PutInt("random_key", int64(i))
		}
		metrics = append(metrics, m)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	require.NoError(b, prwe.Start(ctx, componenttest.NewNopHost()))
	defer func() {
		require.NoError(b, prwe.Shutdown(ctx))
	}()
	b.ReportAllocs()
	b.ResetTimer()
	for _, m := range metrics {
		require.NoError(b, prwe.PushMetrics(ctx, m))
	}
}
52 changes: 16 additions & 36 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,37 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent

import (
"errors"
"math"
"sort"

"github.com/prometheus/prometheus/prompb"
)

type batchTimeSeriesState struct {
// Track batch sizes sent to avoid over allocating huge buffers.
// This helps in the case where large batches are sent to avoid allocating too much unused memory
nextTimeSeriesBufferSize int
nextMetricMetadataBufferSize int
nextRequestBufferSize int
}

func newBatchTimeSericesState() batchTimeSeriesState {
return batchTimeSeriesState{
nextTimeSeriesBufferSize: math.MaxInt,
nextMetricMetadataBufferSize: math.MaxInt,
nextRequestBufferSize: 0,
}
}

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) {
// batchTimeSeries splits series into multiple write requests if they exceed the maxBatchByteSize.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

// Allocate a buffer size of at least 10, or twice the last # of requests we sent
requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))
// Allocate a buffer size of at least 10.
requests := make([]*prompb.WriteRequest, 0, 10)

// Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
// Allocate a time series buffer with the length of the input.
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
sizeOfCurrentBatch := 0

i := 0
for _, v := range tsMap {
sizeOfSeries := v.Size()
for _, series := range tsMap {
sizeOfSeries := series.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
sizeOfCurrentBatch = 0
}

tsArray = append(tsArray, *v)
tsArray = append(tsArray, *series)
sizeOfCurrentBatch += sizeOfSeries
i++
}
Expand All @@ -63,23 +45,22 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

// Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
// Allocate a metric metadata with the length of the input.
mArray := make([]prompb.MetricMetadata, 0, len(m))
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()
for _, metadata := range m {
sizeOfM := metadata.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)

mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
sizeOfCurrentBatch = 0
}

mArray = append(mArray, *v)
mArray = append(mArray, *metadata)
sizeOfCurrentBatch += sizeOfM
i++
}
Expand All @@ -89,7 +70,6 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

state.nextRequestBufferSize = 2 * len(requests)
return requests, nil
}

Expand Down
Loading
Loading