Skip to content

Commit

Permalink
Add test to show data race in batchTimeSeries()
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Nov 25, 2024
1 parent 5698c9d commit 2052bd1
Showing 1 changed file with 138 additions and 0 deletions.
138 changes: 138 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
for i := 0; i < n; i++ {
m := testdata.GenerateMetricsOneMetric()
dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
dp := dps.At(j)
dp.Attributes().PutInt(testIDKey, int64(i))
}
ms[i] = m
}
received := make(map[int]prompb.TimeSeries)
var mu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

wr := &prompb.WriteRequest{}
ok := proto.Unmarshal(dest, wr)
assert.NoError(t, ok)
assert.Len(t, wr.Timeseries, 2)
ts := wr.Timeseries[0]
foundLabel := false
for _, label := range ts.Labels {
if label.Name == testIDKey {
id, err := strconv.Atoi(label.Value)
assert.NoError(t, err)
mu.Lock()
_, ok := received[id]
assert.False(t, ok) // fail if we already saw it
received[id] = ts
mu.Unlock()
foundLabel = true
break
}
}
assert.True(t, foundLabel)
w.WriteHeader(http.StatusOK)
}))

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
clientConfig.ReadBufferSize = 0
clientConfig.WriteBufferSize = 512 * 1024
cfg := &Config{
Namespace: "",
ClientConfig: clientConfig,
MaxBatchSizeBytes: 3000000,
RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{
Enabled: false,
},
BackOffConfig: retrySettings,
}

assert.NotNil(t, cfg)
set := exportertest.NewNopSettings()
set.MetricsLevel = configtelemetry.LevelBasic

prwe, nErr := newPRWExporter(cfg, set)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, prwe.Shutdown(ctx))
}()

var wg sync.WaitGroup
wg.Add(n)
for _, m := range ms {
go func() {
err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
assert.Len(t, received, n)
}

0 comments on commit 2052bd1

Please sign in to comment.