From de80d203ed563808c942cbb033656bf5e669de44 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 3 Oct 2024 23:32:57 -0700 Subject: [PATCH] Fix data race condition, concurrent writes to the err variable, causes Undefined Behavior (#11349) The main issue is that after https://github.com/open-telemetry/opentelemetry-collector/pull/10910 the err variable is shared between requests because it uses the same address as the err defined outside the func. This is an UB because we are overwriting memory and will cause crashes like https://github.com/open-telemetry/opentelemetry-collector/pull/11335, where the check for not nil happens then gets overwrite with nil and crashes. Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/11350 --------- Signed-off-by: Bogdan Drutu --- .chloggen/fix-ub-proc-helper.yaml | 20 ++++++++++++++ processor/processorhelper/logs.go | 9 ++++--- processor/processorhelper/logs_test.go | 32 ++++++++++++++++++++++ processor/processorhelper/metrics.go | 9 ++++--- processor/processorhelper/metrics_test.go | 31 +++++++++++++++++++++ processor/processorhelper/traces.go | 9 ++++--- processor/processorhelper/traces_test.go | 33 +++++++++++++++++++++++ 7 files changed, 131 insertions(+), 12 deletions(-) create mode 100644 .chloggen/fix-ub-proc-helper.yaml diff --git a/.chloggen/fix-ub-proc-helper.yaml b/.chloggen/fix-ub-proc-helper.yaml new file mode 100644 index 00000000000..cc70e62b9c2 --- /dev/null +++ b/.chloggen/fix-ub-proc-helper.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix data race condition, concurrent writes to the err variable, causes UB (Undefined Behavior) + +# One or more tracking issues or pull requests related to the change +issues: [11350] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index 9ea1e8d40a2..a434fe8fb93 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -51,13 +51,14 @@ func NewLogs( span.AddEvent("Start processing.", eventOptions) recordsIn := ld.LogRecordCount() - ld, err = logsFunc(ctx, ld) + var errFunc error + ld, errFunc = logsFunc(ctx, ld) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } recordsOut := ld.LogRecordCount() obs.recordInOut(ctx, recordsIn, recordsOut) diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index e39d5cac2b9..0e94feb2f29 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,37 @@ func newTestLProcessor(retError error) ProcessLogsFunc { } } +func TestLogsConcurrency(t *testing.T) { + logsFunc := func(_ context.Context, ld plog.Logs) (plog.Logs, error) { + return ld, nil + } + + incomingLogs := plog.NewLogs() + incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + + // Add 3 records to the incoming + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), logsFunc) + require.NoError(t, err) + assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs)) + } + }() + } + wg.Wait() + assert.NoError(t, lp.Shutdown(context.Background())) +} + func TestLogs_RecordInOut(t *testing.T) { // Regardless of how many logs are ingested, emit just one mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index af8a24dd193..c8381fd9589 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -51,13 +51,14 @@ func NewMetrics( span.AddEvent("Start processing.", eventOptions) pointsIn := md.DataPointCount() - md, err = metricsFunc(ctx, md) + var errFunc error + md, errFunc = metricsFunc(ctx, md) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } pointsOut := md.DataPointCount() obs.recordInOut(ctx, pointsIn, pointsOut) diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 960e0c74f07..0bea27c2c22 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,36 @@ func newTestMProcessor(retError error) ProcessMetricsFunc { } } +func TestMetricsConcurrency(t *testing.T) { + metricsFunc := func(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + return md, nil + } + + incomingMetrics := pmetric.NewMetrics() + dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints() + + // Add 2 data points to the incoming + dps.AppendEmpty() + dps.AppendEmpty() + + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), metricsFunc) + require.NoError(t, err) + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) + } + }() + } + wg.Wait() + assert.NoError(t, mp.Shutdown(context.Background())) +} + func TestMetrics_RecordInOut(t *testing.T) { // Regardless of how many data points are ingested, emit 3 mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 0891751c7a4..492634541cc 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -51,13 +51,14 @@ func NewTraces( span.AddEvent("Start processing.", eventOptions) spansIn := td.SpanCount() - td, err = tracesFunc(ctx, td) + var errFunc error + td, errFunc = tracesFunc(ctx, td) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } spansOut := td.SpanCount() obs.recordInOut(ctx, spansIn, spansOut) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index db2df7045e6..ecc4f3c9184 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,38 @@ func newTestTProcessor(retError error) ProcessTracesFunc { } } +func TestTracesConcurrency(t *testing.T) { + tracesFunc := func(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + return td, nil + } + + incomingTraces := ptrace.NewTraces() + incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + + // Add 4 records to the incoming + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + + mp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), tracesFunc) + require.NoError(t, err) + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, mp.ConsumeTraces(context.Background(), incomingTraces)) + } + }() + } + wg.Wait() + assert.NoError(t, mp.Shutdown(context.Background())) +} + func TestTraces_RecordInOut(t *testing.T) { // Regardless of how many spans are ingested, emit just one mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {