diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index 9ea1e8d40a2..a434fe8fb93 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -51,13 +51,14 @@ func NewLogs( span.AddEvent("Start processing.", eventOptions) recordsIn := ld.LogRecordCount() - ld, err = logsFunc(ctx, ld) + var errFunc error + ld, errFunc = logsFunc(ctx, ld) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } recordsOut := ld.LogRecordCount() obs.recordInOut(ctx, recordsIn, recordsOut) diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index e39d5cac2b9..0e94feb2f29 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,37 @@ func newTestLProcessor(retError error) ProcessLogsFunc { } } +func TestLogsConcurrency(t *testing.T) { + logsFunc := func(_ context.Context, ld plog.Logs) (plog.Logs, error) { + return ld, nil + } + + incomingLogs := plog.NewLogs() + incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + + // Add 3 records to the incoming + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), logsFunc) + require.NoError(t, err) + assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, lp.ConsumeLogs(context.Background(), incomingLogs)) + } + }() + } + wg.Wait() + assert.NoError(t, lp.Shutdown(context.Background())) +} + func TestLogs_RecordInOut(t *testing.T) { // Regardless of how many logs are ingested, emit just one mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index af8a24dd193..c8381fd9589 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -51,13 +51,14 @@ func NewMetrics( span.AddEvent("Start processing.", eventOptions) pointsIn := md.DataPointCount() - md, err = metricsFunc(ctx, md) + var errFunc error + md, errFunc = metricsFunc(ctx, md) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } pointsOut := md.DataPointCount() obs.recordInOut(ctx, pointsIn, pointsOut) diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 960e0c74f07..0bea27c2c22 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,36 @@ func newTestMProcessor(retError error) ProcessMetricsFunc { } } +func TestMetricsConcurrency(t *testing.T) { + metricsFunc := func(_ context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + return md, nil + } + + incomingMetrics := pmetric.NewMetrics() + dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints() + + // Add 2 data points to the incoming + dps.AppendEmpty() + dps.AppendEmpty() + + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), metricsFunc) + require.NoError(t, err) + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) + } + }() + } + wg.Wait() + assert.NoError(t, mp.Shutdown(context.Background())) +} + func TestMetrics_RecordInOut(t *testing.T) { // Regardless of how many data points are ingested, emit 3 mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 0891751c7a4..492634541cc 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -51,13 +51,14 @@ func NewTraces( span.AddEvent("Start processing.", eventOptions) spansIn := td.SpanCount() - td, err = tracesFunc(ctx, td) + var errFunc error + td, errFunc = tracesFunc(ctx, td) span.AddEvent("End processing.", eventOptions) - if err != nil { - if errors.Is(err, ErrSkipProcessingData) { + if errFunc != nil { + if errors.Is(errFunc, ErrSkipProcessingData) { return nil } - return err + return errFunc } spansOut := td.SpanCount() obs.recordInOut(ctx, spansIn, spansOut) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index db2df7045e6..ecc4f3c9184 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -6,6 +6,7 @@ package processorhelper import ( "context" "errors" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -70,6 +71,38 @@ func newTestTProcessor(retError error) ProcessTracesFunc { } } +func TestTracesConcurrency(t *testing.T) { + tracesFunc := func(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + return td, nil + } + + incomingTraces := ptrace.NewTraces() + incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + + // Add 4 records to the incoming + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + + mp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), tracesFunc) + require.NoError(t, err) + assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 10000; j++ { + assert.NoError(t, mp.ConsumeTraces(context.Background(), incomingTraces)) + } + }() + } + wg.Wait() + assert.NoError(t, mp.Shutdown(context.Background())) +} + func TestTraces_RecordInOut(t *testing.T) { // Regardless of how many spans are ingested, emit just one mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {